This is an automated email from the ASF dual-hosted git repository. godfrey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit bff1fc2782e04598275ab05177b89df38f891c94 Author: lincoln.lil <[email protected]> AuthorDate: Thu Sep 8 21:50:10 2022 +0800 [FLINK-28569][table-planner] Fix SinkUpsertMaterializer that should be aware of the input upsertKey if it is not empty to prevent wrong results This closes #20791 --- .../plan/nodes/exec/batch/BatchExecSink.java | 3 +- .../plan/nodes/exec/common/CommonExecSink.java | 25 +++- .../plan/nodes/exec/stream/StreamExecSink.java | 12 +- .../table/planner/plan/utils/UpsertKeyUtil.java | 66 +++++++++ .../nodes/physical/stream/StreamPhysicalSink.scala | 10 +- .../nodes/exec/stream/TableSinkJsonPlanTest.java | 36 +++++ .../planner/plan/utils/UpsertKeyUtilTest.java | 52 ++++++++ .../utils/JavaUserDefinedScalarFunctions.java | 4 + .../testChangelogSource.out | 1 + .../testUpsertSource.out | 1 + .../testDeduplication.out | 1 + .../ExpandJsonPlanTest_jsonplan/testExpand.out | 1 + ...tDistinctAggCalls[isMiniBatchEnabled=false].out | 1 + ...stDistinctAggCalls[isMiniBatchEnabled=true].out | 1 + ...gCallsWithGroupBy[isMiniBatchEnabled=false].out | 1 + ...ggCallsWithGroupBy[isMiniBatchEnabled=true].out | 1 + ...erDefinedAggCalls[isMiniBatchEnabled=false].out | 1 + ...serDefinedAggCalls[isMiniBatchEnabled=true].out | 1 + .../testEventTimeTumbleWindow.out | 1 + .../testProcTimeTumbleWindow.out | 1 + .../testIncrementalAggregate.out | 1 + ...lAggregateWithSumCountDistinctAndRetraction.out | 1 + .../testInnerJoinWithEqualPk.out | 1 + ...WithNonDeterministicFuncSinkWithDifferentPk.out | 147 +++++++++++++++++++++ .../runtime/stream/sql/TableSinkITCase.scala | 62 +++++++++ .../operators/sink/SinkUpsertMaterializer.java | 125 +++++++++++++----- .../operators/sink/SinkUpsertMaterializerTest.java | 102 +++++++++++--- 27 files changed, 601 insertions(+), 58 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java index a6202871816..c9068656a51 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java @@ -71,6 +71,7 @@ public class BatchExecSink extends CommonExecSink implements BatchExecNode<Objec inputTransform, tableSink, -1, - false); + false, + null); } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java index 71f8184df2c..7f978565060 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java @@ -61,6 +61,7 @@ import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecNode; import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil; import org.apache.flink.table.planner.plan.nodes.exec.utils.TransformationMetadata; import org.apache.flink.table.planner.plan.utils.KeySelectorUtil; +import org.apache.flink.table.planner.typeutils.RowTypeUtils; import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext; import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser; import org.apache.flink.table.runtime.keyselector.RowDataKeySelector; @@ -141,7 +142,8 @@ public abstract class CommonExecSink extends ExecNodeBase<Object> Transformation<RowData> inputTransform, DynamicTableSink tableSink, int rowtimeFieldIndex, - boolean upsertMaterialize) { + boolean upsertMaterialize, + int[] inputUpsertKey) { final 
ResolvedSchema schema = tableSinkSpec.getContextResolvedTable().getResolvedSchema(); final SinkRuntimeProvider runtimeProvider = tableSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(isBounded)); @@ -193,7 +195,8 @@ public abstract class CommonExecSink extends ExecNodeBase<Object> sinkParallelism, config, classLoader, - physicalRowType); + physicalRowType, + inputUpsertKey); } return (Transformation<Object>) @@ -402,16 +405,28 @@ public abstract class CommonExecSink extends ExecNodeBase<Object> int sinkParallelism, ExecNodeConfig config, ClassLoader classLoader, - RowType physicalRowType) { - GeneratedRecordEqualiser equaliser = + RowType physicalRowType, + int[] inputUpsertKey) { + final GeneratedRecordEqualiser rowEqualiser = new EqualiserCodeGenerator(physicalRowType, classLoader) .generateRecordEqualiser("SinkMaterializeEqualiser"); + final GeneratedRecordEqualiser upsertKeyEqualiser = + inputUpsertKey == null + ? null + : new EqualiserCodeGenerator( + RowTypeUtils.projectRowType( + physicalRowType, inputUpsertKey), + classLoader) + .generateRecordEqualiser("SinkMaterializeUpsertKeyEqualiser"); + SinkUpsertMaterializer operator = new SinkUpsertMaterializer( StateConfigUtil.createTtlConfig( config.get(ExecutionConfigOptions.IDLE_STATE_RETENTION).toMillis()), InternalSerializers.create(physicalRowType), - equaliser); + rowEqualiser, + upsertKeyEqualiser, + inputUpsertKey); final String[] fieldNames = physicalRowType.getFieldNames().toArray(new String[0]); final List<String> pkFieldNames = Arrays.stream(primaryKeys) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java index 1bd7ad0b22b..6932c1a951b 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java +++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java @@ -74,6 +74,7 @@ public class StreamExecSink extends CommonExecSink implements StreamExecNode<Obj public static final String FIELD_NAME_INPUT_CHANGELOG_MODE = "inputChangelogMode"; public static final String FIELD_NAME_REQUIRE_UPSERT_MATERIALIZE = "requireUpsertMaterialize"; + public static final String FIELD_NAME_INPUT_UPSERT_KEY = "inputUpsertKey"; @JsonProperty(FIELD_NAME_INPUT_CHANGELOG_MODE) private final ChangelogMode inputChangelogMode; @@ -82,6 +83,10 @@ public class StreamExecSink extends CommonExecSink implements StreamExecNode<Obj @JsonInclude(JsonInclude.Include.NON_DEFAULT) private final boolean upsertMaterialize; + @JsonProperty(FIELD_NAME_INPUT_UPSERT_KEY) + @JsonInclude(JsonInclude.Include.NON_DEFAULT) + private final int[] inputUpsertKey; + public StreamExecSink( ReadableConfig tableConfig, DynamicTableSinkSpec tableSinkSpec, @@ -89,6 +94,7 @@ public class StreamExecSink extends CommonExecSink implements StreamExecNode<Obj InputProperty inputProperty, LogicalType outputType, boolean upsertMaterialize, + int[] inputUpsertKey, String description) { this( ExecNodeContext.newNodeId(), @@ -99,6 +105,7 @@ public class StreamExecSink extends CommonExecSink implements StreamExecNode<Obj Collections.singletonList(inputProperty), outputType, upsertMaterialize, + inputUpsertKey, description); } @@ -112,6 +119,7 @@ public class StreamExecSink extends CommonExecSink implements StreamExecNode<Obj @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> inputProperties, @JsonProperty(FIELD_NAME_OUTPUT_TYPE) LogicalType outputType, @JsonProperty(FIELD_NAME_REQUIRE_UPSERT_MATERIALIZE) boolean upsertMaterialize, + @JsonProperty(FIELD_NAME_INPUT_UPSERT_KEY) int[] inputUpsertKey, @JsonProperty(FIELD_NAME_DESCRIPTION) String description) { super( id, @@ -125,6 +133,7 @@ public class StreamExecSink extends CommonExecSink implements 
StreamExecNode<Obj description); this.inputChangelogMode = inputChangelogMode; this.upsertMaterialize = upsertMaterialize; + this.inputUpsertKey = inputUpsertKey; } @SuppressWarnings("unchecked") @@ -171,6 +180,7 @@ public class StreamExecSink extends CommonExecSink implements StreamExecNode<Obj inputTransform, tableSink, rowtimeFieldIndex, - upsertMaterialize); + upsertMaterialize, + inputUpsertKey); } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/UpsertKeyUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/UpsertKeyUtil.java new file mode 100644 index 00000000000..3e054f6793f --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/UpsertKeyUtil.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.utils; + +import org.apache.calcite.util.ImmutableBitSet; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.Set; + +/** + * Utility for upsertKey which represented as a Set of {@link + * org.apache.calcite.util.ImmutableBitSet}. 
+ */ +public class UpsertKeyUtil { + + /** + * Returns the smallest key of given upsert keys. The rule of 'small' is an upsert key + * represented by {@link ImmutableBitSet} has smaller cardinality or has a smaller leading + * element when the same cardinality. E.g., '{0,1}' is smaller than '{0,1,2}' and '{0,1}' is + * smaller than '{0,2}'. + * + * @param upsertKeys input upsert keys + * @return the smallest key + */ + @Nonnull + public static int[] getSmallestKey(@Nullable Set<ImmutableBitSet> upsertKeys) { + if (null == upsertKeys || upsertKeys.isEmpty()) { + return new int[0]; + } + return upsertKeys.stream() + .map(ImmutableBitSet::toArray) + .reduce( + (k1, k2) -> { + if (k1.length < k2.length) { + return k1; + } + if (k1.length == k2.length) { + for (int index = 0; index < k1.length; index++) { + if (k1[index] < k2[index]) { + return k1; + } + } + } + return k2; + }) + .get(); + } +} diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSink.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSink.scala index c64c41721ca..bffb097d6d7 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSink.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSink.scala @@ -21,11 +21,12 @@ import org.apache.flink.table.catalog.ContextResolvedTable import org.apache.flink.table.connector.sink.DynamicTableSink import org.apache.flink.table.planner.calcite.FlinkTypeFactory import org.apache.flink.table.planner.plan.abilities.sink.SinkAbilitySpec +import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery import org.apache.flink.table.planner.plan.nodes.calcite.Sink import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, InputProperty} import 
org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSinkSpec import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink -import org.apache.flink.table.planner.plan.utils.ChangelogPlanUtils +import org.apache.flink.table.planner.plan.utils.{ChangelogPlanUtils, UpsertKeyUtil} import org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} @@ -81,6 +82,12 @@ class StreamPhysicalSink( val tableSinkSpec = new DynamicTableSinkSpec(contextResolvedTable, util.Arrays.asList(abilitySpecs: _*)) tableSinkSpec.setTableSink(tableSink) + // no need to call getUpsertKeysInKeyGroupRange here because there's no exchange before sink, + // and only add exchange in exec sink node. + val inputUpsertKeys = FlinkRelMetadataQuery + .reuseOrCreate(cluster.getMetadataQuery) + .getUpsertKeys(inputRel) + new StreamExecSink( unwrapTableConfig(this), tableSinkSpec, @@ -88,6 +95,7 @@ class StreamPhysicalSink( InputProperty.DEFAULT, FlinkTypeFactory.toLogicalRowType(getRowType), upsertMaterialize, + UpsertKeyUtil.getSmallestKey(inputUpsertKeys), getRelDetailedDescription) } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkJsonPlanTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkJsonPlanTest.java index e161bddd8f6..a467493499d 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkJsonPlanTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkJsonPlanTest.java @@ -20,6 +20,7 @@ package org.apache.flink.table.planner.plan.nodes.exec.stream; import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions; 
import org.apache.flink.table.planner.utils.StreamTableTestUtil; import org.apache.flink.table.planner.utils.TableTestBase; @@ -91,4 +92,39 @@ public class TableSinkJsonPlanTest extends TableTestBase { tEnv.executeSql(sinkTableDdl); util.verifyJsonPlan("insert into MySink select * from MyTable"); } + + @Test + public void testCdcWithNonDeterministicFuncSinkWithDifferentPk() { + tEnv.createTemporaryFunction( + "ndFunc", new JavaUserDefinedScalarFunctions.NonDeterministicUdf()); + + String cdcDdl = + "CREATE TABLE users (\n" + + " user_id STRING,\n" + + " user_name STRING,\n" + + " email STRING,\n" + + " balance DECIMAL(18,2),\n" + + " primary key (user_id) not enforced\n" + + ") WITH (\n" + + " 'connector' = 'values',\n" + + " 'changelog-mode' = 'I,UA,UB,D'\n" + + ")"; + + String sinkTableDdl = + "CREATE TABLE sink (\n" + + " user_id STRING,\n" + + " user_name STRING,\n" + + " email STRING,\n" + + " balance DECIMAL(18,2),\n" + + " PRIMARY KEY(email) NOT ENFORCED\n" + + ") WITH(\n" + + " 'connector' = 'values',\n" + + " 'sink-insert-only' = 'false'\n" + + ")"; + tEnv.executeSql(cdcDdl); + tEnv.executeSql(sinkTableDdl); + + util.verifyJsonPlan( + "insert into sink select user_id, ndFunc(user_name), email, balance from users"); + } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/utils/UpsertKeyUtilTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/utils/UpsertKeyUtilTest.java new file mode 100644 index 00000000000..4ddfadfc1f9 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/utils/UpsertKeyUtilTest.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.utils; + +import org.apache.calcite.util.ImmutableBitSet; +import org.junit.Test; + +import java.util.HashSet; +import java.util.Set; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link UpsertKeyUtil}. */ +public class UpsertKeyUtilTest { + private final int[] emptyKey = new int[0]; + + @Test + public void testSmallestKey() { + assertThat(UpsertKeyUtil.getSmallestKey(null)).isEqualTo(emptyKey); + assertThat(UpsertKeyUtil.getSmallestKey(new HashSet<>())).isEqualTo(emptyKey); + + ImmutableBitSet smallestKey = ImmutableBitSet.of(0, 1); + ImmutableBitSet middleKey = ImmutableBitSet.of(0, 2); + ImmutableBitSet longKey = ImmutableBitSet.of(0, 1, 2); + + Set<ImmutableBitSet> upsertKeys = new HashSet<>(); + upsertKeys.add(smallestKey); + upsertKeys.add(middleKey); + assertThat(UpsertKeyUtil.getSmallestKey(upsertKeys)).isEqualTo(smallestKey.toArray()); + + upsertKeys.clear(); + upsertKeys.add(smallestKey); + upsertKeys.add(longKey); + assertThat(UpsertKeyUtil.getSmallestKey(upsertKeys)).isEqualTo(smallestKey.toArray()); + } +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedScalarFunctions.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedScalarFunctions.java index 
73c52d16e4d..5db0abf9cc8 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedScalarFunctions.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedScalarFunctions.java @@ -131,6 +131,10 @@ public class JavaUserDefinedScalarFunctions { return v + random.nextInt(); } + public String eval(String v) { + return v + "-" + random.nextInt(); + } + @Override public boolean isDeterministic() { return false; diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogSourceJsonPlanTest_jsonplan/testChangelogSource.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogSourceJsonPlanTest_jsonplan/testChangelogSource.out index 1f4ca9d8dd7..46c9b032843 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogSourceJsonPlanTest_jsonplan/testChangelogSource.out +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogSourceJsonPlanTest_jsonplan/testChangelogSource.out @@ -133,6 +133,7 @@ "priority" : 0 } ], "outputType" : "ROW<`a` BIGINT NOT NULL, `b` INT NOT NULL>", + "inputUpsertKey" : [ 0, 1 ], "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a, b])" } ], "edges" : [ { diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogSourceJsonPlanTest_jsonplan/testUpsertSource.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogSourceJsonPlanTest_jsonplan/testUpsertSource.out index 81bd0cc7972..e29a2507619 100644 --- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogSourceJsonPlanTest_jsonplan/testUpsertSource.out +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogSourceJsonPlanTest_jsonplan/testUpsertSource.out @@ -121,6 +121,7 @@ "priority" : 0 } ], "outputType" : "ROW<`a` BIGINT NOT NULL, `b` INT NOT NULL>", + "inputUpsertKey" : [ 0, 1 ], "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a, b])" } ], "edges" : [ { diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/DeduplicationJsonPlanTest_jsonplan/testDeduplication.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/DeduplicationJsonPlanTest_jsonplan/testDeduplication.out index f2c4a58820d..0c219aae929 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/DeduplicationJsonPlanTest_jsonplan/testDeduplication.out +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/DeduplicationJsonPlanTest_jsonplan/testDeduplication.out @@ -272,6 +272,7 @@ "priority" : 0 } ], "outputType" : "ROW<`order_id` BIGINT, `user` VARCHAR(2147483647), `product` VARCHAR(2147483647), `order_time` TIMESTAMP(3)>", + "inputUpsertKey" : [ 2 ], "description" : "Sink(table=[default_catalog.default_database.sink], fields=[order_id, user, product, order_time])" } ], "edges" : [ { diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/ExpandJsonPlanTest_jsonplan/testExpand.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/ExpandJsonPlanTest_jsonplan/testExpand.out index 3982472c711..8262813cced 100644 --- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/ExpandJsonPlanTest_jsonplan/testExpand.out +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/ExpandJsonPlanTest_jsonplan/testExpand.out @@ -365,6 +365,7 @@ "priority" : 0 } ], "outputType" : "ROW<`a` BIGINT, `$f1` BIGINT NOT NULL, `$f2` VARCHAR(2147483647)>", + "inputUpsertKey" : [ 0 ], "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a, $f1, $f2])" } ], "edges" : [ { diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testDistinctAggCalls[isMiniBatchEnabled=false].out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testDistinctAggCalls[isMiniBatchEnabled=false].out index 52aad805789..1af9ee2c41c 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testDistinctAggCalls[isMiniBatchEnabled=false].out +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testDistinctAggCalls[isMiniBatchEnabled=false].out @@ -297,6 +297,7 @@ "priority" : 0 } ], "outputType" : "ROW<`d` BIGINT, `cnt_a1` BIGINT, `cnt_a2` BIGINT, `sum_a` BIGINT, `sum_b` INT, `avg_b` DOUBLE, `cnt_c` BIGINT>", + "inputUpsertKey" : [ 0 ], "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[d, cnt_a1, cnt_a2, sum_a, sum_b, avg_b, cnt_c])" } ], "edges" : [ { diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testDistinctAggCalls[isMiniBatchEnabled=true].out 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testDistinctAggCalls[isMiniBatchEnabled=true].out index bfef96f23e1..12b2452fbef 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testDistinctAggCalls[isMiniBatchEnabled=true].out +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testDistinctAggCalls[isMiniBatchEnabled=true].out @@ -545,6 +545,7 @@ "priority" : 0 } ], "outputType" : "ROW<`d` BIGINT, `cnt_a1` BIGINT, `cnt_a2` BIGINT, `sum_a` BIGINT, `sum_b` INT, `avg_b` DOUBLE, `cnt_c` BIGINT>", + "inputUpsertKey" : [ 0 ], "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[d, cnt_a1, cnt_a2, sum_a, sum_b, avg_b, cnt_c])" } ], "edges" : [ { diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggCallsWithGroupBy[isMiniBatchEnabled=false].out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggCallsWithGroupBy[isMiniBatchEnabled=false].out index 4ca7313f0a2..f564a72dbad 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggCallsWithGroupBy[isMiniBatchEnabled=false].out +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggCallsWithGroupBy[isMiniBatchEnabled=false].out @@ -240,6 +240,7 @@ "priority" : 0 } ], "outputType" : "ROW<`b` BIGINT, `cnt_a` BIGINT, `max_b` BIGINT, `min_c` VARCHAR(2147483647)>", + "inputUpsertKey" : [ 0 
], "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[b, cnt_a, max_b, min_c])" } ], "edges" : [ { diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggCallsWithGroupBy[isMiniBatchEnabled=true].out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggCallsWithGroupBy[isMiniBatchEnabled=true].out index b31d031ffd0..bfbee492d41 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggCallsWithGroupBy[isMiniBatchEnabled=true].out +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggCallsWithGroupBy[isMiniBatchEnabled=true].out @@ -305,6 +305,7 @@ "priority" : 0 } ], "outputType" : "ROW<`b` BIGINT, `cnt_a` BIGINT, `max_b` BIGINT, `min_c` VARCHAR(2147483647)>", + "inputUpsertKey" : [ 0 ], "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[b, cnt_a, max_b, min_c])" } ], "edges" : [ { diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testUserDefinedAggCalls[isMiniBatchEnabled=false].out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testUserDefinedAggCalls[isMiniBatchEnabled=false].out index 1ce190c4f7c..41573678da6 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testUserDefinedAggCalls[isMiniBatchEnabled=false].out +++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testUserDefinedAggCalls[isMiniBatchEnabled=false].out @@ -233,6 +233,7 @@ "priority" : 0 } ], "outputType" : "ROW<`b` BIGINT, `a1` BIGINT, `a2` BIGINT, `a3` BIGINT, `c1` BIGINT>", + "inputUpsertKey" : [ 0 ], "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[b, a1, a2, a3, c1])" } ], "edges" : [ { diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testUserDefinedAggCalls[isMiniBatchEnabled=true].out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testUserDefinedAggCalls[isMiniBatchEnabled=true].out index 201e9547f11..36478bc1dd5 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testUserDefinedAggCalls[isMiniBatchEnabled=true].out +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testUserDefinedAggCalls[isMiniBatchEnabled=true].out @@ -249,6 +249,7 @@ "priority" : 0 } ], "outputType" : "ROW<`b` BIGINT, `a1` BIGINT, `a2` BIGINT, `a3` BIGINT, `c1` BIGINT>", + "inputUpsertKey" : [ 0 ], "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[b, a1, a2, a3, c1])" } ], "edges" : [ { diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeTumbleWindow.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeTumbleWindow.out index d30a69a2d5f..ab47bb954c1 100644 --- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeTumbleWindow.out +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeTumbleWindow.out @@ -466,6 +466,7 @@ "priority" : 0 } ], "outputType" : "ROW<`b` BIGINT, `window_start` TIMESTAMP(3) NOT NULL, `window_end` TIMESTAMP(3) NOT NULL, `EXPR$3` BIGINT NOT NULL, `EXPR$4` INT, `EXPR$5` BIGINT NOT NULL, `EXPR$6` VARCHAR(2147483647)>", + "inputUpsertKey" : [ 0, 1 ], "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[b, window_start, window_end, EXPR$3, EXPR$4, EXPR$5, EXPR$6])" } ], "edges" : [ { diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeTumbleWindow.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeTumbleWindow.out index bc5fd3fa7c0..52af443ff67 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeTumbleWindow.out +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeTumbleWindow.out @@ -451,6 +451,7 @@ "priority" : 0 } ], "outputType" : "ROW<`b` BIGINT, `window_end` TIMESTAMP(3) NOT NULL, `EXPR$2` BIGINT NOT NULL>", + "inputUpsertKey" : [ 0, 1 ], "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[b, window_end, EXPR$2])" } ], "edges" : [ { diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregate.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregate.out index a2bcf705ab2..2f0585073c6 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregate.out +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregate.out @@ -327,6 +327,7 @@ "priority" : 0 } ], "outputType" : "ROW<`a` BIGINT, `$f1` BIGINT NOT NULL>", + "inputUpsertKey" : [ 0 ], "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a, $f1])" } ], "edges" : [ { diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregateWithSumCountDistinctAndRetraction.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregateWithSumCountDistinctAndRetraction.out index c4a0f219d8e..517dc8bc130 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregateWithSumCountDistinctAndRetraction.out +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregateWithSumCountDistinctAndRetraction.out @@ -485,6 +485,7 @@ "priority" : 0 } ], "outputType" : "ROW<`b` BIGINT NOT NULL, `$f1` INT NOT NULL, `$f2` 
BIGINT NOT NULL, `$f3` BIGINT NOT NULL>", + "inputUpsertKey" : [ 0 ], "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[b, $f1, $f2, $f3])" } ], "edges" : [ { diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoinWithEqualPk.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoinWithEqualPk.out index 141b22ea160..a230c7d8695 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoinWithEqualPk.out +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoinWithEqualPk.out @@ -242,6 +242,7 @@ "priority" : 0 } ], "outputType" : "ROW<`a1` INT, `b1` INT>", + "inputUpsertKey" : [ 0 ], "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a1, b1])" } ], "edges" : [ { diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkJsonPlanTest_jsonplan/testCdcWithNonDeterministicFuncSinkWithDifferentPk.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkJsonPlanTest_jsonplan/testCdcWithNonDeterministicFuncSinkWithDifferentPk.out new file mode 100644 index 00000000000..40bedb2ea92 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkJsonPlanTest_jsonplan/testCdcWithNonDeterministicFuncSinkWithDifferentPk.out @@ -0,0 +1,147 @@ +{ + "flinkVersion" : "", + "nodes" : [ { + "id" : 1, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`users`", + 
"resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "user_id", + "dataType" : "VARCHAR(2147483647) NOT NULL" + }, { + "name" : "user_name", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "email", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "balance", + "dataType" : "DECIMAL(18, 2)" + } ], + "watermarkSpecs" : [ ], + "primaryKey" : { + "name" : "PK_-147132882", + "type" : "PRIMARY_KEY", + "columns" : [ "user_id" ] + } + }, + "partitionKeys" : [ ], + "options" : { + "connector" : "values", + "changelog-mode" : "I,UA,UB,D" + } + } + } + }, + "outputType" : "ROW<`user_id` VARCHAR(2147483647) NOT NULL, `user_name` VARCHAR(2147483647), `email` VARCHAR(2147483647), `balance` DECIMAL(18, 2)>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, users]], fields=[user_id, user_name, email, balance])", + "inputProperties" : [ ] + }, { + "id" : 2, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647) NOT NULL" + }, { + "kind" : "CALL", + "catalogName" : "`default_catalog`.`default_database`.`ndFunc`", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "VARCHAR(2147483647)" + } ], + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : "DECIMAL(18, 2)" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`user_id` VARCHAR(2147483647) NOT NULL, `EXPR$1` VARCHAR(2147483647), `email` VARCHAR(2147483647), `balance` DECIMAL(18, 2)>", + "description" : "Calc(select=[user_id, ndFunc(user_name) AS EXPR$1, email, balance])" + }, { + "id" : 3, + "type" : "stream-exec-sink_1", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : 
"ERROR", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "user_id", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "user_name", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "email", + "dataType" : "VARCHAR(2147483647) NOT NULL" + }, { + "name" : "balance", + "dataType" : "DECIMAL(18, 2)" + } ], + "watermarkSpecs" : [ ], + "primaryKey" : { + "name" : "PK_96619451", + "type" : "PRIMARY_KEY", + "columns" : [ "email" ] + } + }, + "partitionKeys" : [ ], + "options" : { + "sink-insert-only" : "false", + "connector" : "values" + } + } + } + }, + "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE" ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`user_id` VARCHAR(2147483647) NOT NULL, `EXPR$1` VARCHAR(2147483647), `email` VARCHAR(2147483647), `balance` DECIMAL(18, 2)>", + "requireUpsertMaterialize" : true, + "inputUpsertKey" : [ 0 ], + "description" : "Sink(table=[default_catalog.default_database.sink], fields=[user_id, EXPR$1, email, balance], upsertMaterialize=[true])" + } ], + "edges" : [ { + "source" : 1, + "target" : 2, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 2, + "target" : 3, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala index 75b0d43706e..5dc7bc70e53 100644 --- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala @@ -17,6 +17,7 @@ */ package org.apache.flink.table.planner.runtime.stream.sql +import org.apache.flink.table.planner.expressions.utils.TestNonDeterministicUdf import org.apache.flink.table.planner.factories.TestValuesTableFactory import org.apache.flink.table.planner.runtime.utils._ import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row @@ -73,6 +74,21 @@ class TableSinkITCase(mode: StateBackendMode) extends StreamingWithStateTestBase | 'data-id' = '$peopleDataId' |) |""".stripMargin) + + val userDataId = TestValuesTableFactory.registerData(TestData.userChangelog) + tEnv.executeSql(s""" + |CREATE TABLE users ( + | user_id STRING, + | user_name STRING, + | email STRING, + | balance DECIMAL(18,2), + | primary key (user_id) not enforced + |) WITH ( + | 'connector' = 'values', + | 'data-id' = '$userDataId', + | 'changelog-mode' = 'I,UA,UB,D' + |) + |""".stripMargin) } @Test @@ -157,6 +173,52 @@ class TableSinkITCase(mode: StateBackendMode) extends StreamingWithStateTestBase assertEquals(expected.sorted, result.sorted) } + @Test + def testChangelogSourceWithNonDeterministicFuncSinkWithDifferentPk(): Unit = { + tEnv.createTemporaryFunction("ndFunc", new TestNonDeterministicUdf) + tEnv.executeSql(""" + |CREATE TABLE sink_with_pk ( + | user_id STRING, + | user_name STRING, + | email STRING, + | balance DECIMAL(18,2), + | PRIMARY KEY(email) NOT ENFORCED + |) WITH( + | 'connector' = 'values', + | 'sink-insert-only' = 'false' + |) + |""".stripMargin) + + tEnv + .executeSql(s""" + |insert into sink_with_pk + |select user_id, SPLIT_INDEX(ndFunc(user_name), '-', 0), email, balance + |from users + |""".stripMargin) + .await() + + val result = TestValuesTableFactory.getResults("sink_with_pk") + val expected = List( + 
"+I[user1, Tom, [email protected], 8.10]", + "+I[user3, Bailey, [email protected], 9.99]", + "+I[user4, Tina, [email protected], 11.30]") + assertEquals(expected.sorted, result.sorted) + + val rawResult = TestValuesTableFactory.getRawResults("sink_with_pk") + val expectedRaw = List( + "+I[user1, Tom, [email protected], 10.02]", + "+I[user2, Jack, [email protected], 71.20]", + "-D[user1, Tom, [email protected], 10.02]", + "+I[user1, Tom, [email protected], 8.10]", + "+I[user3, Bailey, [email protected], 9.99]", + "-D[user2, Jack, [email protected], 71.20]", + "+I[user4, Tina, [email protected], 11.30]", + "-D[user3, Bailey, [email protected], 9.99]", + "+I[user3, Bailey, [email protected], 9.99]" + ) + assertEquals(expectedRaw, rawResult.toList) + } + @Test def testInsertPartColumn(): Unit = { tEnv.executeSql(""" diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializer.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializer.java index 0d18cb4c540..8ba4b792e2b 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializer.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializer.java @@ -27,14 +27,19 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.TimestampedCollector; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.utils.ProjectedRowData; import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser; import org.apache.flink.table.runtime.generated.RecordEqualiser; import org.apache.flink.table.runtime.operators.TableStreamOperator; import org.apache.flink.types.RowKind; +import org.apache.flink.util.Preconditions; 
import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + +import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -66,29 +71,59 @@ public class SinkUpsertMaterializer extends TableStreamOperator<RowData> + "You can increase the state ttl to avoid this."; private final StateTtlConfig ttlConfig; + private final GeneratedRecordEqualiser generatedRecordEqualiser; + private final GeneratedRecordEqualiser generatedUpsertKeyEqualiser; private final TypeSerializer<RowData> serializer; - private final GeneratedRecordEqualiser generatedEqualiser; + private final int[] inputUpsertKey; + private final boolean hasUpsertKey; + // The equaliser here may behave differently depending on hasUpsertKey: + // if true: the equaliser only compares the upsertKey (a projected row data) + // if false: the equaliser compares the complete row private transient RecordEqualiser equaliser; + // Buffer of emitted insertions on which deletions will be applied first. // The row kind might be +I or +U and will be ignored when applying the deletion. private transient ValueState<List<RowData>> state; private transient TimestampedCollector<RowData> collector; + // Reused ProjectedRowData for comparing upsertKey if hasUpsertKey. 
+ private transient ProjectedRowData upsertKeyProjectedRow1; + private transient ProjectedRowData upsertKeyProjectedRow2; + public SinkUpsertMaterializer( StateTtlConfig ttlConfig, TypeSerializer<RowData> serializer, - GeneratedRecordEqualiser generatedEqualiser) { + GeneratedRecordEqualiser generatedRecordEqualiser, + @Nullable GeneratedRecordEqualiser generatedUpsertKeyEqualiser, + @Nullable int[] inputUpsertKey) { this.ttlConfig = ttlConfig; this.serializer = serializer; - this.generatedEqualiser = generatedEqualiser; + this.generatedRecordEqualiser = generatedRecordEqualiser; + this.generatedUpsertKeyEqualiser = generatedUpsertKeyEqualiser; + this.inputUpsertKey = inputUpsertKey; + this.hasUpsertKey = null != inputUpsertKey && inputUpsertKey.length > 0; + if (hasUpsertKey) { + Preconditions.checkNotNull( + generatedUpsertKeyEqualiser, + "GeneratedUpsertKeyEqualiser cannot be null when inputUpsertKey is not empty!"); + } } @Override public void open() throws Exception { super.open(); - this.equaliser = - generatedEqualiser.newInstance(getRuntimeContext().getUserCodeClassLoader()); + if (hasUpsertKey) { + this.equaliser = + generatedUpsertKeyEqualiser.newInstance( + getRuntimeContext().getUserCodeClassLoader()); + upsertKeyProjectedRow1 = ProjectedRowData.from(inputUpsertKey); + upsertKeyProjectedRow2 = ProjectedRowData.from(inputUpsertKey); + } else { + this.equaliser = + generatedRecordEqualiser.newInstance( + getRuntimeContext().getUserCodeClassLoader()); + } ValueStateDescriptor<List<RowData>> descriptor = new ValueStateDescriptor<>("values", new ListSerializer<>(serializer)); if (ttlConfig.isEnabled()) { @@ -109,31 +144,55 @@ public class SinkUpsertMaterializer extends TableStreamOperator<RowData> switch (row.getRowKind()) { case INSERT: case UPDATE_AFTER: - row.setRowKind(values.isEmpty() ? 
INSERT : UPDATE_AFTER); - values.add(row); - collector.collect(row); + addRow(values, row); break; case UPDATE_BEFORE: case DELETE: - final int lastIndex = values.size() - 1; - final int index = removeFirst(values, row); - if (index == -1) { - LOG.info(STATE_CLEARED_WARN_MSG); - return; - } - if (values.isEmpty()) { - // Delete this row - row.setRowKind(DELETE); - collector.collect(row); - } else if (index == lastIndex) { - // Last row has been removed, update to the second last one - final RowData latestRow = values.get(values.size() - 1); - latestRow.setRowKind(UPDATE_AFTER); - collector.collect(latestRow); - } + retractRow(values, row); break; } + } + + private void addRow(List<RowData> values, RowData add) throws IOException { + RowKind outRowKind = values.isEmpty() ? INSERT : UPDATE_AFTER; + if (hasUpsertKey) { + int index = findFirst(values, add); + if (index == -1) { + values.add(add); + } else { + values.set(index, add); + } + } else { + values.add(add); + } + add.setRowKind(outRowKind); + collector.collect(add); + + // Always need to sync with state + state.update(values); + } + + private void retractRow(List<RowData> values, RowData retract) throws IOException { + final int lastIndex = values.size() - 1; + final int index = findFirst(values, retract); + if (index == -1) { + LOG.info(STATE_CLEARED_WARN_MSG); + return; + } else { + // Remove first found row + values.remove(index); + } + if (values.isEmpty()) { + // Delete this row + retract.setRowKind(DELETE); + collector.collect(retract); + } else if (index == lastIndex) { + // Last row has been removed, update to the second last one + final RowData latestRow = values.get(values.size() - 1); + latestRow.setRowKind(UPDATE_AFTER); + collector.collect(latestRow); + } if (values.isEmpty()) { state.clear(); @@ -142,19 +201,25 @@ public class SinkUpsertMaterializer extends TableStreamOperator<RowData> } } - private int removeFirst(List<RowData> values, RowData remove) { + private int findFirst(List<RowData> 
values, RowData target) { final Iterator<RowData> iterator = values.iterator(); int i = 0; while (iterator.hasNext()) { - final RowData row = iterator.next(); - // Ignore kind during comparison - remove.setRowKind(row.getRowKind()); - if (equaliser.equals(row, remove)) { - iterator.remove(); + if (equalsIgnoreRowKind(target, iterator.next())) { return i; } i++; } return -1; } + + private boolean equalsIgnoreRowKind(RowData newRow, RowData oldRow) { + newRow.setRowKind(oldRow.getRowKind()); + if (hasUpsertKey) { + return equaliser.equals( + upsertKeyProjectedRow1.replaceRow(newRow), + upsertKeyProjectedRow2.replaceRow(oldRow)); + } + return equaliser.equals(newRow, oldRow); + } } diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java index cdc6299a839..671b2ec07d6 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java @@ -29,6 +29,7 @@ import org.apache.flink.table.runtime.generated.RecordEqualiser; import org.apache.flink.table.runtime.keyselector.RowDataKeySelector; import org.apache.flink.table.runtime.typeutils.RowDataSerializer; import org.apache.flink.table.runtime.util.StateConfigUtil; +import org.apache.flink.table.types.logical.BigIntType; import org.apache.flink.table.types.logical.IntType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.VarCharType; @@ -43,16 +44,19 @@ import java.util.List; import static org.apache.flink.table.runtime.util.StreamRecordUtils.deleteRecord; import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord; import static 
org.apache.flink.table.runtime.util.StreamRecordUtils.rowOfKind; +import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfterRecord; import static org.assertj.core.api.Assertions.assertThat; /** Test for {@link SinkUpsertMaterializer}. */ public class SinkUpsertMaterializerTest { private final StateTtlConfig ttlConfig = StateConfigUtil.createTtlConfig(1000); - private final LogicalType[] types = new LogicalType[] {new IntType(), new VarCharType()}; + private final LogicalType[] types = + new LogicalType[] {new BigIntType(), new IntType(), new VarCharType()}; private final RowDataSerializer serializer = new RowDataSerializer(types); private final RowDataKeySelector keySelector = - HandwrittenSelectorUtil.getRowDataSelector(new int[0], types); + HandwrittenSelectorUtil.getRowDataSelector(new int[] {1}, types); + private final GeneratedRecordEqualiser equaliser = new GeneratedRecordEqualiser("", "", new Object[0]) { @@ -62,10 +66,20 @@ public class SinkUpsertMaterializerTest { } }; + private final GeneratedRecordEqualiser upsertKeyEqualiser = + new GeneratedRecordEqualiser("", "", new Object[0]) { + + @Override + public RecordEqualiser newInstance(ClassLoader classLoader) { + return new TestUpsertKeyEqualiser(); + } + }; + @Test public void test() throws Exception { SinkUpsertMaterializer materializer = - new SinkUpsertMaterializer(ttlConfig, serializer, equaliser); + new SinkUpsertMaterializer( + ttlConfig, serializer, equaliser, upsertKeyEqualiser, null); KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness = new KeyedOneInputStreamOperatorTestHarness<>( materializer, keySelector, keySelector.getProducedType()); @@ -74,30 +88,69 @@ public class SinkUpsertMaterializerTest { testHarness.setStateTtlProcessingTime(1); - testHarness.processElement(insertRecord(1, "a1")); - shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1, "a1")); + testHarness.processElement(insertRecord(1L, 1, "a1")); + shouldEmit(testHarness, 
rowOfKind(RowKind.INSERT, 1L, 1, "a1")); - testHarness.processElement(insertRecord(1, "a2")); - shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 1, "a2")); + testHarness.processElement(insertRecord(2L, 1, "a2")); + shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 2L, 1, "a2")); - testHarness.processElement(insertRecord(1, "a3")); - shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 1, "a3")); + testHarness.processElement(insertRecord(3L, 1, "a3")); + shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 3L, 1, "a3")); - testHarness.processElement(deleteRecord(1, "a2")); + testHarness.processElement(deleteRecord(2L, 1, "a2")); shouldEmitNothing(testHarness); - testHarness.processElement(deleteRecord(1, "a3")); - shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 1, "a1")); + testHarness.processElement(deleteRecord(3L, 1, "a3")); + shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 1L, 1, "a1")); - testHarness.processElement(deleteRecord(1, "a1")); - shouldEmit(testHarness, rowOfKind(RowKind.DELETE, 1, "a1")); + testHarness.processElement(deleteRecord(1L, 1, "a1")); + shouldEmit(testHarness, rowOfKind(RowKind.DELETE, 1L, 1, "a1")); - testHarness.processElement(insertRecord(1, "a4")); - shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1, "a4")); + testHarness.processElement(insertRecord(4L, 1, "a4")); + shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 4L, 1, "a4")); testHarness.setStateTtlProcessingTime(1002); - testHarness.processElement(deleteRecord(1, "a4")); + testHarness.processElement(deleteRecord(4L, 1, "a4")); + shouldEmitNothing(testHarness); + + testHarness.close(); + } + + @Test + public void testInputHasUpsertKeyWithNonDeterministicColumn() throws Exception { + SinkUpsertMaterializer materializer = + new SinkUpsertMaterializer( + ttlConfig, serializer, equaliser, upsertKeyEqualiser, new int[] {0}); + KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness = + new 
KeyedOneInputStreamOperatorTestHarness<>( + materializer, keySelector, keySelector.getProducedType()); + + testHarness.open(); + + testHarness.setStateTtlProcessingTime(1); + + testHarness.processElement(insertRecord(1L, 1, "a1")); + shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1L, 1, "a1")); + + testHarness.processElement(updateAfterRecord(1L, 1, "a11")); + shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 1L, 1, "a11")); + + testHarness.processElement(insertRecord(3L, 1, "a3")); + shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 3L, 1, "a3")); + + testHarness.processElement(deleteRecord(1L, 1, "a111")); + shouldEmitNothing(testHarness); + + testHarness.processElement(deleteRecord(3L, 1, "a33")); + shouldEmit(testHarness, rowOfKind(RowKind.DELETE, 3L, 1, "a33")); + + testHarness.processElement(insertRecord(4L, 1, "a4")); + shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 4L, 1, "a4")); + + testHarness.setStateTtlProcessingTime(1002); + + testHarness.processElement(deleteRecord(4L, 1, "a4")); shouldEmitNothing(testHarness); testHarness.close(); @@ -118,7 +171,8 @@ public class SinkUpsertMaterializerTest { Object o; while ((o = harness.getOutput().poll()) != null) { RowData value = (RowData) ((StreamRecord<?>) o).getValue(); - GenericRowData newRow = GenericRowData.of(value.getInt(0), value.getString(1)); + GenericRowData newRow = + GenericRowData.of(value.getLong(0), value.getInt(1), value.getString(2)); newRow.setRowKind(value.getRowKind()); rows.add(newRow); } @@ -129,8 +183,16 @@ public class SinkUpsertMaterializerTest { @Override public boolean equals(RowData row1, RowData row2) { return row1.getRowKind() == row2.getRowKind() - && row1.getInt(0) == row2.getInt(0) - && row1.getString(1).equals(row2.getString(1)); + && row1.getLong(0) == row2.getLong(0) + && row1.getInt(1) == row2.getInt(1) + && row1.getString(2).equals(row2.getString(2)); + } + } + + private static class TestUpsertKeyEqualiser implements RecordEqualiser { + @Override + public 
boolean equals(RowData row1, RowData row2) { + return row1.getRowKind() == row2.getRowKind() && row1.getLong(0) == row2.getLong(0); } } }
