This is an automated email from the ASF dual-hosted git repository. biyan pushed a commit to tag release-0.6.1-incubating-rc1 in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
commit c251f7a05ae576b806abea2e07973ad2ca0a88af
Author: tsreaper <tsreape...@gmail.com>
AuthorDate: Fri Dec 29 11:59:53 2023 +0800

    [flink] throws exception if there is a SinkMaterializer before Paimon sink (#2592)
---
 .../org/apache/paimon/flink/sink/FlinkSink.java    | 33 ++++++++++++++++++++++
 .../apache/paimon/flink/PartialUpdateITCase.java   | 20 +++++++++++++
 .../flink/PrimaryKeyFileStoreTableITCase.java      |  6 +++-
 3 files changed, 58 insertions(+), 1 deletion(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index 7d3cecdbf..f72f66885 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -25,9 +25,11 @@ import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.Preconditions;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.common.operators.SlotSharingGroup;
+import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.streaming.api.CheckpointingMode;
@@ -39,10 +41,15 @@ import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
 
 import javax.annotation.Nullable;
 
 import java.io.Serializable;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.Set;
 import java.util.UUID;
 
 import static org.apache.flink.configuration.ClusterOptions.ENABLE_FINE_GRAINED_RESOURCE_MANAGEMENT;
@@ -139,6 +146,8 @@ public abstract class FlinkSink<T> implements Serializable {
     }
 
     public DataStreamSink<?> sinkFrom(DataStream<T> input, String initialCommitUser) {
+        assertNoSinkMaterializer(input);
+
         // do the actually writing action, no snapshot generated in this stage
         SingleOutputStreamOperator<Committable> written =
                 doWrite(input, initialCommitUser, input.getParallelism());
@@ -147,6 +156,30 @@ public abstract class FlinkSink<T> implements Serializable {
         return doCommit(written, initialCommitUser);
     }
 
+    private void assertNoSinkMaterializer(DataStream<T> input) {
+        // traverse the transformation graph with breadth first search
+        Set<Integer> visited = new HashSet<>();
+        Queue<Transformation<?>> queue = new LinkedList<>();
+        queue.add(input.getTransformation());
+        visited.add(input.getTransformation().getId());
+        while (!queue.isEmpty()) {
+            Transformation<?> transformation = queue.poll();
+            Preconditions.checkArgument(
+                    !transformation.getName().startsWith("SinkMaterializer"),
+                    String.format(
+                            "Sink materializer must not be used with Paimon sink. "
+                                    + "Please set '%s' to '%s' in Flink's config.",
+                            ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE.key(),
+                            ExecutionConfigOptions.UpsertMaterialize.NONE.name()));
+            for (Transformation<?> prev : transformation.getInputs()) {
+                if (!visited.contains(prev.getId())) {
+                    queue.add(prev);
+                    visited.add(prev.getId());
+                }
+            }
+        }
+    }
+
     public SingleOutputStreamOperator<Committable> doWrite(
             DataStream<T> input, String commitUser, Integer parallelism) {
         StreamExecutionEnvironment env = input.getExecutionEnvironment();
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
index c8e336384..c8172131e 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
@@ -37,6 +37,7 @@ import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assertions.fail;
 
 /** ITCase for partial update. */
 public class PartialUpdateITCase extends CatalogITCaseBase {
@@ -327,4 +328,23 @@ public class PartialUpdateITCase extends CatalogITCaseBase {
 
         assertThat(sql("SELECT * FROM AGG")).containsExactlyInAnyOrder(Row.of(1, 0, 2));
     }
+
+    @Test
+    public void testNoSinkMaterializer() {
+        sEnv.getConfig()
+                .set(
+                        ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE,
+                        ExecutionConfigOptions.UpsertMaterialize.FORCE);
+        try (CloseableIterator<Row> ignored =
+                streamSqlIter(
+                        "INSERT INTO dwd_orders "
+                                + "SELECT OrderID, OrderNumber, PersonID, CAST(NULL AS STRING), CAST(NULL AS STRING), CAST(NULL AS INT) FROM ods_orders "
+                                + "UNION ALL "
+                                + "SELECT OrderID, CAST(NULL AS INT), dim_persons.PersonID, LastName, FirstName, Age FROM dim_persons JOIN ods_orders ON dim_persons.PersonID = ods_orders.PersonID;")) {
+            fail("Expecting exception");
+        } catch (Exception e) {
+            assertThat(e)
+                    .hasMessageContaining("Sink materializer must not be used with Paimon sink.");
+        }
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
index eba3fae70..8b4f452ef 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
@@ -88,6 +88,10 @@ public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase {
         sEnv.getConfig()
                 .getConfiguration()
                 .set(CHECKPOINTING_INTERVAL, Duration.ofMillis(checkpointIntervalMs));
+        sEnv.getConfig()
+                .set(
+                        ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE,
+                        ExecutionConfigOptions.UpsertMaterialize.NONE);
         return sEnv;
     }
 
@@ -239,7 +243,7 @@ public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase {
         // write update data
         sEnv.executeSql(
                         "INSERT INTO `default_catalog`.`default_database`.`S` "
-                                + "VALUES (1, 'A'), (1, 'B'), (1, 'C'), (1, 'D')")
+                                + "VALUES (1, 'D'), (1, 'C'), (1, 'B'), (1, 'A')")
                 .await();
 
         // read update data