This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new d61ed7e17 [flink] Postpone sink materializer check from compile time to runtime (#2951)
d61ed7e17 is described below

commit d61ed7e17808e6c3100d20fdf1c5fb694c679d9d
Author: tsreaper <[email protected]>
AuthorDate: Wed Mar 6 17:16:38 2024 +0800

    [flink] Postpone sink materializer check from compile time to runtime (#2951)
---
 .../apache/paimon/utils/SerializableRunnable.java  | 25 +++++++
 .../org/apache/paimon/flink/sink/FlinkSink.java    | 85 +++++++++++++---------
 .../apache/paimon/flink/PartialUpdateITCase.java   | 19 +++--
 3 files changed, 86 insertions(+), 43 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SerializableRunnable.java b/paimon-core/src/main/java/org/apache/paimon/utils/SerializableRunnable.java
new file mode 100644
index 000000000..ae1b5453d
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SerializableRunnable.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import java.io.Serializable;
+
+/** A serializable {@link Runnable}. */
+@FunctionalInterface
+public interface SerializableRunnable extends Runnable, Serializable {}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index 766edc499..5f0c4d662 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -26,6 +26,7 @@ import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.SerializableRunnable;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.common.operators.SlotSharingGroup;
@@ -82,7 +83,18 @@ public abstract class FlinkSink<T> implements Serializable {
     }
 
     private StoreSinkWrite.Provider createWriteProvider(
-            CheckpointConfig checkpointConfig, boolean isStreaming) {
+            CheckpointConfig checkpointConfig, boolean isStreaming, boolean 
hasSinkMaterializer) {
+        SerializableRunnable assertNoSinkMaterializer =
+                () ->
+                        Preconditions.checkArgument(
+                                !hasSinkMaterializer,
+                                String.format(
+                                        "Sink materializer must not be used 
with Paimon sink. "
+                                                + "Please set '%s' to '%s' in 
Flink's config.",
+                                        
ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE
+                                                .key(),
+                                        
ExecutionConfigOptions.UpsertMaterialize.NONE.name()));
+
         boolean waitCompaction;
         if (table.coreOptions().writeOnly()) {
             waitCompaction = false;
@@ -107,32 +119,36 @@ public abstract class FlinkSink<T> implements 
Serializable {
 
             if (changelogProducer == ChangelogProducer.FULL_COMPACTION || 
deltaCommits >= 0) {
                 int finalDeltaCommits = Math.max(deltaCommits, 1);
-                return (table, commitUser, state, ioManager, memoryPool, 
metricGroup) ->
-                        new GlobalFullCompactionSinkWrite(
-                                table,
-                                commitUser,
-                                state,
-                                ioManager,
-                                ignorePreviousFiles,
-                                waitCompaction,
-                                finalDeltaCommits,
-                                isStreaming,
-                                memoryPool,
-                                metricGroup);
+                return (table, commitUser, state, ioManager, memoryPool, 
metricGroup) -> {
+                    assertNoSinkMaterializer.run();
+                    return new GlobalFullCompactionSinkWrite(
+                            table,
+                            commitUser,
+                            state,
+                            ioManager,
+                            ignorePreviousFiles,
+                            waitCompaction,
+                            finalDeltaCommits,
+                            isStreaming,
+                            memoryPool,
+                            metricGroup);
+                };
             }
         }
 
-        return (table, commitUser, state, ioManager, memoryPool, metricGroup) 
->
-                new StoreSinkWriteImpl(
-                        table,
-                        commitUser,
-                        state,
-                        ioManager,
-                        ignorePreviousFiles,
-                        waitCompaction,
-                        isStreaming,
-                        memoryPool,
-                        metricGroup);
+        return (table, commitUser, state, ioManager, memoryPool, metricGroup) 
-> {
+            assertNoSinkMaterializer.run();
+            return new StoreSinkWriteImpl(
+                    table,
+                    commitUser,
+                    state,
+                    ioManager,
+                    ignorePreviousFiles,
+                    waitCompaction,
+                    isStreaming,
+                    memoryPool,
+                    metricGroup);
+        };
     }
 
     public DataStreamSink<?> sinkFrom(DataStream<T> input) {
@@ -146,16 +162,13 @@ public abstract class FlinkSink<T> implements 
Serializable {
     }
 
     public DataStreamSink<?> sinkFrom(DataStream<T> input, String 
initialCommitUser) {
-        assertNoSinkMaterializer(input);
-
         // do the actually writing action, no snapshot generated in this stage
         DataStream<Committable> written = doWrite(input, initialCommitUser, 
input.getParallelism());
-
         // commit the committable to generate a new snapshot
         return doCommit(written, initialCommitUser);
     }
 
-    private void assertNoSinkMaterializer(DataStream<T> input) {
+    private boolean hasSinkMaterializer(DataStream<T> input) {
         // traverse the transformation graph with breadth first search
         Set<Integer> visited = new HashSet<>();
         Queue<Transformation<?>> queue = new LinkedList<>();
@@ -163,13 +176,9 @@ public abstract class FlinkSink<T> implements Serializable 
{
         visited.add(input.getTransformation().getId());
         while (!queue.isEmpty()) {
             Transformation<?> transformation = queue.poll();
-            Preconditions.checkArgument(
-                    !transformation.getName().startsWith("SinkMaterializer"),
-                    String.format(
-                            "Sink materializer must not be used with Paimon 
sink. "
-                                    + "Please set '%s' to '%s' in Flink's 
config.",
-                            
ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE.key(),
-                            
ExecutionConfigOptions.UpsertMaterialize.NONE.name()));
+            if (transformation.getName().startsWith("SinkMaterializer")) {
+                return true;
+            }
             for (Transformation<?> prev : transformation.getInputs()) {
                 if (!visited.contains(prev.getId())) {
                     queue.add(prev);
@@ -177,6 +186,7 @@ public abstract class FlinkSink<T> implements Serializable {
                 }
             }
         }
+        return false;
     }
 
     public DataStream<Committable> doWrite(
@@ -195,7 +205,10 @@ public abstract class FlinkSink<T> implements Serializable 
{
                                         + table.name(),
                                 new CommittableTypeInfo(),
                                 createWriteOperator(
-                                        
createWriteProvider(env.getCheckpointConfig(), isStreaming),
+                                        createWriteProvider(
+                                                env.getCheckpointConfig(),
+                                                isStreaming,
+                                                hasSinkMaterializer(input)),
                                         commitUser))
                         .setParallelism(parallelism == null ? 
input.getParallelism() : parallelism);
 
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
index 9fe276a9d..e59c6dc8f 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink;
 
 import org.apache.paimon.utils.BlockingIterator;
 
+import org.apache.flink.configuration.RestartStrategyOptions;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.planner.factories.TestValuesTableFactory;
 import org.apache.flink.types.Row;
@@ -336,16 +337,20 @@ public class PartialUpdateITCase extends 
CatalogITCaseBase {
                 .set(
                         
ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE,
                         ExecutionConfigOptions.UpsertMaterialize.FORCE);
-        try (CloseableIterator<Row> ignored =
-                streamSqlIter(
-                        "INSERT INTO dwd_orders "
-                                + "SELECT OrderID, OrderNumber, PersonID, 
CAST(NULL AS STRING), CAST(NULL AS STRING), CAST(NULL AS INT) FROM ods_orders "
-                                + "UNION ALL "
-                                + "SELECT OrderID, CAST(NULL AS INT), 
dim_persons.PersonID, LastName, FirstName, Age FROM dim_persons JOIN ods_orders 
ON dim_persons.PersonID = ods_orders.PersonID;")) {
+        sEnv.getConfig().set(RestartStrategyOptions.RESTART_STRATEGY, "none");
+        String sql =
+                "INSERT INTO dwd_orders "
+                        + "SELECT OrderID, OrderNumber, PersonID, CAST(NULL AS 
STRING), CAST(NULL AS STRING), CAST(NULL AS INT) FROM ods_orders "
+                        + "UNION ALL "
+                        + "SELECT OrderID, CAST(NULL AS INT), 
dim_persons.PersonID, LastName, FirstName, Age FROM dim_persons JOIN ods_orders 
ON dim_persons.PersonID = ods_orders.PersonID;";
+        try {
+            sEnv.executeSql(sql).await();
             fail("Expecting exception");
         } catch (Exception e) {
             assertThat(e)
-                    .hasMessageContaining("Sink materializer must not be used 
with Paimon sink.");
+                    .hasRootCauseMessage(
+                            "Sink materializer must not be used with Paimon 
sink. "
+                                    + "Please set 
'table.exec.sink.upsert-materialize' to 'NONE' in Flink's config.");
         }
     }
 

Reply via email to