This is an automated email from the ASF dual-hosted git repository.

biyan pushed a commit to tag release-0.6.1-incubating-rc1
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git

commit c251f7a05ae576b806abea2e07973ad2ca0a88af
Author: tsreaper <tsreape...@gmail.com>
AuthorDate: Fri Dec 29 11:59:53 2023 +0800

    [flink] throws exception if there is a SinkMaterializer before Paimon sink (#2592)
---
 .../org/apache/paimon/flink/sink/FlinkSink.java    | 33 ++++++++++++++++++++++
 .../apache/paimon/flink/PartialUpdateITCase.java   | 20 +++++++++++++
 .../flink/PrimaryKeyFileStoreTableITCase.java      |  6 +++-
 3 files changed, 58 insertions(+), 1 deletion(-)

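Background for the change: when Flink's planner suspects that upsert input may arrive out of order, it plans a SinkMaterializer operator in front of the sink; Paimon resolves such updates itself, so this commit makes the sink fail fast instead and points users at the option named in the error message. A minimal sketch of disabling the materializer from the Table API (the class name and TableEnvironment setup are illustrative, not part of this commit):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.config.ExecutionConfigOptions;

    public class DisableSinkMaterializer {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            // Equivalent to SET 'table.exec.sink.upsert-materialize' = 'NONE';
            // with the default AUTO, the planner may still insert a
            // SinkMaterializer, which the check added below rejects.
            tEnv.getConfig()
                    .set(
                            ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE,
                            ExecutionConfigOptions.UpsertMaterialize.NONE);
        }
    }
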
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index 7d3cecdbf..f72f66885 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -25,9 +25,11 @@ import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.Preconditions;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.common.operators.SlotSharingGroup;
+import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.streaming.api.CheckpointingMode;
@@ -39,10 +41,15 @@ import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
 
 import javax.annotation.Nullable;
 
 import java.io.Serializable;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.Set;
 import java.util.UUID;
 
 import static org.apache.flink.configuration.ClusterOptions.ENABLE_FINE_GRAINED_RESOURCE_MANAGEMENT;
@@ -139,6 +146,8 @@ public abstract class FlinkSink<T> implements Serializable {
     }
 
     public DataStreamSink<?> sinkFrom(DataStream<T> input, String initialCommitUser) {
+        assertNoSinkMaterializer(input);
+
         // do the actual writing action; no snapshot is generated in this stage
         SingleOutputStreamOperator<Committable> written =
                 doWrite(input, initialCommitUser, input.getParallelism());
@@ -147,6 +156,30 @@ public abstract class FlinkSink<T> implements Serializable {
         return doCommit(written, initialCommitUser);
     }
 
+    private void assertNoSinkMaterializer(DataStream<T> input) {
+        // traverse the transformation graph with breadth first search
+        Set<Integer> visited = new HashSet<>();
+        Queue<Transformation<?>> queue = new LinkedList<>();
+        queue.add(input.getTransformation());
+        visited.add(input.getTransformation().getId());
+        while (!queue.isEmpty()) {
+            Transformation<?> transformation = queue.poll();
+            Preconditions.checkArgument(
+                    !transformation.getName().startsWith("SinkMaterializer"),
+                    String.format(
+                            "Sink materializer must not be used with Paimon 
sink. "
+                                    + "Please set '%s' to '%s' in Flink's 
config.",
+                            
ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE.key(),
+                            
ExecutionConfigOptions.UpsertMaterialize.NONE.name()));
+            for (Transformation<?> prev : transformation.getInputs()) {
+                if (!visited.contains(prev.getId())) {
+                    queue.add(prev);
+                    visited.add(prev.getId());
+                }
+            }
+        }
+    }
+
     public SingleOutputStreamOperator<Committable> doWrite(
             DataStream<T> input, String commitUser, Integer parallelism) {
         StreamExecutionEnvironment env = input.getExecutionEnvironment();
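
A note on assertNoSinkMaterializer: it does a breadth-first search over the upstream Transformation DAG via Transformation#getInputs(), tracking visited ids so that shared inputs are not enqueued twice, and it identifies the offending operator by the name prefix "SinkMaterializer", relying on the planner's naming convention. The same traversal in isolation, as a sketch (anyUpstream and its Predicate parameter are hypothetical helpers, not part of this commit):

    import org.apache.flink.api.dag.Transformation;

    import java.util.HashSet;
    import java.util.LinkedList;
    import java.util.Queue;
    import java.util.Set;
    import java.util.function.Predicate;

    public class TransformationGraphs {
        // BFS over a Transformation DAG: true if any node reachable upstream
        // of (and including) start matches the predicate.
        public static boolean anyUpstream(
                Transformation<?> start, Predicate<Transformation<?>> matches) {
            Set<Integer> visited = new HashSet<>();
            Queue<Transformation<?>> queue = new LinkedList<>();
            queue.add(start);
            visited.add(start.getId());
            while (!queue.isEmpty()) {
                Transformation<?> t = queue.poll();
                if (matches.test(t)) {
                    return true;
                }
                for (Transformation<?> prev : t.getInputs()) {
                    if (visited.add(prev.getId())) { // add() is false if already seen
                        queue.add(prev);
                    }
                }
            }
            return false;
        }
    }

With such a helper, the new check reduces to anyUpstream(input.getTransformation(), t -> t.getName().startsWith("SinkMaterializer")).
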
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
index c8e336384..c8172131e 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
@@ -37,6 +37,7 @@ import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assertions.fail;
 
 /** ITCase for partial update. */
 public class PartialUpdateITCase extends CatalogITCaseBase {
@@ -327,4 +328,23 @@ public class PartialUpdateITCase extends CatalogITCaseBase {
 
         assertThat(sql("SELECT * FROM 
AGG")).containsExactlyInAnyOrder(Row.of(1, 0, 2));
     }
+
+    @Test
+    public void testNoSinkMaterializer() {
+        sEnv.getConfig()
+                .set(
+                        ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE,
+                        ExecutionConfigOptions.UpsertMaterialize.FORCE);
+        try (CloseableIterator<Row> ignored =
+                streamSqlIter(
+                        "INSERT INTO dwd_orders "
+                                + "SELECT OrderID, OrderNumber, PersonID, 
CAST(NULL AS STRING), CAST(NULL AS STRING), CAST(NULL AS INT) FROM ods_orders "
+                                + "UNION ALL "
+                                + "SELECT OrderID, CAST(NULL AS INT), 
dim_persons.PersonID, LastName, FirstName, Age FROM dim_persons JOIN ods_orders 
ON dim_persons.PersonID = ods_orders.PersonID;")) {
+            fail("Expecting exception");
+        } catch (Exception e) {
+            assertThat(e)
+                    .hasMessageContaining("Sink materializer must not be used 
with Paimon sink.");
+        }
+    }
 }
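
The test above sets the option to FORCE so the planner is guaranteed to insert a SinkMaterializer, then expects submission to fail with the new message. The try/fail/catch shape accommodates the CloseableIterator returned by streamSqlIter; where closing is not a concern, the same assertion could be written more compactly with AssertJ (a sketch; the INSERT statement is elided):

    assertThatThrownBy(() -> streamSqlIter("INSERT INTO dwd_orders ..."))
            .hasMessageContaining("Sink materializer must not be used with Paimon sink.");
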
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
index eba3fae70..8b4f452ef 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
@@ -88,6 +88,10 @@ public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase {
         sEnv.getConfig()
                 .getConfiguration()
                 .set(CHECKPOINTING_INTERVAL, Duration.ofMillis(checkpointIntervalMs));
+        sEnv.getConfig()
+                .set(
+                        ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE,
+                        ExecutionConfigOptions.UpsertMaterialize.NONE);
         return sEnv;
     }
 
@@ -239,7 +243,7 @@ public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase {
         // write update data
         sEnv.executeSql(
                         "INSERT INTO `default_catalog`.`default_database`.`S` "
-                                + "VALUES (1, 'A'), (1, 'B'), (1, 'C'), (1, 
'D')")
+                                + "VALUES (1, 'D'), (1, 'C'), (1, 'B'), (1, 
'A')")
                 .await();
 
         // read update data
