This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 5fdc01e3a58540d4bc89c9f291cafd31d24f37d3 Author: JingsongLi <[email protected]> AuthorDate: Wed Jun 23 18:00:44 2021 +0800 [FLINK-23054][table] Add SinkUpsertMaterialize before upsert sink to resolve change log disorder This closes #16239 --- .../generated/execution_config_configuration.html | 6 + .../table/api/config/ExecutionConfigOptions.java | 32 +++++ .../plan/nodes/exec/batch/BatchExecSink.java | 2 +- .../plan/nodes/exec/common/CommonExecSink.java | 51 +++++-- .../plan/nodes/exec/stream/StreamExecSink.java | 16 ++- .../nodes/physical/stream/StreamPhysicalSink.scala | 60 +++++++- .../planner/plan/stream/sql/TableSinkTest.xml | 74 +++++++++- .../planner/plan/stream/sql/TableSinkTest.scala | 66 +++++++++ .../runtime/stream/sql/TableSinkITCase.scala | 157 +++++++++++++++++++++ .../operators/sink/SinkUpsertMaterializer.java | 151 ++++++++++++++++++++ .../operators/sink/SinkUpsertMaterializerTest.java | 127 +++++++++++++++++ 11 files changed, 725 insertions(+), 17 deletions(-) diff --git a/docs/layouts/shortcodes/generated/execution_config_configuration.html b/docs/layouts/shortcodes/generated/execution_config_configuration.html index 8ef9c6f..0f44146 100644 --- a/docs/layouts/shortcodes/generated/execution_config_configuration.html +++ b/docs/layouts/shortcodes/generated/execution_config_configuration.html @@ -65,6 +65,12 @@ By default no operator is disabled.</td> <td>The NOT NULL column constraint on a table enforces that null values can't be inserted into the table. Flink supports 'error' (default) and 'drop' enforcement behavior. By default, Flink will check values and throw runtime exception when null values writing into NOT NULL columns. 
Users can change the behavior to 'drop' to silently drop such records without throwing exception.</td> </tr> <tr> + <td><h5>table.exec.sink.upsert-materialize</h5><br> <span class="label label-primary">Streaming</span></td> + <td style="word-wrap: break-word;">AUTO</td> + <td><p>Enum</p>Possible values: [NONE, AUTO, FORCE]</td> + <td>Because of the disorder of ChangeLog data caused by Shuffle in distributed system, the data received by Sink may not be the order of global upsert. So add upsert materialize operator before upsert sink. It receives the upstream changelog records and generate an upsert view for the downstream.<br />By default, the materialize operator will be added when a distributed disorder occurs on unique keys. You can also choose no materialization(NONE) or force materialization(FORCE).</td> + </tr> + <tr> <td><h5>table.exec.sort.async-merge-enabled</h5><br> <span class="label label-primary">Batch</span></td> <td style="word-wrap: break-word;">true</td> <td>Boolean</td> diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java index d74e9ec..cb0299f 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java @@ -122,6 +122,25 @@ public class ExecutionConfigOptions { + "into NOT NULL columns. 
Users can change the behavior to 'drop' to " + "silently drop such records without throwing exception."); + @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING) + public static final ConfigOption<UpsertMaterialize> TABLE_EXEC_SINK_UPSERT_MATERIALIZE = + key("table.exec.sink.upsert-materialize") + .enumType(UpsertMaterialize.class) + .defaultValue(UpsertMaterialize.AUTO) + .withDescription( + Description.builder() + .text( + "Because of the disorder of ChangeLog data caused by Shuffle in distributed system, " + + "the data received by Sink may not be the order of global upsert. " + + "So add upsert materialize operator before upsert sink. It receives the " + + "upstream changelog records and generate an upsert view for the downstream.") + .linebreak() + .text( + "By default, the materialize operator will be added when a distributed disorder " + + "occurs on unique keys. You can also choose no materialization(NONE) " + + "or force materialization(FORCE).") + .build()); + // ------------------------------------------------------------------------ // Sort Options // ------------------------------------------------------------------------ @@ -365,4 +384,17 @@ public class ExecutionConfigOptions { /** Drop records when writing null values into NOT NULL column. */ DROP } + + /** Upsert materialize strategy before sink. */ + public enum UpsertMaterialize { + + /** In no case will materialize operator be added. */ + NONE, + + /** Add materialize operator when a distributed disorder occurs on unique keys. */ + AUTO, + + /** Add materialize operator in any case. 
*/ + FORCE + } } diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java index c643bc7..ae9b703 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java @@ -57,6 +57,6 @@ public class BatchExecSink extends CommonExecSink implements BatchExecNode<Objec final Transformation<RowData> inputTransform = (Transformation<RowData>) getInputEdges().get(0).translateToPlan(planner); return createSinkTransformation( - planner.getExecEnv(), planner.getTableConfig(), inputTransform, -1); + planner.getExecEnv(), planner.getTableConfig(), inputTransform, -1, false); } } diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java index 7a0c3a3..545bf96 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java @@ -46,6 +46,7 @@ import org.apache.flink.table.connector.sink.OutputFormatProvider; import org.apache.flink.table.connector.sink.SinkFunctionProvider; import org.apache.flink.table.connector.sink.SinkProvider; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.planner.codegen.EqualiserCodeGenerator; import org.apache.flink.table.planner.connectors.TransformationSinkProvider; import 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode; import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase; @@ -54,10 +55,13 @@ import org.apache.flink.table.planner.plan.nodes.exec.MultipleTransformationTran import org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSinkSpec; import org.apache.flink.table.planner.plan.utils.KeySelectorUtil; import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext; +import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser; import org.apache.flink.table.runtime.keyselector.RowDataKeySelector; import org.apache.flink.table.runtime.operators.sink.SinkNotNullEnforcer; import org.apache.flink.table.runtime.operators.sink.SinkOperator; +import org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.runtime.util.StateConfigUtil; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.types.RowKind; @@ -112,7 +116,8 @@ public abstract class CommonExecSink extends ExecNodeBase<Object> StreamExecutionEnvironment env, TableConfig tableConfig, Transformation<RowData> inputTransform, - int rowtimeFieldIndex) { + int rowtimeFieldIndex, + boolean upsertMaterialize) { final DynamicTableSink tableSink = tableSinkSpec.getTableSink(); final DynamicTableSink.SinkRuntimeProvider runtimeProvider = tableSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(isBounded)); @@ -149,12 +154,40 @@ public abstract class CommonExecSink extends ExecNodeBase<Object> // apply keyBy partition transformation if needed inputTransform = - applyKeyByForDifferentParallelism( + applyKeyByIfNeeded( physicalRowType, schema.getPrimaryKey().orElse(null), inputTransform, inputParallelism, - sinkParallelism); + sinkParallelism, + upsertMaterialize); + + if (upsertMaterialize) { + GeneratedRecordEqualiser equaliser = + new 
EqualiserCodeGenerator(physicalRowType) + .generateRecordEqualiser("SinkMaterializeEqualiser"); + SinkUpsertMaterializer operator = + new SinkUpsertMaterializer( + StateConfigUtil.createTtlConfig( + tableConfig.getIdleStateRetention().toMillis()), + InternalTypeInfo.of(physicalRowType).toSerializer(), + equaliser); + OneInputTransformation<RowData, RowData> materializeTransform = + new OneInputTransformation<>( + inputTransform, + "SinkMaterializer", + operator, + inputTransform.getOutputType(), + sinkParallelism); + int[] pkIndices = + getPrimaryKeyIndices(physicalRowType, schema.getPrimaryKey().get()); + RowDataKeySelector keySelector = + KeySelectorUtil.getRowDataSelector( + pkIndices, InternalTypeInfo.of(physicalRowType)); + materializeTransform.setStateKeySelector(keySelector); + materializeTransform.setStateKeyType(keySelector.getProducedType()); + inputTransform = materializeTransform; + } final SinkFunction<RowData> sinkFunction; if (runtimeProvider instanceof SinkFunctionProvider) { @@ -248,17 +281,19 @@ public abstract class CommonExecSink extends ExecNodeBase<Object> /** * Apply a keyBy partition transformation if the parallelism of sink operator and input operator - * is different and sink changelog-mode is not insert-only. This is used to guarantee the strict - * ordering of changelog messages. + * is different and sink changelog-mode is not insert-only or requireMaterialize. This is used + * to guarantee the strict ordering of changelog messages. 
*/ - private Transformation<RowData> applyKeyByForDifferentParallelism( + private Transformation<RowData> applyKeyByIfNeeded( RowType sinkRowType, @Nullable UniqueConstraint primaryKey, Transformation<RowData> inputTransform, int inputParallelism, - int sinkParallelism) { + int sinkParallelism, + boolean upsertMaterialize) { final int[] primaryKeys = getPrimaryKeyIndices(sinkRowType, primaryKey); - if (inputParallelism == sinkParallelism || changelogMode.containsOnly(RowKind.INSERT)) { + if ((inputParallelism == sinkParallelism || changelogMode.containsOnly(RowKind.INSERT)) + && !upsertMaterialize) { // if the inputParallelism is equals to the parallelism or insert-only mode, do nothing. return inputTransform; } else if (primaryKeys.length == 0) { diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java index b26e89b..f57eacf 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java @@ -37,6 +37,7 @@ import org.apache.flink.table.types.logical.RowType; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize; @@ -54,17 +55,23 @@ import java.util.stream.Collectors; 
public class StreamExecSink extends CommonExecSink implements StreamExecNode<Object> { public static final String FIELD_NAME_INPUT_CHANGELOG_MODE = "inputChangelogMode"; + public static final String FIELD_NAME_REQUIRE_UPSERT_MATERIALIZE = "requireUpsertMaterialize"; @JsonProperty(FIELD_NAME_INPUT_CHANGELOG_MODE) @JsonSerialize(using = ChangelogModeJsonSerializer.class) @JsonDeserialize(using = ChangelogModeJsonDeserializer.class) private final ChangelogMode inputChangelogMode; + @JsonProperty(FIELD_NAME_REQUIRE_UPSERT_MATERIALIZE) + @JsonInclude(JsonInclude.Include.NON_DEFAULT) + private final boolean upsertMaterialize; + public StreamExecSink( DynamicTableSinkSpec tableSinkSpec, ChangelogMode inputChangelogMode, InputProperty inputProperty, LogicalType outputType, + boolean upsertMaterialize, String description) { super( tableSinkSpec, @@ -75,6 +82,7 @@ public class StreamExecSink extends CommonExecSink implements StreamExecNode<Obj outputType, description); this.inputChangelogMode = inputChangelogMode; + this.upsertMaterialize = upsertMaterialize; } @JsonCreator @@ -84,6 +92,7 @@ public class StreamExecSink extends CommonExecSink implements StreamExecNode<Obj @JsonProperty(FIELD_NAME_ID) int id, @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> inputProperties, @JsonProperty(FIELD_NAME_OUTPUT_TYPE) LogicalType outputType, + @JsonProperty(FIELD_NAME_REQUIRE_UPSERT_MATERIALIZE) boolean upsertMaterialize, @JsonProperty(FIELD_NAME_DESCRIPTION) String description) { super( tableSinkSpec, @@ -94,6 +103,7 @@ public class StreamExecSink extends CommonExecSink implements StreamExecNode<Obj outputType, description); this.inputChangelogMode = inputChangelogMode; + this.upsertMaterialize = upsertMaterialize; } @SuppressWarnings("unchecked") @@ -128,6 +138,10 @@ public class StreamExecSink extends CommonExecSink implements StreamExecNode<Obj } return createSinkTransformation( - planner.getExecEnv(), planner.getTableConfig(), inputTransform, rowtimeFieldIndex); + 
planner.getExecEnv(), + planner.getTableConfig(), + inputTransform, + rowtimeFieldIndex, + upsertMaterialize); } } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSink.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSink.scala index fda14e9..58d540e 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSink.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSink.scala @@ -18,22 +18,31 @@ package org.apache.flink.table.planner.plan.nodes.physical.stream +import org.apache.flink.table.api.config.ExecutionConfigOptions +import org.apache.flink.table.api.config.ExecutionConfigOptions.UpsertMaterialize import org.apache.flink.table.catalog.{ObjectIdentifier, ResolvedCatalogTable} import org.apache.flink.table.connector.sink.DynamicTableSink import org.apache.flink.table.planner.calcite.FlinkTypeFactory import org.apache.flink.table.planner.plan.abilities.sink.SinkAbilitySpec +import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery import org.apache.flink.table.planner.plan.nodes.calcite.Sink import org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSinkSpec import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, InputProperty} -import org.apache.flink.table.planner.plan.utils.{ChangelogPlanUtils, FlinkRelOptUtil} +import org.apache.flink.table.planner.plan.utils.{ChangelogPlanUtils, FlinkRelOptUtil, RelDescriptionWriterImpl} +import org.apache.flink.table.planner.utils.JavaScalaConversionUtil.toScala +import org.apache.flink.types.RowKind import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} import 
org.apache.calcite.rel.RelNode import org.apache.calcite.rel.hint.RelHint +import org.apache.calcite.util.ImmutableBitSet +import java.io.{PrintWriter, StringWriter} import java.util +import scala.collection.JavaConversions._ + /** * Stream physical RelNode to to write data into an external sink defined by a * [[DynamicTableSink]]. @@ -75,12 +84,59 @@ class StreamPhysicalSink( val tableConfig = FlinkRelOptUtil.getTableConfigFromContext(this) tableSinkSpec.setReadableConfig(tableConfig.getConfiguration) + val primaryKeys = toScala(catalogTable.getResolvedSchema + .getPrimaryKey).map(_.getColumns).map(toScala[String]).getOrElse(Seq()) + + val upsertMaterialize = tableConfig.getConfiguration.get( + ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE) match { + case UpsertMaterialize.FORCE => primaryKeys.nonEmpty + case UpsertMaterialize.NONE => false + case UpsertMaterialize.AUTO => + val insertOnly = tableSink + .getChangelogMode(inputChangelogMode) + .containsOnly(RowKind.INSERT) + + if (!insertOnly && primaryKeys.nonEmpty) { + val columnNames = catalogTable.getResolvedSchema.getColumnNames + val pks = ImmutableBitSet.of(primaryKeys.map(columnNames.indexOf): _*) + + val fmq = FlinkRelMetadataQuery.reuseOrCreate(getCluster.getMetadataQuery) + val uniqueKeys = fmq.getUniqueKeys(getInput) + val changeLogUpsertKeys = fmq.getUpsertKeys(getInput) + + if (uniqueKeys != null && + uniqueKeys.exists(pks.contains) && + !(changeLogUpsertKeys != null && + changeLogUpsertKeys.exists(pks.contains))) { + true + } else { + false + } + } else { + false + } + } + new StreamExecSink( tableSinkSpec, inputChangelogMode, InputProperty.DEFAULT, FlinkTypeFactory.toLogicalRowType(getRowType), - getRelDetailedDescription + upsertMaterialize, + getDescriptionWithUpsert(upsertMaterialize) ) } + + /** + * The inputChangelogMode can only be obtained in translateToExecNode phase. 
+ */ + def getDescriptionWithUpsert(upsertMaterialize: Boolean): String = { + val sw = new StringWriter + val pw = new PrintWriter(sw) + val relWriter = new RelDescriptionWriterImpl(pw) + this.explainTerms(relWriter) + relWriter.itemIf("upsertMaterialize", "true", upsertMaterialize) + relWriter.done(this) + sw.toString + } } diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml index b0ec026..28c1048 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml @@ -105,11 +105,6 @@ Sink(table=[default_catalog.default_database.upsertSink], fields=[a, total_min], ]]> </Resource> </TestCase> - <TestCase name="testInsertMismatchTypeForEmptyChar"> - <Resource name="sql"> - <![CDATA[INSERT INTO my_sink SELECT a, '', '' FROM MyTable]]> - </Resource> - </TestCase> <TestCase name="testMetadataColumn"> <Resource name="ast"> <![CDATA[ @@ -240,6 +235,41 @@ Sink(table=[default_catalog.default_database.retractSink], fields=[cnt, a], chan ]]> </Resource> </TestCase> + <TestCase name="testSinkDisorderChangeLogWithRank"> + <Resource name="sql"> + <![CDATA[ +INSERT INTO SinkRankChangeLog +SELECT person, sum_votes FROM + (SELECT person, sum_votes, + ROW_NUMBER() OVER (PARTITION BY vote_section ORDER BY sum_votes DESC) AS rank_number + FROM (SELECT person, SUM(votes) AS sum_votes, SUM(votes) / 2 AS vote_section FROM src + GROUP BY person)) + WHERE rank_number < 10 +]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalSink(table=[default_catalog.default_database.SinkRankChangeLog], fields=[person, sum_votes]) ++- LogicalProject(person=[$0], sum_votes=[$1]) + +- 
LogicalFilter(condition=[<($2, 10)]) + +- LogicalProject(person=[$0], sum_votes=[$1], rank_number=[ROW_NUMBER() OVER (PARTITION BY /($1, 2) ORDER BY $1 DESC NULLS LAST)]) + +- LogicalAggregate(group=[{0}], sum_votes=[SUM($1)]) + +- LogicalTableScan(table=[[default_catalog, default_database, src]]) +]]> + </Resource> + <Resource name="optimized exec plan"> + <![CDATA[ +Sink(table=[default_catalog.default_database.SinkRankChangeLog], fields=[person, sum_votes], upsertMaterialize=[true]) ++- Calc(select=[person, sum_votes]) + +- Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=9], partitionBy=[$2], orderBy=[sum_votes DESC], select=[person, sum_votes, $2]) + +- Exchange(distribution=[hash[$2]]) + +- Calc(select=[person, sum_votes, (sum_votes / 2) AS $2]) + +- GroupAggregate(groupBy=[person], select=[person, SUM(votes) AS sum_votes]) + +- Exchange(distribution=[hash[person]]) + +- TableSourceScan(table=[[default_catalog, default_database, src]], fields=[person, votes]) +]]> + </Resource> + </TestCase> <TestCase name="testUpsertSinkWithFilter"> <Resource name="ast"> <![CDATA[ @@ -262,4 +292,38 @@ Sink(table=[default_catalog.default_database.upsertSink], fields=[a, cnt], chang ]]> </Resource> </TestCase> + <TestCase name="testSinkDisorderChangeLogWithJoin"> + <Resource name="sql"> + <![CDATA[ +INSERT INTO SinkJoinChangeLog +SELECT T.person, T.sum_votes, award.prize FROM + (SELECT person, SUM(votes) AS sum_votes FROM src GROUP BY person) T, award + WHERE T.sum_votes = award.votes +]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalSink(table=[default_catalog.default_database.SinkJoinChangeLog], fields=[person, sum_votes, prize]) ++- LogicalProject(person=[$0], sum_votes=[$1], prize=[$3]) + +- LogicalFilter(condition=[=($1, $2)]) + +- LogicalJoin(condition=[true], joinType=[inner]) + :- LogicalAggregate(group=[{0}], sum_votes=[SUM($1)]) + : +- LogicalTableScan(table=[[default_catalog, default_database, src]]) + +- 
LogicalTableScan(table=[[default_catalog, default_database, award]]) +]]> + </Resource> + <Resource name="optimized exec plan"> + <![CDATA[ +Sink(table=[default_catalog.default_database.SinkJoinChangeLog], fields=[person, sum_votes, prize], upsertMaterialize=[true]) ++- Calc(select=[person, sum_votes, prize]) + +- Join(joinType=[InnerJoin], where=[(sum_votes = votes)], select=[person, sum_votes, votes, prize], leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey]) + :- Exchange(distribution=[hash[sum_votes]]) + : +- GroupAggregate(groupBy=[person], select=[person, SUM(votes) AS sum_votes]) + : +- Exchange(distribution=[hash[person]]) + : +- TableSourceScan(table=[[default_catalog, default_database, src]], fields=[person, votes]) + +- Exchange(distribution=[hash[votes]]) + +- TableSourceScan(table=[[default_catalog, default_database, award]], fields=[votes, prize]) +]]> + </Resource> + </TestCase> </Root> diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala index 3c02bed..461c4fc 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala @@ -29,6 +29,27 @@ class TableSinkTest extends TableTestBase { private val util = streamTestUtil() util.addDataStream[(Int, Long, String)]("MyTable", 'a, 'b, 'c) + util.tableEnv.executeSql( + """ + |CREATE TABLE src (person String, votes BIGINT) WITH( + | 'connector' = 'values' + |) + |""".stripMargin) + + util.tableEnv.executeSql( + """ + |CREATE TABLE award (votes BIGINT, prize DOUBLE, PRIMARY KEY(votes) NOT ENFORCED) WITH( + | 'connector' = 'values' + |) + |""".stripMargin) + + util.tableEnv.executeSql( + """ + 
|CREATE TABLE people (person STRING, age INT, PRIMARY KEY(person) NOT ENFORCED) WITH( + | 'connector' = 'values' + |) + |""".stripMargin) + @Test def testInsertMismatchTypeForEmptyChar(): Unit = { util.addTable( @@ -398,4 +419,49 @@ class TableSinkTest extends TableTestBase { util.verifyRelPlan(stmtSet) } + + @Test + def testSinkDisorderChangeLogWithJoin(): Unit = { + util.tableEnv.executeSql( + """ + |CREATE TABLE SinkJoinChangeLog ( + | person STRING, votes BIGINT, prize DOUBLE, + | PRIMARY KEY(person) NOT ENFORCED) WITH( + | 'connector' = 'values', + | 'sink-insert-only' = 'false' + |) + |""".stripMargin) + + util.verifyExecPlanInsert( + """ + |INSERT INTO SinkJoinChangeLog + |SELECT T.person, T.sum_votes, award.prize FROM + | (SELECT person, SUM(votes) AS sum_votes FROM src GROUP BY person) T, award + | WHERE T.sum_votes = award.votes + |""".stripMargin) + } + + @Test + def testSinkDisorderChangeLogWithRank(): Unit = { + util.tableEnv.executeSql( + """ + |CREATE TABLE SinkRankChangeLog ( + | person STRING, votes BIGINT, + | PRIMARY KEY(person) NOT ENFORCED) WITH( + | 'connector' = 'values', + | 'sink-insert-only' = 'false' + |) + |""".stripMargin) + + util.verifyExecPlanInsert( + """ + |INSERT INTO SinkRankChangeLog + |SELECT person, sum_votes FROM + | (SELECT person, sum_votes, + | ROW_NUMBER() OVER (PARTITION BY vote_section ORDER BY sum_votes DESC) AS rank_number + | FROM (SELECT person, SUM(votes) AS sum_votes, SUM(votes) / 2 AS vote_section FROM src + | GROUP BY person)) + | WHERE rank_number < 10 + |""".stripMargin) + } } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala new file mode 100644 index 0000000..20b61a3 --- /dev/null +++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.runtime.stream.sql + +import org.apache.flink.table.planner.factories.TestValuesTableFactory +import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row +import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode +import org.apache.flink.table.planner.runtime.utils._ + +import org.junit.Assert.assertEquals +import org.junit.Test +import org.junit.runner.RunWith +import org.junit.runners.Parameterized + +import scala.collection.JavaConversions._ + +@RunWith(classOf[Parameterized]) +class TableSinkITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode) { + + override def before(): Unit = { + super.before() + + val srcDataId = TestValuesTableFactory.registerData(Seq( + row("jason", 1L), + row("jason", 1L), + row("jason", 1L), + row("jason", 1L) + )) + tEnv.executeSql( + s""" + |CREATE TABLE src (person String, votes BIGINT) WITH( + | 'connector' = 'values', + | 'data-id' = '$srcDataId' + |) + |""".stripMargin) + + val 
awardDataId = TestValuesTableFactory.registerData(Seq( + row(1L, 5.2D), + row(2L, 12.1D), + row(3L, 18.3D), + row(4L, 22.5D) + )) + tEnv.executeSql( + s""" + |CREATE TABLE award (votes BIGINT, prize DOUBLE, PRIMARY KEY(votes) NOT ENFORCED) WITH( + | 'connector' = 'values', + | 'data-id' = '$awardDataId' + |) + |""".stripMargin) + + val peopleDataId = TestValuesTableFactory.registerData(Seq(row("jason", 22))) + tEnv.executeSql( + s""" + |CREATE TABLE people (person STRING, age INT, PRIMARY KEY(person) NOT ENFORCED) WITH( + | 'connector' = 'values', + | 'data-id' = '$peopleDataId' + |) + |""".stripMargin) + } + + @Test + def testJoinDisorderChangeLog(): Unit = { + tEnv.executeSql( + """ + |CREATE TABLE JoinDisorderChangeLog ( + | person STRING, votes BIGINT, prize DOUBLE, age INT, + | PRIMARY KEY(person) NOT ENFORCED) WITH( + | 'connector' = 'values', + | 'sink-insert-only' = 'false' + |) + |""".stripMargin) + + tEnv.executeSql( + """ + |INSERT INTO JoinDisorderChangeLog + |SELECT T1.person, T1.sum_votes, T1.prize, T2.age FROM + | (SELECT T.person, T.sum_votes, award.prize FROM + | (SELECT person, SUM(votes) AS sum_votes FROM src GROUP BY person) T, + | award + | WHERE T.sum_votes = award.votes) T1, people T2 + | WHERE T1.person = T2.person + |""".stripMargin).await() + + val result = TestValuesTableFactory.getResults("JoinDisorderChangeLog") + val expected = List("+I[jason, 4, 22.5, 22]") + assertEquals(expected.sorted, result.sorted) + } + + @Test + def testSinkDisorderChangeLog(): Unit = { + tEnv.executeSql( + """ + |CREATE TABLE SinkDisorderChangeLog ( + | person STRING, votes BIGINT, prize DOUBLE, + | PRIMARY KEY(person) NOT ENFORCED) WITH( + | 'connector' = 'values', + | 'sink-insert-only' = 'false' + |) + |""".stripMargin) + + tEnv.executeSql( + """ + |INSERT INTO SinkDisorderChangeLog + |SELECT T.person, T.sum_votes, award.prize FROM + | (SELECT person, SUM(votes) AS sum_votes FROM src GROUP BY person) T, award + | WHERE T.sum_votes = award.votes + 
|""".stripMargin).await() + + val result = TestValuesTableFactory.getResults("SinkDisorderChangeLog") + val expected = List("+I[jason, 4, 22.5]") + assertEquals(expected.sorted, result.sorted) + } + + @Test + def testSinkDisorderChangeLogWithRank(): Unit = { + tEnv.executeSql( + """ + |CREATE TABLE SinkRankChangeLog ( + | person STRING, votes BIGINT, + | PRIMARY KEY(person) NOT ENFORCED) WITH( + | 'connector' = 'values', + | 'sink-insert-only' = 'false' + |) + |""".stripMargin) + + tEnv.executeSql( + """ + |INSERT INTO SinkRankChangeLog + |SELECT person, sum_votes FROM + | (SELECT person, sum_votes, + | ROW_NUMBER() OVER (PARTITION BY vote_section ORDER BY sum_votes DESC) AS rank_number + | FROM (SELECT person, SUM(votes) AS sum_votes, SUM(votes) / 2 AS vote_section FROM src + | GROUP BY person)) + | WHERE rank_number < 10 + |""".stripMargin).await() + + val result = TestValuesTableFactory.getResults("SinkRankChangeLog") + val expected = List("+I[jason, 4]") + assertEquals(expected.sorted, result.sorted) + } +} diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializer.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializer.java new file mode 100644 index 0000000..2c06e67 --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializer.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.sink;
+
+import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
+import org.apache.flink.table.runtime.generated.RecordEqualiser;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.apache.flink.types.RowKind.DELETE;
+import static org.apache.flink.types.RowKind.INSERT;
+import static org.apache.flink.types.RowKind.UPDATE_AFTER;
+
+/**
+ * An operator that keeps the rows received for an upsert key in state and turns the upstream
+ * changelog into an upsert view for the downstream.
+ *
+ * <ul>
+ *   <li>{@code INSERT} / {@code UPDATE_AFTER}: the row is appended to the state and forwarded.
+ *   <li>Any other kind: the first equal row is removed from the state.
+ *       <ul>
+ *         <li>If the state is empty afterwards, the incoming row is forwarded as a {@code DELETE}.
+ *         <li>If the removed row was the most recently added one and older rows remain, the
+ *             newest remaining row is forwarded.
+ *         <li>If an older row was removed, nothing is forwarded.
+ *         <li>If no equal row is found, a message is logged and nothing is forwarded.
+ *       </ul>
+ * </ul>
+ */
+public class SinkUpsertMaterializer extends TableStreamOperator<RowData>
+        implements OneInputStreamOperator<RowData, RowData> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(SinkUpsertMaterializer.class);
+
+    private static final String STATE_CLEARED_WARN_MSG =
+            "The state is cleared because of state ttl. This will result in incorrect result. "
+                    + "You can increase the state ttl to avoid this.";
+
+    private final StateTtlConfig ttlConfig;
+    private final TypeSerializer<RowData> serializer;
+    private final GeneratedRecordEqualiser generatedEqualiser;
+
+    private transient RecordEqualiser equaliser;
+    private transient ValueState<List<RowData>> state;
+    private transient TimestampedCollector<RowData> collector;
+
+    /**
+     * @param ttlConfig time-to-live settings applied to the row state when enabled
+     * @param serializer serializer for the rows kept in state
+     * @param generatedEqualiser factory for the equaliser used to match rows on removal
+     */
+    public SinkUpsertMaterializer(
+            StateTtlConfig ttlConfig,
+            TypeSerializer<RowData> serializer,
+            GeneratedRecordEqualiser generatedEqualiser) {
+        this.ttlConfig = ttlConfig;
+        this.serializer = serializer;
+        this.generatedEqualiser = generatedEqualiser;
+    }
+
+    /** Instantiates the equaliser, registers the row-list state and wraps the output. */
+    @Override
+    public void open() throws Exception {
+        super.open();
+        equaliser = generatedEqualiser.newInstance(getRuntimeContext().getUserCodeClassLoader());
+
+        final ValueStateDescriptor<List<RowData>> rowsDescriptor =
+                new ValueStateDescriptor<>("values", new ListSerializer<>(serializer));
+        if (ttlConfig.isEnabled()) {
+            rowsDescriptor.enableTimeToLive(ttlConfig);
+        }
+        state = getRuntimeContext().getState(rowsDescriptor);
+
+        collector = new TimestampedCollector<>(output);
+    }
+
+    /**
+     * Applies one changelog record to the stored rows and forwards the resulting upsert change,
+     * if there is one.
+     */
+    @Override
+    public void processElement(StreamRecord<RowData> element) throws Exception {
+        final RowData input = element.getValue();
+        // Read the change type before it is overwritten on the next statement.
+        final boolean isRetraction =
+                input.getRowKind() != INSERT && input.getRowKind() != UPDATE_AFTER;
+        // Rows are always handled as INSERT so that the row comparison is not affected by
+        // the RowKind.
+        input.setRowKind(INSERT);
+
+        List<RowData> rows = state.value();
+        if (rows == null) {
+            rows = new ArrayList<>(2);
+        }
+
+        if (!isRetraction) {
+            // The incoming row becomes the current value.
+            rows.add(input);
+            collector.collect(input);
+            state.update(rows);
+            return;
+        }
+
+        // Remember where the newest row was before anything is removed.
+        final int newestIndexBefore = rows.size() - 1;
+        final int removedIndex = removeFirst(rows, input);
+        if (removedIndex == -1) {
+            // No matching row is stored; the state is left untouched.
+            LOG.info(STATE_CLEARED_WARN_MSG);
+            return;
+        }
+
+        if (rows.isEmpty()) {
+            // Nothing is left: forward the incoming row as a delete.
+            input.setRowKind(DELETE);
+            collector.collect(input);
+            state.clear();
+            return;
+        }
+
+        if (removedIndex == newestIndexBefore) {
+            // The newest row is gone: fall back to the newest remaining one.
+            collector.collect(rows.get(rows.size() - 1));
+        }
+        state.update(rows);
+    }
+
+    /**
+     * Removes the first element of {@code values} that the equaliser considers equal to {@code
+     * remove}.
+     *
+     * @return the index the removed element had, or -1 if no element matched
+     */
+    private int removeFirst(List<RowData> values, RowData remove) {
+        int position = 0;
+        for (Iterator<RowData> it = values.iterator(); it.hasNext(); position++) {
+            if (equaliser.equals(it.next(), remove)) {
+                it.remove();
+                return position;
+            }
+        }
+        return -1;
+    }
+}
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java
new file mode 100644
index 0000000..3b246be
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.sink;
+
+import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
+import org.apache.flink.table.runtime.generated.RecordEqualiser;
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.runtime.util.StateConfigUtil;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.utils.HandwrittenSelectorUtil;
+import org.apache.flink.types.RowKind;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.deleteRecord;
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.row;
+
+/** Test for {@link SinkUpsertMaterializer}. */
+public class SinkUpsertMaterializerTest {
+
+    private final StateTtlConfig ttlConfig = StateConfigUtil.createTtlConfig(1000);
+    private final LogicalType[] types = new LogicalType[] {new IntType(), new VarCharType()};
+    private final RowDataSerializer serializer = new RowDataSerializer(types);
+    // Selector built from an empty array of key field indices.
+    private final RowDataKeySelector keySelector =
+            HandwrittenSelectorUtil.getRowDataSelector(new int[0], types);
+    // Stub whose newInstance returns the hand-written equaliser defined at the bottom of this
+    // class; the constructor arguments are placeholders.
+    private final GeneratedRecordEqualiser equaliser =
+            new GeneratedRecordEqualiser("", "", new Object[0]) {
+
+                @Override
+                public RecordEqualiser newInstance(ClassLoader classLoader) {
+                    return new TestRecordEqualiser();
+                }
+            };
+
+    /**
+     * Feeds a sequence of insert and delete records for the same key through the operator and
+     * checks, after every record, exactly which rows were emitted.
+     */
+    @Test
+    public void test() throws Exception {
+        SinkUpsertMaterializer materializer =
+                new SinkUpsertMaterializer(ttlConfig, serializer, equaliser);
+        KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness =
+                new KeyedOneInputStreamOperatorTestHarness<>(
+                        materializer, keySelector, keySelector.getProducedType());
+
+        testHarness.open();
+        // Close the harness even when an assertion fails, so that a failing run does not leak
+        // it.
+        try {
+            testHarness.setStateTtlProcessingTime(1);
+
+            // Every insert is forwarded as it arrives.
+            testHarness.processElement(insertRecord(1, "a1"));
+            Assert.assertEquals(Collections.singletonList(row(1, "a1")), toRows(testHarness));
+
+            testHarness.processElement(insertRecord(1, "a2"));
+            Assert.assertEquals(Collections.singletonList(row(1, "a2")), toRows(testHarness));
+
+            testHarness.processElement(insertRecord(1, "a3"));
+            Assert.assertEquals(Collections.singletonList(row(1, "a3")), toRows(testHarness));
+
+            // Deleting a row that is not the most recent one emits nothing.
+            testHarness.processElement(deleteRecord(1, "a2"));
+            Assert.assertEquals(Collections.emptyList(), toRows(testHarness));
+
+            // Deleting the most recent row emits the newest remaining row.
+            testHarness.processElement(deleteRecord(1, "a3"));
+            Assert.assertEquals(Collections.singletonList(row(1, "a1")), toRows(testHarness));
+
+            // Deleting the only remaining row emits a DELETE.
+            testHarness.processElement(deleteRecord(1, "a1"));
+            RowData deleteRow = row(1, "a1");
+            deleteRow.setRowKind(RowKind.DELETE);
+            Assert.assertEquals(Collections.singletonList(deleteRow), toRows(testHarness));
+
+            testHarness.processElement(insertRecord(1, "a4"));
+            Assert.assertEquals(Collections.singletonList(row(1, "a4")), toRows(testHarness));
+
+            // Advance the TTL clock from 1 to 1002, i.e. by more than the 1000 passed to
+            // createTtlConfig above. Presumably the stored row has expired by then, so the
+            // delete finds nothing to remove and emits nothing.
+            testHarness.setStateTtlProcessingTime(1002);
+
+            testHarness.processElement(deleteRecord(1, "a4"));
+            Assert.assertEquals(Collections.emptyList(), toRows(testHarness));
+        } finally {
+            testHarness.close();
+        }
+    }
+
+    /**
+     * Drains the harness output queue and returns the emitted rows in emission order.
+     *
+     * <p>Each row is rebuilt as a {@link GenericRowData} carrying the same two fields and the
+     * same {@link RowKind}. NOTE(review): presumably this is what makes the rows comparable with
+     * the ones built by {@code row(...)}; confirm against {@code StreamRecordUtils}.
+     */
+    private List<RowData> toRows(OneInputStreamOperatorTestHarness<RowData, RowData> harness) {
+        List<RowData> rows = new ArrayList<>();
+        Object polled;
+        while ((polled = harness.getOutput().poll()) != null) {
+            // Wildcard instead of the raw StreamRecord type; the value is cast explicitly.
+            RowData value = (RowData) ((StreamRecord<?>) polled).getValue();
+            GenericRowData copy = GenericRowData.of(value.getInt(0), value.getString(1));
+            copy.setRowKind(value.getRowKind());
+            rows.add(copy);
+        }
+        return rows;
+    }
+
+    /** Equaliser that compares the int field at 0 and the string field at 1. */
+    private static class TestRecordEqualiser implements RecordEqualiser {
+        @Override
+        public boolean equals(RowData row1, RowData row2) {
+            return row1.getInt(0) == row2.getInt(0) && row1.getString(1).equals(row2.getString(1));
+        }
+    }
+}
