This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new d61ed7e17 [flink] Postpone sink materializer check from compile time to runtime (#2951)
d61ed7e17 is described below
commit d61ed7e17808e6c3100d20fdf1c5fb694c679d9d
Author: tsreaper <[email protected]>
AuthorDate: Wed Mar 6 17:16:38 2024 +0800
[flink] Postpone sink materializer check from compile time to runtime (#2951)
---
.../apache/paimon/utils/SerializableRunnable.java | 25 +++++++
.../org/apache/paimon/flink/sink/FlinkSink.java | 85 +++++++++++++---------
.../apache/paimon/flink/PartialUpdateITCase.java | 19 +++--
3 files changed, 86 insertions(+), 43 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/SerializableRunnable.java
b/paimon-core/src/main/java/org/apache/paimon/utils/SerializableRunnable.java
new file mode 100644
index 000000000..ae1b5453d
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/utils/SerializableRunnable.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import java.io.Serializable;
+
+/** A {@link Runnable} that is also {@link Serializable}, e.g. for use in serializable lambdas. */
+@FunctionalInterface
+public interface SerializableRunnable extends Runnable, Serializable {}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index 766edc499..5f0c4d662 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -26,6 +26,7 @@ import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.SerializableRunnable;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.operators.SlotSharingGroup;
@@ -82,7 +83,18 @@ public abstract class FlinkSink<T> implements Serializable {
}
private StoreSinkWrite.Provider createWriteProvider(
- CheckpointConfig checkpointConfig, boolean isStreaming) {
+ CheckpointConfig checkpointConfig, boolean isStreaming, boolean
hasSinkMaterializer) {
+ SerializableRunnable assertNoSinkMaterializer =
+ () ->
+ Preconditions.checkArgument(
+ !hasSinkMaterializer,
+ String.format(
+ "Sink materializer must not be used
with Paimon sink. "
+ + "Please set '%s' to '%s' in
Flink's config.",
+
ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE
+ .key(),
+
ExecutionConfigOptions.UpsertMaterialize.NONE.name()));
+
boolean waitCompaction;
if (table.coreOptions().writeOnly()) {
waitCompaction = false;
@@ -107,32 +119,36 @@ public abstract class FlinkSink<T> implements
Serializable {
if (changelogProducer == ChangelogProducer.FULL_COMPACTION ||
deltaCommits >= 0) {
int finalDeltaCommits = Math.max(deltaCommits, 1);
- return (table, commitUser, state, ioManager, memoryPool,
metricGroup) ->
- new GlobalFullCompactionSinkWrite(
- table,
- commitUser,
- state,
- ioManager,
- ignorePreviousFiles,
- waitCompaction,
- finalDeltaCommits,
- isStreaming,
- memoryPool,
- metricGroup);
+ return (table, commitUser, state, ioManager, memoryPool,
metricGroup) -> {
+ assertNoSinkMaterializer.run();
+ return new GlobalFullCompactionSinkWrite(
+ table,
+ commitUser,
+ state,
+ ioManager,
+ ignorePreviousFiles,
+ waitCompaction,
+ finalDeltaCommits,
+ isStreaming,
+ memoryPool,
+ metricGroup);
+ };
}
}
- return (table, commitUser, state, ioManager, memoryPool, metricGroup)
->
- new StoreSinkWriteImpl(
- table,
- commitUser,
- state,
- ioManager,
- ignorePreviousFiles,
- waitCompaction,
- isStreaming,
- memoryPool,
- metricGroup);
+ return (table, commitUser, state, ioManager, memoryPool, metricGroup)
-> {
+ assertNoSinkMaterializer.run();
+ return new StoreSinkWriteImpl(
+ table,
+ commitUser,
+ state,
+ ioManager,
+ ignorePreviousFiles,
+ waitCompaction,
+ isStreaming,
+ memoryPool,
+ metricGroup);
+ };
}
public DataStreamSink<?> sinkFrom(DataStream<T> input) {
@@ -146,16 +162,13 @@ public abstract class FlinkSink<T> implements
Serializable {
}
public DataStreamSink<?> sinkFrom(DataStream<T> input, String
initialCommitUser) {
- assertNoSinkMaterializer(input);
-
// do the actually writing action, no snapshot generated in this stage
DataStream<Committable> written = doWrite(input, initialCommitUser,
input.getParallelism());
-
// commit the committable to generate a new snapshot
return doCommit(written, initialCommitUser);
}
- private void assertNoSinkMaterializer(DataStream<T> input) {
+ private boolean hasSinkMaterializer(DataStream<T> input) {
// traverse the transformation graph with breadth first search
Set<Integer> visited = new HashSet<>();
Queue<Transformation<?>> queue = new LinkedList<>();
@@ -163,13 +176,9 @@ public abstract class FlinkSink<T> implements Serializable
{
visited.add(input.getTransformation().getId());
while (!queue.isEmpty()) {
Transformation<?> transformation = queue.poll();
- Preconditions.checkArgument(
- !transformation.getName().startsWith("SinkMaterializer"),
- String.format(
- "Sink materializer must not be used with Paimon
sink. "
- + "Please set '%s' to '%s' in Flink's
config.",
-
ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE.key(),
-
ExecutionConfigOptions.UpsertMaterialize.NONE.name()));
+ if (transformation.getName().startsWith("SinkMaterializer")) {
+ return true;
+ }
for (Transformation<?> prev : transformation.getInputs()) {
if (!visited.contains(prev.getId())) {
queue.add(prev);
@@ -177,6 +186,7 @@ public abstract class FlinkSink<T> implements Serializable {
}
}
}
+ return false;
}
public DataStream<Committable> doWrite(
@@ -195,7 +205,10 @@ public abstract class FlinkSink<T> implements Serializable
{
+ table.name(),
new CommittableTypeInfo(),
createWriteOperator(
-
createWriteProvider(env.getCheckpointConfig(), isStreaming),
+ createWriteProvider(
+ env.getCheckpointConfig(),
+ isStreaming,
+ hasSinkMaterializer(input)),
commitUser))
.setParallelism(parallelism == null ?
input.getParallelism() : parallelism);
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
index 9fe276a9d..e59c6dc8f 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink;
import org.apache.paimon.utils.BlockingIterator;
+import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.types.Row;
@@ -336,16 +337,20 @@ public class PartialUpdateITCase extends
CatalogITCaseBase {
.set(
ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE,
ExecutionConfigOptions.UpsertMaterialize.FORCE);
- try (CloseableIterator<Row> ignored =
- streamSqlIter(
- "INSERT INTO dwd_orders "
- + "SELECT OrderID, OrderNumber, PersonID,
CAST(NULL AS STRING), CAST(NULL AS STRING), CAST(NULL AS INT) FROM ods_orders "
- + "UNION ALL "
- + "SELECT OrderID, CAST(NULL AS INT),
dim_persons.PersonID, LastName, FirstName, Age FROM dim_persons JOIN ods_orders
ON dim_persons.PersonID = ods_orders.PersonID;")) {
+ sEnv.getConfig().set(RestartStrategyOptions.RESTART_STRATEGY, "none");
+ String sql =
+ "INSERT INTO dwd_orders "
+ + "SELECT OrderID, OrderNumber, PersonID, CAST(NULL AS
STRING), CAST(NULL AS STRING), CAST(NULL AS INT) FROM ods_orders "
+ + "UNION ALL "
+ + "SELECT OrderID, CAST(NULL AS INT),
dim_persons.PersonID, LastName, FirstName, Age FROM dim_persons JOIN ods_orders
ON dim_persons.PersonID = ods_orders.PersonID;";
+ try {
+ sEnv.executeSql(sql).await();
fail("Expecting exception");
} catch (Exception e) {
assertThat(e)
- .hasMessageContaining("Sink materializer must not be used
with Paimon sink.");
+ .hasRootCauseMessage(
+ "Sink materializer must not be used with Paimon
sink. "
+ + "Please set
'table.exec.sink.upsert-materialize' to 'NONE' in Flink's config.");
}
}