This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5fdc01e3a58540d4bc89c9f291cafd31d24f37d3
Author: JingsongLi <[email protected]>
AuthorDate: Wed Jun 23 18:00:44 2021 +0800

    [FLINK-23054][table] Add SinkUpsertMaterialize before upsert sink to 
resolve change log disorder
    
    This closes #16239
---
 .../generated/execution_config_configuration.html  |   6 +
 .../table/api/config/ExecutionConfigOptions.java   |  32 +++++
 .../plan/nodes/exec/batch/BatchExecSink.java       |   2 +-
 .../plan/nodes/exec/common/CommonExecSink.java     |  51 +++++--
 .../plan/nodes/exec/stream/StreamExecSink.java     |  16 ++-
 .../nodes/physical/stream/StreamPhysicalSink.scala |  60 +++++++-
 .../planner/plan/stream/sql/TableSinkTest.xml      |  74 +++++++++-
 .../planner/plan/stream/sql/TableSinkTest.scala    |  66 +++++++++
 .../runtime/stream/sql/TableSinkITCase.scala       | 157 +++++++++++++++++++++
 .../operators/sink/SinkUpsertMaterializer.java     | 151 ++++++++++++++++++++
 .../operators/sink/SinkUpsertMaterializerTest.java | 127 +++++++++++++++++
 11 files changed, 725 insertions(+), 17 deletions(-)

diff --git 
a/docs/layouts/shortcodes/generated/execution_config_configuration.html 
b/docs/layouts/shortcodes/generated/execution_config_configuration.html
index 8ef9c6f..0f44146 100644
--- a/docs/layouts/shortcodes/generated/execution_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/execution_config_configuration.html
@@ -65,6 +65,12 @@ By default no operator is disabled.</td>
             <td>The NOT NULL column constraint on a table enforces that null 
values can't be inserted into the table. Flink supports 'error' (default) and 
'drop' enforcement behavior. By default, Flink will check values and throw 
runtime exception when null values writing into NOT NULL columns. Users can 
change the behavior to 'drop' to silently drop such records without throwing 
exception.</td>
         </tr>
         <tr>
+            <td><h5>table.exec.sink.upsert-materialize</h5><br> <span 
class="label label-primary">Streaming</span></td>
+            <td style="word-wrap: break-word;">AUTO</td>
+            <td><p>Enum</p>Possible values: [NONE, AUTO, FORCE]</td>
+            <td>Because of the disorder of ChangeLog data caused by Shuffle in 
distributed system, the data received by Sink may not be the order of global 
upsert. So add upsert materialize operator before upsert sink. It receives the 
upstream changelog records and generate an upsert view for the downstream.<br 
/>By default, the materialize operator will be added when a distributed 
disorder occurs on unique keys. You can also choose no materialization(NONE) or 
force materialization(FORCE).</td>
+        </tr>
+        <tr>
             <td><h5>table.exec.sort.async-merge-enabled</h5><br> <span 
class="label label-primary">Batch</span></td>
             <td style="word-wrap: break-word;">true</td>
             <td>Boolean</td>
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
index d74e9ec..cb0299f 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
@@ -122,6 +122,25 @@ public class ExecutionConfigOptions {
                                     + "into NOT NULL columns. Users can change 
the behavior to 'drop' to "
                                     + "silently drop such records without 
throwing exception.");
 
+    @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+    public static final ConfigOption<UpsertMaterialize> 
TABLE_EXEC_SINK_UPSERT_MATERIALIZE =
+            key("table.exec.sink.upsert-materialize")
+                    .enumType(UpsertMaterialize.class)
+                    .defaultValue(UpsertMaterialize.AUTO)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Because of the disorder of 
ChangeLog data caused by Shuffle in distributed system, "
+                                                    + "the data received by 
Sink may not be the order of global upsert. "
+                                                    + "So add upsert 
materialize operator before upsert sink. It receives the "
+                                                    + "upstream changelog 
records and generate an upsert view for the downstream.")
+                                    .linebreak()
+                                    .text(
+                                            "By default, the materialize 
operator will be added when a distributed disorder "
+                                                    + "occurs on unique keys. 
You can also choose no materialization(NONE) "
+                                                    + "or force 
materialization(FORCE).")
+                                    .build());
+
     // ------------------------------------------------------------------------
     //  Sort Options
     // ------------------------------------------------------------------------
@@ -365,4 +384,17 @@ public class ExecutionConfigOptions {
         /** Drop records when writing null values into NOT NULL column. */
         DROP
     }
+
+    /** Upsert materialize strategy before sink. */
+    public enum UpsertMaterialize {
+
+        /** In no case will materialize operator be added. */
+        NONE,
+
+        /** Add materialize operator when a distributed disorder occurs on 
unique keys. */
+        AUTO,
+
+        /** Add materialize operator in any case. */
+        FORCE
+    }
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java
index c643bc7..ae9b703 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java
@@ -57,6 +57,6 @@ public class BatchExecSink extends CommonExecSink implements 
BatchExecNode<Objec
         final Transformation<RowData> inputTransform =
                 (Transformation<RowData>) 
getInputEdges().get(0).translateToPlan(planner);
         return createSinkTransformation(
-                planner.getExecEnv(), planner.getTableConfig(), 
inputTransform, -1);
+                planner.getExecEnv(), planner.getTableConfig(), 
inputTransform, -1, false);
     }
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
index 7a0c3a3..545bf96 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
@@ -46,6 +46,7 @@ import 
org.apache.flink.table.connector.sink.OutputFormatProvider;
 import org.apache.flink.table.connector.sink.SinkFunctionProvider;
 import org.apache.flink.table.connector.sink.SinkProvider;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.EqualiserCodeGenerator;
 import org.apache.flink.table.planner.connectors.TransformationSinkProvider;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
@@ -54,10 +55,13 @@ import 
org.apache.flink.table.planner.plan.nodes.exec.MultipleTransformationTran
 import 
org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSinkSpec;
 import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
 import 
org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
+import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
 import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
 import org.apache.flink.table.runtime.operators.sink.SinkNotNullEnforcer;
 import org.apache.flink.table.runtime.operators.sink.SinkOperator;
+import org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.runtime.util.StateConfigUtil;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.RowKind;
@@ -112,7 +116,8 @@ public abstract class CommonExecSink extends 
ExecNodeBase<Object>
             StreamExecutionEnvironment env,
             TableConfig tableConfig,
             Transformation<RowData> inputTransform,
-            int rowtimeFieldIndex) {
+            int rowtimeFieldIndex,
+            boolean upsertMaterialize) {
         final DynamicTableSink tableSink = tableSinkSpec.getTableSink();
         final DynamicTableSink.SinkRuntimeProvider runtimeProvider =
                 tableSink.getSinkRuntimeProvider(new 
SinkRuntimeProviderContext(isBounded));
@@ -149,12 +154,40 @@ public abstract class CommonExecSink extends 
ExecNodeBase<Object>
 
             // apply keyBy partition transformation if needed
             inputTransform =
-                    applyKeyByForDifferentParallelism(
+                    applyKeyByIfNeeded(
                             physicalRowType,
                             schema.getPrimaryKey().orElse(null),
                             inputTransform,
                             inputParallelism,
-                            sinkParallelism);
+                            sinkParallelism,
+                            upsertMaterialize);
+
+            if (upsertMaterialize) {
+                GeneratedRecordEqualiser equaliser =
+                        new EqualiserCodeGenerator(physicalRowType)
+                                
.generateRecordEqualiser("SinkMaterializeEqualiser");
+                SinkUpsertMaterializer operator =
+                        new SinkUpsertMaterializer(
+                                StateConfigUtil.createTtlConfig(
+                                        
tableConfig.getIdleStateRetention().toMillis()),
+                                
InternalTypeInfo.of(physicalRowType).toSerializer(),
+                                equaliser);
+                OneInputTransformation<RowData, RowData> materializeTransform =
+                        new OneInputTransformation<>(
+                                inputTransform,
+                                "SinkMaterializer",
+                                operator,
+                                inputTransform.getOutputType(),
+                                sinkParallelism);
+                int[] pkIndices =
+                        getPrimaryKeyIndices(physicalRowType, 
schema.getPrimaryKey().get());
+                RowDataKeySelector keySelector =
+                        KeySelectorUtil.getRowDataSelector(
+                                pkIndices, 
InternalTypeInfo.of(physicalRowType));
+                materializeTransform.setStateKeySelector(keySelector);
+                
materializeTransform.setStateKeyType(keySelector.getProducedType());
+                inputTransform = materializeTransform;
+            }
 
             final SinkFunction<RowData> sinkFunction;
             if (runtimeProvider instanceof SinkFunctionProvider) {
@@ -248,17 +281,19 @@ public abstract class CommonExecSink extends 
ExecNodeBase<Object>
 
     /**
      * Apply a keyBy partition transformation if the parallelism of sink 
operator and input operator
-     * is different and sink changelog-mode is not insert-only. This is used 
to guarantee the strict
-     * ordering of changelog messages.
+     * is different and sink changelog-mode is not insert-only or 
requireMaterialize. This is used
+     * to guarantee the strict ordering of changelog messages.
      */
-    private Transformation<RowData> applyKeyByForDifferentParallelism(
+    private Transformation<RowData> applyKeyByIfNeeded(
             RowType sinkRowType,
             @Nullable UniqueConstraint primaryKey,
             Transformation<RowData> inputTransform,
             int inputParallelism,
-            int sinkParallelism) {
+            int sinkParallelism,
+            boolean upsertMaterialize) {
         final int[] primaryKeys = getPrimaryKeyIndices(sinkRowType, 
primaryKey);
-        if (inputParallelism == sinkParallelism || 
changelogMode.containsOnly(RowKind.INSERT)) {
+        if ((inputParallelism == sinkParallelism || 
changelogMode.containsOnly(RowKind.INSERT))
+                && !upsertMaterialize) {
             // if the inputParallelism is equals to the parallelism or 
insert-only mode, do nothing.
             return inputTransform;
         } else if (primaryKeys.length == 0) {
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java
index b26e89b..f57eacf 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java
@@ -37,6 +37,7 @@ import org.apache.flink.table.types.logical.RowType;
 
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
@@ -54,17 +55,23 @@ import java.util.stream.Collectors;
 public class StreamExecSink extends CommonExecSink implements 
StreamExecNode<Object> {
 
     public static final String FIELD_NAME_INPUT_CHANGELOG_MODE = 
"inputChangelogMode";
+    public static final String FIELD_NAME_REQUIRE_UPSERT_MATERIALIZE = 
"requireUpsertMaterialize";
 
     @JsonProperty(FIELD_NAME_INPUT_CHANGELOG_MODE)
     @JsonSerialize(using = ChangelogModeJsonSerializer.class)
     @JsonDeserialize(using = ChangelogModeJsonDeserializer.class)
     private final ChangelogMode inputChangelogMode;
 
+    @JsonProperty(FIELD_NAME_REQUIRE_UPSERT_MATERIALIZE)
+    @JsonInclude(JsonInclude.Include.NON_DEFAULT)
+    private final boolean upsertMaterialize;
+
     public StreamExecSink(
             DynamicTableSinkSpec tableSinkSpec,
             ChangelogMode inputChangelogMode,
             InputProperty inputProperty,
             LogicalType outputType,
+            boolean upsertMaterialize,
             String description) {
         super(
                 tableSinkSpec,
@@ -75,6 +82,7 @@ public class StreamExecSink extends CommonExecSink implements 
StreamExecNode<Obj
                 outputType,
                 description);
         this.inputChangelogMode = inputChangelogMode;
+        this.upsertMaterialize = upsertMaterialize;
     }
 
     @JsonCreator
@@ -84,6 +92,7 @@ public class StreamExecSink extends CommonExecSink implements 
StreamExecNode<Obj
             @JsonProperty(FIELD_NAME_ID) int id,
             @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> 
inputProperties,
             @JsonProperty(FIELD_NAME_OUTPUT_TYPE) LogicalType outputType,
+            @JsonProperty(FIELD_NAME_REQUIRE_UPSERT_MATERIALIZE) boolean 
upsertMaterialize,
             @JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
         super(
                 tableSinkSpec,
@@ -94,6 +103,7 @@ public class StreamExecSink extends CommonExecSink 
implements StreamExecNode<Obj
                 outputType,
                 description);
         this.inputChangelogMode = inputChangelogMode;
+        this.upsertMaterialize = upsertMaterialize;
     }
 
     @SuppressWarnings("unchecked")
@@ -128,6 +138,10 @@ public class StreamExecSink extends CommonExecSink 
implements StreamExecNode<Obj
         }
 
         return createSinkTransformation(
-                planner.getExecEnv(), planner.getTableConfig(), 
inputTransform, rowtimeFieldIndex);
+                planner.getExecEnv(),
+                planner.getTableConfig(),
+                inputTransform,
+                rowtimeFieldIndex,
+                upsertMaterialize);
     }
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSink.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSink.scala
index fda14e9..58d540e 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSink.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSink.scala
@@ -18,22 +18,31 @@
 
 package org.apache.flink.table.planner.plan.nodes.physical.stream
 
+import org.apache.flink.table.api.config.ExecutionConfigOptions
+import 
org.apache.flink.table.api.config.ExecutionConfigOptions.UpsertMaterialize
 import org.apache.flink.table.catalog.{ObjectIdentifier, ResolvedCatalogTable}
 import org.apache.flink.table.connector.sink.DynamicTableSink
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.plan.abilities.sink.SinkAbilitySpec
+import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
 import org.apache.flink.table.planner.plan.nodes.calcite.Sink
 import org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSinkSpec
 import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink
 import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, InputProperty}
-import org.apache.flink.table.planner.plan.utils.{ChangelogPlanUtils, 
FlinkRelOptUtil}
+import org.apache.flink.table.planner.plan.utils.{ChangelogPlanUtils, 
FlinkRelOptUtil, RelDescriptionWriterImpl}
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil.toScala
+import org.apache.flink.types.RowKind
 
 import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.hint.RelHint
+import org.apache.calcite.util.ImmutableBitSet
 
+import java.io.{PrintWriter, StringWriter}
 import java.util
 
+import scala.collection.JavaConversions._
+
 /**
  * Stream physical RelNode to to write data into an external sink defined by a
  * [[DynamicTableSink]].
@@ -75,12 +84,59 @@ class StreamPhysicalSink(
     val tableConfig = FlinkRelOptUtil.getTableConfigFromContext(this)
     tableSinkSpec.setReadableConfig(tableConfig.getConfiguration)
 
+    val primaryKeys = toScala(catalogTable.getResolvedSchema
+        .getPrimaryKey).map(_.getColumns).map(toScala[String]).getOrElse(Seq())
+
+    val upsertMaterialize = tableConfig.getConfiguration.get(
+      ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE) match {
+      case UpsertMaterialize.FORCE => primaryKeys.nonEmpty
+      case UpsertMaterialize.NONE => false
+      case UpsertMaterialize.AUTO =>
+        val insertOnly = tableSink
+            .getChangelogMode(inputChangelogMode)
+            .containsOnly(RowKind.INSERT)
+
+        if (!insertOnly && primaryKeys.nonEmpty) {
+          val columnNames = catalogTable.getResolvedSchema.getColumnNames
+          val pks = ImmutableBitSet.of(primaryKeys.map(columnNames.indexOf): 
_*)
+
+          val fmq = 
FlinkRelMetadataQuery.reuseOrCreate(getCluster.getMetadataQuery)
+          val uniqueKeys = fmq.getUniqueKeys(getInput)
+          val changeLogUpsertKeys = fmq.getUpsertKeys(getInput)
+
+          if (uniqueKeys != null &&
+              uniqueKeys.exists(pks.contains) &&
+              !(changeLogUpsertKeys != null &&
+                  changeLogUpsertKeys.exists(pks.contains))) {
+            true
+          } else {
+            false
+          }
+        } else {
+          false
+        }
+    }
+
     new StreamExecSink(
       tableSinkSpec,
       inputChangelogMode,
       InputProperty.DEFAULT,
       FlinkTypeFactory.toLogicalRowType(getRowType),
-      getRelDetailedDescription
+      upsertMaterialize,
+      getDescriptionWithUpsert(upsertMaterialize)
     )
   }
+
+  /**
+   * The inputChangelogMode can only be obtained in translateToExecNode phase.
+   */
+  def getDescriptionWithUpsert(upsertMaterialize: Boolean): String = {
+    val sw = new StringWriter
+    val pw = new PrintWriter(sw)
+    val relWriter = new RelDescriptionWriterImpl(pw)
+    this.explainTerms(relWriter)
+    relWriter.itemIf("upsertMaterialize", "true", upsertMaterialize)
+    relWriter.done(this)
+    sw.toString
+  }
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml
 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml
index b0ec026..28c1048 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml
+++ 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml
@@ -105,11 +105,6 @@ Sink(table=[default_catalog.default_database.upsertSink], 
fields=[a, total_min],
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testInsertMismatchTypeForEmptyChar">
-    <Resource name="sql">
-      <![CDATA[INSERT INTO my_sink SELECT a, '', '' FROM MyTable]]>
-    </Resource>
-  </TestCase>
   <TestCase name="testMetadataColumn">
     <Resource name="ast">
       <![CDATA[
@@ -240,6 +235,41 @@ Sink(table=[default_catalog.default_database.retractSink], 
fields=[cnt, a], chan
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testSinkDisorderChangeLogWithRank">
+    <Resource name="sql">
+      <![CDATA[
+INSERT INTO SinkRankChangeLog
+SELECT person, sum_votes FROM
+ (SELECT person, sum_votes,
+   ROW_NUMBER() OVER (PARTITION BY vote_section ORDER BY sum_votes DESC) AS 
rank_number
+   FROM (SELECT person, SUM(votes) AS sum_votes, SUM(votes) / 2 AS 
vote_section FROM src
+      GROUP BY person))
+   WHERE rank_number < 10
+]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.SinkRankChangeLog], 
fields=[person, sum_votes])
++- LogicalProject(person=[$0], sum_votes=[$1])
+   +- LogicalFilter(condition=[<($2, 10)])
+      +- LogicalProject(person=[$0], sum_votes=[$1], rank_number=[ROW_NUMBER() 
OVER (PARTITION BY /($1, 2) ORDER BY $1 DESC NULLS LAST)])
+         +- LogicalAggregate(group=[{0}], sum_votes=[SUM($1)])
+            +- LogicalTableScan(table=[[default_catalog, default_database, 
src]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Sink(table=[default_catalog.default_database.SinkRankChangeLog], 
fields=[person, sum_votes], upsertMaterialize=[true])
++- Calc(select=[person, sum_votes])
+   +- Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], 
rankRange=[rankStart=1, rankEnd=9], partitionBy=[$2], orderBy=[sum_votes DESC], 
select=[person, sum_votes, $2])
+      +- Exchange(distribution=[hash[$2]])
+         +- Calc(select=[person, sum_votes, (sum_votes / 2) AS $2])
+            +- GroupAggregate(groupBy=[person], select=[person, SUM(votes) AS 
sum_votes])
+               +- Exchange(distribution=[hash[person]])
+                  +- TableSourceScan(table=[[default_catalog, 
default_database, src]], fields=[person, votes])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testUpsertSinkWithFilter">
     <Resource name="ast">
       <![CDATA[
@@ -262,4 +292,38 @@ Sink(table=[default_catalog.default_database.upsertSink], 
fields=[a, cnt], chang
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testSinkDisorderChangeLogWithJoin">
+    <Resource name="sql">
+      <![CDATA[
+INSERT INTO SinkJoinChangeLog
+SELECT T.person, T.sum_votes, award.prize FROM
+   (SELECT person, SUM(votes) AS sum_votes FROM src GROUP BY person) T, award
+   WHERE T.sum_votes = award.votes
+]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.SinkJoinChangeLog], 
fields=[person, sum_votes, prize])
++- LogicalProject(person=[$0], sum_votes=[$1], prize=[$3])
+   +- LogicalFilter(condition=[=($1, $2)])
+      +- LogicalJoin(condition=[true], joinType=[inner])
+         :- LogicalAggregate(group=[{0}], sum_votes=[SUM($1)])
+         :  +- LogicalTableScan(table=[[default_catalog, default_database, 
src]])
+         +- LogicalTableScan(table=[[default_catalog, default_database, 
award]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Sink(table=[default_catalog.default_database.SinkJoinChangeLog], 
fields=[person, sum_votes, prize], upsertMaterialize=[true])
++- Calc(select=[person, sum_votes, prize])
+   +- Join(joinType=[InnerJoin], where=[(sum_votes = votes)], select=[person, 
sum_votes, votes, prize], leftInputSpec=[HasUniqueKey], 
rightInputSpec=[JoinKeyContainsUniqueKey])
+      :- Exchange(distribution=[hash[sum_votes]])
+      :  +- GroupAggregate(groupBy=[person], select=[person, SUM(votes) AS 
sum_votes])
+      :     +- Exchange(distribution=[hash[person]])
+      :        +- TableSourceScan(table=[[default_catalog, default_database, 
src]], fields=[person, votes])
+      +- Exchange(distribution=[hash[votes]])
+         +- TableSourceScan(table=[[default_catalog, default_database, 
award]], fields=[votes, prize])
+]]>
+    </Resource>
+  </TestCase>
 </Root>
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala
index 3c02bed..461c4fc 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala
@@ -29,6 +29,27 @@ class TableSinkTest extends TableTestBase {
   private val util = streamTestUtil()
   util.addDataStream[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
 
+  util.tableEnv.executeSql(
+    """
+      |CREATE TABLE src (person String, votes BIGINT) WITH(
+      |  'connector' = 'values'
+      |)
+      |""".stripMargin)
+
+  util.tableEnv.executeSql(
+    """
+      |CREATE TABLE award (votes BIGINT, prize DOUBLE, PRIMARY KEY(votes) NOT 
ENFORCED) WITH(
+      |  'connector' = 'values'
+      |)
+      |""".stripMargin)
+
+  util.tableEnv.executeSql(
+    """
+      |CREATE TABLE people (person STRING, age INT, PRIMARY KEY(person) NOT 
ENFORCED) WITH(
+      |  'connector' = 'values'
+      |)
+      |""".stripMargin)
+
   @Test
   def testInsertMismatchTypeForEmptyChar(): Unit = {
     util.addTable(
@@ -398,4 +419,49 @@ class TableSinkTest extends TableTestBase {
 
     util.verifyRelPlan(stmtSet)
   }
+
+  @Test
+  def testSinkDisorderChangeLogWithJoin(): Unit = {
+    util.tableEnv.executeSql(
+      """
+        |CREATE TABLE SinkJoinChangeLog (
+        |  person STRING, votes BIGINT, prize DOUBLE,
+        |  PRIMARY KEY(person) NOT ENFORCED) WITH(
+        |  'connector' = 'values',
+        |  'sink-insert-only' = 'false'
+        |)
+        |""".stripMargin)
+
+    util.verifyExecPlanInsert(
+      """
+        |INSERT INTO SinkJoinChangeLog
+        |SELECT T.person, T.sum_votes, award.prize FROM
+        |   (SELECT person, SUM(votes) AS sum_votes FROM src GROUP BY person) 
T, award
+        |   WHERE T.sum_votes = award.votes
+        |""".stripMargin)
+  }
+
+  @Test
+  def testSinkDisorderChangeLogWithRank(): Unit = {
+    util.tableEnv.executeSql(
+      """
+        |CREATE TABLE SinkRankChangeLog (
+        |  person STRING, votes BIGINT,
+        |  PRIMARY KEY(person) NOT ENFORCED) WITH(
+        |  'connector' = 'values',
+        |  'sink-insert-only' = 'false'
+        |)
+        |""".stripMargin)
+
+    util.verifyExecPlanInsert(
+      """
+        |INSERT INTO SinkRankChangeLog
+        |SELECT person, sum_votes FROM
+        | (SELECT person, sum_votes,
+        |   ROW_NUMBER() OVER (PARTITION BY vote_section ORDER BY sum_votes 
DESC) AS rank_number
+        |   FROM (SELECT person, SUM(votes) AS sum_votes, SUM(votes) / 2 AS 
vote_section FROM src
+        |      GROUP BY person))
+        |   WHERE rank_number < 10
+        |""".stripMargin)
+  }
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala
new file mode 100644
index 0000000..20b61a3
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.stream.sql
+
+import org.apache.flink.table.planner.factories.TestValuesTableFactory
+import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
+import 
org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode
+import org.apache.flink.table.planner.runtime.utils._
+
+import org.junit.Assert.assertEquals
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConversions._
+
+@RunWith(classOf[Parameterized])
+class TableSinkITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase(mode) {
+
+  override def before(): Unit = {
+    super.before()
+
+    val srcDataId = TestValuesTableFactory.registerData(Seq(
+      row("jason", 1L),
+      row("jason", 1L),
+      row("jason", 1L),
+      row("jason", 1L)
+    ))
+    tEnv.executeSql(
+      s"""
+        |CREATE TABLE src (person String, votes BIGINT) WITH(
+        |  'connector' = 'values',
+        |  'data-id' = '$srcDataId'
+        |)
+        |""".stripMargin)
+
+    val awardDataId = TestValuesTableFactory.registerData(Seq(
+      row(1L, 5.2D),
+      row(2L, 12.1D),
+      row(3L, 18.3D),
+      row(4L, 22.5D)
+    ))
+    tEnv.executeSql(
+      s"""
+        |CREATE TABLE award (votes BIGINT, prize DOUBLE, PRIMARY KEY(votes) 
NOT ENFORCED) WITH(
+        |  'connector' = 'values',
+        |  'data-id' = '$awardDataId'
+        |)
+        |""".stripMargin)
+
+    val peopleDataId = TestValuesTableFactory.registerData(Seq(row("jason", 
22)))
+    tEnv.executeSql(
+      s"""
+        |CREATE TABLE people (person STRING, age INT, PRIMARY KEY(person) NOT 
ENFORCED) WITH(
+        |  'connector' = 'values',
+        |  'data-id' = '$peopleDataId'
+        |)
+        |""".stripMargin)
+  }
+
+  @Test
+  def testJoinDisorderChangeLog(): Unit = {
+    tEnv.executeSql(
+      """
+        |CREATE TABLE JoinDisorderChangeLog (
+        |  person STRING, votes BIGINT, prize DOUBLE, age INT,
+        |  PRIMARY KEY(person) NOT ENFORCED) WITH(
+        |  'connector' = 'values',
+        |  'sink-insert-only' = 'false'
+        |)
+        |""".stripMargin)
+
+    tEnv.executeSql(
+      """
+        |INSERT INTO JoinDisorderChangeLog
+        |SELECT T1.person, T1.sum_votes, T1.prize, T2.age FROM
+        | (SELECT T.person, T.sum_votes, award.prize FROM
+        |   (SELECT person, SUM(votes) AS sum_votes FROM src GROUP BY person) 
T,
+        |   award
+        |   WHERE T.sum_votes = award.votes) T1, people T2
+        | WHERE T1.person = T2.person
+        |""".stripMargin).await()
+
+    val result = TestValuesTableFactory.getResults("JoinDisorderChangeLog")
+    val expected = List("+I[jason, 4, 22.5, 22]")
+    assertEquals(expected.sorted, result.sorted)
+  }
+
+  @Test
+  def testSinkDisorderChangeLog(): Unit = {
+    tEnv.executeSql(
+      """
+        |CREATE TABLE SinkDisorderChangeLog (
+        |  person STRING, votes BIGINT, prize DOUBLE,
+        |  PRIMARY KEY(person) NOT ENFORCED) WITH(
+        |  'connector' = 'values',
+        |  'sink-insert-only' = 'false'
+        |)
+        |""".stripMargin)
+
+    tEnv.executeSql(
+      """
+        |INSERT INTO SinkDisorderChangeLog
+        |SELECT T.person, T.sum_votes, award.prize FROM
+        |   (SELECT person, SUM(votes) AS sum_votes FROM src GROUP BY person) 
T, award
+        |   WHERE T.sum_votes = award.votes
+        |""".stripMargin).await()
+
+    val result = TestValuesTableFactory.getResults("SinkDisorderChangeLog")
+    val expected = List("+I[jason, 4, 22.5]")
+    assertEquals(expected.sorted, result.sorted)
+  }
+
+  @Test
+  def testSinkDisorderChangeLogWithRank(): Unit = {
+    tEnv.executeSql(
+      """
+        |CREATE TABLE SinkRankChangeLog (
+        |  person STRING, votes BIGINT,
+        |  PRIMARY KEY(person) NOT ENFORCED) WITH(
+        |  'connector' = 'values',
+        |  'sink-insert-only' = 'false'
+        |)
+        |""".stripMargin)
+
+    tEnv.executeSql(
+      """
+        |INSERT INTO SinkRankChangeLog
+        |SELECT person, sum_votes FROM
+        | (SELECT person, sum_votes,
+        |   ROW_NUMBER() OVER (PARTITION BY vote_section ORDER BY sum_votes 
DESC) AS rank_number
+        |   FROM (SELECT person, SUM(votes) AS sum_votes, SUM(votes) / 2 AS 
vote_section FROM src
+        |      GROUP BY person))
+        |   WHERE rank_number < 10
+        |""".stripMargin).await()
+
+    val result = TestValuesTableFactory.getResults("SinkRankChangeLog")
+    val expected = List("+I[jason, 4]")
+    assertEquals(expected.sorted, result.sorted)
+  }
+}
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializer.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializer.java
new file mode 100644
index 0000000..2c06e67
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializer.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.sink;
+
+import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
+import org.apache.flink.table.runtime.generated.RecordEqualiser;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.apache.flink.types.RowKind.DELETE;
+import static org.apache.flink.types.RowKind.INSERT;
+import static org.apache.flink.types.RowKind.UPDATE_AFTER;
+
+/**
+ * A operator that maintains the records corresponding to the upsert keys in 
the state, it receives
+ * the upstream changelog records and generate an upsert view for the 
downstream.
+ *
+ * <ul>
+ *   <li>For insert record, append the state and collect current record.
+ *   <li>For delete record, delete in the state, collect delete record when 
the state is empty.
+ *   <li>For delete record, delete in the state, collect the last one when the 
state is not empty.
+ * </ul>
+ */
+public class SinkUpsertMaterializer extends TableStreamOperator<RowData>
+        implements OneInputStreamOperator<RowData, RowData> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(SinkUpsertMaterializer.class);
+
+    private static final String STATE_CLEARED_WARN_MSG =
+            "The state is cleared because of state ttl. This will result in 
incorrect result. "
+                    + "You can increase the state ttl to avoid this.";
+
+    private final StateTtlConfig ttlConfig;
+    private final TypeSerializer<RowData> serializer;
+    private final GeneratedRecordEqualiser generatedEqualiser;
+
+    private transient RecordEqualiser equaliser;
+    private transient ValueState<List<RowData>> state;
+    private transient TimestampedCollector<RowData> collector;
+
+    public SinkUpsertMaterializer(
+            StateTtlConfig ttlConfig,
+            TypeSerializer<RowData> serializer,
+            GeneratedRecordEqualiser generatedEqualiser) {
+        this.ttlConfig = ttlConfig;
+        this.serializer = serializer;
+        this.generatedEqualiser = generatedEqualiser;
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        this.equaliser =
+                
generatedEqualiser.newInstance(getRuntimeContext().getUserCodeClassLoader());
+        ValueStateDescriptor<List<RowData>> descriptor =
+                new ValueStateDescriptor<>("values", new 
ListSerializer<>(serializer));
+        if (ttlConfig.isEnabled()) {
+            descriptor.enableTimeToLive(ttlConfig);
+        }
+        this.state = getRuntimeContext().getState(descriptor);
+        this.collector = new TimestampedCollector<>(output);
+    }
+
+    @Override
+    public void processElement(StreamRecord<RowData> element) throws Exception 
{
+        RowData row = element.getValue();
+        boolean isInsertOp = row.getRowKind() == INSERT || row.getRowKind() == 
UPDATE_AFTER;
+        // Always set the RowKind to INSERT, so that we can compare rows 
correctly (RowKind will
+        // be ignored)
+        row.setRowKind(INSERT);
+        List<RowData> values = state.value();
+        if (values == null) {
+            values = new ArrayList<>(2);
+        }
+
+        if (isInsertOp) {
+            values.add(row);
+            // Update to this new one
+            collector.collect(row);
+        } else {
+            int lastIndex = values.size() - 1;
+            int index = removeFirst(values, row);
+            if (index == -1) {
+                LOG.info(STATE_CLEARED_WARN_MSG);
+                return;
+            }
+            if (values.isEmpty()) {
+                // Delete this row
+                row.setRowKind(DELETE);
+                collector.collect(row);
+            } else if (index == lastIndex) {
+                // Last one removed
+                // Update to newer
+                collector.collect(values.get(values.size() - 1));
+            }
+        }
+
+        if (values.isEmpty()) {
+            state.clear();
+        } else {
+            state.update(values);
+        }
+    }
+
+    private int removeFirst(List<RowData> values, RowData remove) {
+        Iterator<RowData> iterator = values.iterator();
+        int i = 0;
+        while (iterator.hasNext()) {
+            RowData row = iterator.next();
+            if (equaliser.equals(row, remove)) {
+                iterator.remove();
+                return i;
+            }
+            i++;
+        }
+        return -1;
+    }
+}
diff --git 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java
 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java
new file mode 100644
index 0000000..3b246be
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.sink;
+
+import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
+import org.apache.flink.table.runtime.generated.RecordEqualiser;
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.runtime.util.StateConfigUtil;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.utils.HandwrittenSelectorUtil;
+import org.apache.flink.types.RowKind;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static 
org.apache.flink.table.runtime.util.StreamRecordUtils.deleteRecord;
+import static 
org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.row;
+
+/** Test for {@link SinkUpsertMaterializer}. */
+public class SinkUpsertMaterializerTest {
+
+    private final StateTtlConfig ttlConfig = 
StateConfigUtil.createTtlConfig(1000);
+    private final LogicalType[] types = new LogicalType[] {new IntType(), new 
VarCharType()};
+    private final RowDataSerializer serializer = new RowDataSerializer(types);
+    private final RowDataKeySelector keySelector =
+            HandwrittenSelectorUtil.getRowDataSelector(new int[0], types);
+    private final GeneratedRecordEqualiser equaliser =
+            new GeneratedRecordEqualiser("", "", new Object[0]) {
+
+                @Override
+                public RecordEqualiser newInstance(ClassLoader classLoader) {
+                    return new TestRecordEqualiser();
+                }
+            };
+
+    @Test
+    public void test() throws Exception {
+        SinkUpsertMaterializer materializer =
+                new SinkUpsertMaterializer(ttlConfig, serializer, equaliser);
+        KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> 
testHarness =
+                new KeyedOneInputStreamOperatorTestHarness<>(
+                        materializer, keySelector, 
keySelector.getProducedType());
+
+        testHarness.open();
+
+        testHarness.setStateTtlProcessingTime(1);
+
+        testHarness.processElement(insertRecord(1, "a1"));
+        Assert.assertEquals(Collections.singletonList(row(1, "a1")), 
toRows(testHarness));
+
+        testHarness.processElement(insertRecord(1, "a2"));
+        Assert.assertEquals(Collections.singletonList(row(1, "a2")), 
toRows(testHarness));
+
+        testHarness.processElement(insertRecord(1, "a3"));
+        Assert.assertEquals(Collections.singletonList(row(1, "a3")), 
toRows(testHarness));
+
+        testHarness.processElement(deleteRecord(1, "a2"));
+        Assert.assertEquals(Collections.emptyList(), toRows(testHarness));
+
+        testHarness.processElement(deleteRecord(1, "a3"));
+        Assert.assertEquals(Collections.singletonList(row(1, "a1")), 
toRows(testHarness));
+
+        testHarness.processElement(deleteRecord(1, "a1"));
+        RowData deleteRow = row(1, "a1");
+        deleteRow.setRowKind(RowKind.DELETE);
+        Assert.assertEquals(Collections.singletonList(deleteRow), 
toRows(testHarness));
+
+        testHarness.processElement(insertRecord(1, "a4"));
+        Assert.assertEquals(Collections.singletonList(row(1, "a4")), 
toRows(testHarness));
+
+        testHarness.setStateTtlProcessingTime(1002);
+
+        testHarness.processElement(deleteRecord(1, "a4"));
+        Assert.assertEquals(Collections.emptyList(), toRows(testHarness));
+
+        testHarness.close();
+    }
+
+    private List<RowData> toRows(OneInputStreamOperatorTestHarness<RowData, 
RowData> harness) {
+        Object o;
+        List<RowData> ret = new ArrayList<>();
+        while ((o = harness.getOutput().poll()) != null) {
+            RowData value = (RowData) ((StreamRecord) o).getValue();
+            GenericRowData newRow = GenericRowData.of(value.getInt(0), 
value.getString(1));
+            newRow.setRowKind(value.getRowKind());
+            ret.add(newRow);
+        }
+        return ret;
+    }
+
+    private static class TestRecordEqualiser implements RecordEqualiser {
+        @Override
+        public boolean equals(RowData row1, RowData row2) {
+            return row1.getInt(0) == row2.getInt(0) && 
row1.getString(1).equals(row2.getString(1));
+        }
+    }
+}

Reply via email to