This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 4d8157f  [FLINK-23531][table] Supports compact-changes in row-time 
deduplicate mini-batch
4d8157f is described below

commit 4d8157f6d56e2ce8b76ed99ff8afaf2c2fff4fa6
Author: zhangmang <[email protected]>
AuthorDate: Wed Aug 4 16:26:48 2021 +0800

    [FLINK-23531][table] Supports compact-changes in row-time deduplicate 
mini-batch
    
    This closes #16630
---
 .../nodes/exec/stream/StreamExecDeduplicate.java   |  50 +++-
 .../runtime/stream/sql/DeduplicateITCase.scala     |  76 ++++-
 ...meMiniBatchLatestChangeDeduplicateFunction.java |  89 ++++++
 .../RowTimeDeduplicateFunctionTest.java            |  42 +--
 .../RowTimeDeduplicateFunctionTestBase.java        |  68 +++++
 ...niBatchLatestChangeDeduplicateFunctionTest.java | 328 +++++++++++++++++++++
 6 files changed, 600 insertions(+), 53 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecDeduplicate.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecDeduplicate.java
index 715adc1..309fce0 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecDeduplicate.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecDeduplicate.java
@@ -40,6 +40,7 @@ import 
org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
 import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
 import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
 import org.apache.flink.table.runtime.operators.bundle.KeyedMapBundleOperator;
+import org.apache.flink.table.runtime.operators.bundle.MapBundleFunction;
 import 
org.apache.flink.table.runtime.operators.bundle.trigger.CountBundleTrigger;
 import 
org.apache.flink.table.runtime.operators.deduplicate.ProcTimeDeduplicateKeepFirstRowFunction;
 import 
org.apache.flink.table.runtime.operators.deduplicate.ProcTimeDeduplicateKeepLastRowFunction;
@@ -47,6 +48,7 @@ import 
org.apache.flink.table.runtime.operators.deduplicate.ProcTimeMiniBatchDed
 import 
org.apache.flink.table.runtime.operators.deduplicate.ProcTimeMiniBatchDeduplicateKeepLastRowFunction;
 import 
org.apache.flink.table.runtime.operators.deduplicate.RowTimeDeduplicateFunction;
 import 
org.apache.flink.table.runtime.operators.deduplicate.RowTimeMiniBatchDeduplicateFunction;
+import 
org.apache.flink.table.runtime.operators.deduplicate.RowTimeMiniBatchLatestChangeDeduplicateFunction;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.runtime.typeutils.TypeCheckUtils;
 import org.apache.flink.table.types.logical.RowType;
@@ -89,6 +91,17 @@ public class StreamExecDeduplicate extends 
ExecNodeBase<RowData>
                                     + "but there will be additional overhead."
                                     + "Default is true.");
 
+    @Experimental
+    public static final ConfigOption<Boolean> 
TABLE_EXEC_DEDUPLICATE_MINIBATCH_COMPACT_CHANGES =
+            
ConfigOptions.key("table.exec.deduplicate.mini-batch.compact-changes")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Set whether to compact the changes sent 
downstream in row-time mini-batch. "
+                                    + "If true, Flink will compact changes, 
only send the latest change to downstream. "
+                                    + "Notes: If the downstream needs the 
details of versioned data, this optimization cannot be opened. "
+                                    + "If false, Flink will send all changes 
to downstream just like when the mini-batch is not on.");
+
     @JsonProperty(FIELD_NAME_UNIQUE_KEYS)
     private final int[] uniqueKeys;
 
@@ -223,6 +236,12 @@ public class StreamExecDeduplicate extends 
ExecNodeBase<RowData>
                     
.getBoolean(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED);
         }
 
+        protected boolean isCompactChanges() {
+            return tableConfig
+                    .getConfiguration()
+                    
.getBoolean(TABLE_EXEC_DEDUPLICATE_MINIBATCH_COMPACT_CHANGES);
+        }
+
         protected long getMinRetentionTime() {
             return tableConfig.getMinIdleStateRetentionTime();
         }
@@ -275,15 +294,28 @@ public class StreamExecDeduplicate extends 
ExecNodeBase<RowData>
             checkArgument(rowtimeIndex >= 0);
             if (isMiniBatchEnabled()) {
                 CountBundleTrigger<RowData> trigger = new 
CountBundleTrigger<>(getMiniBatchSize());
-                RowTimeMiniBatchDeduplicateFunction processFunction =
-                        new RowTimeMiniBatchDeduplicateFunction(
-                                rowTypeInfo,
-                                typeSerializer,
-                                getMinRetentionTime(),
-                                rowtimeIndex,
-                                generateUpdateBefore,
-                                generateInsert(),
-                                keepLastRow);
+                MapBundleFunction processFunction;
+                if (isCompactChanges()) {
+                    processFunction =
+                            new 
RowTimeMiniBatchLatestChangeDeduplicateFunction(
+                                    rowTypeInfo,
+                                    typeSerializer,
+                                    getMinRetentionTime(),
+                                    rowtimeIndex,
+                                    generateUpdateBefore,
+                                    generateInsert(),
+                                    keepLastRow);
+                } else {
+                    processFunction =
+                            new RowTimeMiniBatchDeduplicateFunction(
+                                    rowTypeInfo,
+                                    typeSerializer,
+                                    getMinRetentionTime(),
+                                    rowtimeIndex,
+                                    generateUpdateBefore,
+                                    generateInsert(),
+                                    keepLastRow);
+                }
                 return new KeyedMapBundleOperator<>(processFunction, trigger);
             } else {
                 RowTimeDeduplicateFunction processFunction =
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/DeduplicateITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/DeduplicateITCase.scala
index efe1cfc..7b0171d 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/DeduplicateITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/DeduplicateITCase.scala
@@ -23,14 +23,14 @@ import 
org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExt
 import org.apache.flink.table.api._
 import org.apache.flink.table.api.bridge.scala._
 import org.apache.flink.table.planner.factories.TestValuesTableFactory
-import 
org.apache.flink.table.planner.runtime.utils.StreamingWithMiniBatchTestBase.MiniBatchMode
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecDeduplicate
+import 
org.apache.flink.table.planner.runtime.utils.StreamingWithMiniBatchTestBase.{MiniBatchMode,
 MiniBatchOn}
 import 
org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode
 import org.apache.flink.table.planner.runtime.utils._
 import org.apache.flink.table.utils.LegacyRowResource
 import org.apache.flink.types.Row
-
 import org.junit.Assert._
-import org.junit.{Rule, Test}
+import org.junit.{Assume, Rule, Test}
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 
@@ -190,6 +190,40 @@ class DeduplicateITCase(miniBatch: MiniBatchMode, mode: 
StateBackendMode)
   }
 
   @Test
+  def testFirstRowWithoutAllChangelogOnRowtime(): Unit = {
+    Assume.assumeTrue("Without all change log only for minibatch.", miniBatch 
== MiniBatchOn)
+    tEnv.getConfig.getConfiguration.setBoolean(
+      StreamExecDeduplicate.TABLE_EXEC_DEDUPLICATE_MINIBATCH_COMPACT_CHANGES, 
true)
+    val t = env.fromCollection(rowtimeTestData)
+      .assignTimestampsAndWatermarks(new RowtimeExtractor)
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime())
+    tEnv.registerTable("T", t)
+    createSinkTable("rowtime_sink")
+
+    val sql =
+      """
+        |INSERT INTO rowtime_sink
+        | SELECT a, b, c, rowtime
+        | FROM (
+        |   SELECT *,
+        |     ROW_NUMBER() OVER (PARTITION BY a ORDER BY rowtime) as rowNum
+        |   FROM T
+        | )
+        | WHERE rowNum = 1
+      """.stripMargin
+
+    tEnv.executeSql(sql).await()
+    val rawResult = TestValuesTableFactory.getRawResults("rowtime_sink")
+
+    val expected = List(
+      "+I(1,1,Hi,1970-01-01T00:00:00.001)",
+      "+I(2,3,I am fine.,1970-01-01T00:00:00.003)",
+      "+I(3,4,Comment#2,1970-01-01T00:00:00.004)",
+      "+I(4,4,Comment#3,1970-01-01T00:00:00.004)")
+    assertEquals(expected.sorted, rawResult.sorted)
+  }
+
+  @Test
   def testFirstRowOnRowTimeFollowedByUnboundedAgg(): Unit = {
     val t = env.fromCollection(rowtimeTestData)
       .assignTimestampsAndWatermarks(new RowtimeExtractor)
@@ -263,6 +297,42 @@ class DeduplicateITCase(miniBatch: MiniBatchMode, mode: 
StateBackendMode)
   }
 
   @Test
+  def testLastRowWithoutAllChangelogOnRowtime(): Unit = {
+    Assume.assumeTrue("Without all change log only for minibatch.", miniBatch 
== MiniBatchOn)
+    tEnv.getConfig.getConfiguration.setBoolean(
+      StreamExecDeduplicate.TABLE_EXEC_DEDUPLICATE_MINIBATCH_COMPACT_CHANGES, 
true)
+    val t = env.fromCollection(rowtimeTestData)
+      .assignTimestampsAndWatermarks(new RowtimeExtractor)
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime())
+    tEnv.registerTable("T", t)
+    createSinkTable("rowtime_sink")
+
+    val sql =
+      """
+        |INSERT INTO rowtime_sink
+        | SELECT a, b, c, rowtime
+        | FROM (
+        |   SELECT *,
+        |     ROW_NUMBER() OVER (PARTITION BY b ORDER BY rowtime DESC) as 
rowNum
+        |   FROM T
+        | )
+        | WHERE rowNum = 1
+      """.stripMargin
+
+    tEnv.executeSql(sql).await()
+    val rawResult = TestValuesTableFactory.getRawResults("rowtime_sink")
+
+    val expected = List(
+      "+I(1,1,Hi,1970-01-01T00:00:00.001)",
+      "+I(1,2,Hello world,1970-01-01T00:00:00.002)",
+      "+I(2,3,I am fine.,1970-01-01T00:00:00.003)",
+      "+I(2,6,Comment#1,1970-01-01T00:00:00.006)",
+      "+I(3,5,Comment#2,1970-01-01T00:00:00.005)",
+      "+I(4,4,Comment#3,1970-01-01T00:00:00.004)")
+    assertEquals(expected.sorted, rawResult.sorted)
+  }
+
+  @Test
   def testLastRowOnRowTimeFollowedByUnboundedAgg(): Unit = {
     val t = env.fromCollection(rowtimeTestData)
       .assignTimestampsAndWatermarks(new RowtimeExtractor)
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeMiniBatchLatestChangeDeduplicateFunction.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeMiniBatchLatestChangeDeduplicateFunction.java
new file mode 100644
index 0000000..a46aed4
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeMiniBatchLatestChangeDeduplicateFunction.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.deduplicate;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.util.Collector;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+
+import static 
org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.checkInsertOnly;
+import static 
org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.isDuplicate;
+import static 
org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.updateDeduplicateResult;
+
+/**
+ * This function is used to get the first or last row for every key partition in miniBatch
+ * mode, but only sends the latest change to downstream.
+ */
+public class RowTimeMiniBatchLatestChangeDeduplicateFunction
+        extends MiniBatchDeduplicateFunctionBase<RowData, RowData, RowData, 
RowData, RowData> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final TypeSerializer<RowData> serializer;
+    private final boolean generateUpdateBefore;
+    private final boolean generateInsert;
+    private final int rowtimeIndex;
+    private final boolean keepLastRow;
+
+    public RowTimeMiniBatchLatestChangeDeduplicateFunction(
+            InternalTypeInfo<RowData> typeInfo,
+            TypeSerializer<RowData> serializer,
+            long minRetentionTime,
+            int rowtimeIndex,
+            boolean generateUpdateBefore,
+            boolean generateInsert,
+            boolean keepLastRow) {
+        super(typeInfo, minRetentionTime);
+        this.serializer = serializer;
+        this.generateUpdateBefore = generateUpdateBefore;
+        this.generateInsert = generateInsert;
+        this.rowtimeIndex = rowtimeIndex;
+        this.keepLastRow = keepLastRow;
+    }
+
+    @Override
+    public RowData addInput(@Nullable RowData value, RowData input) throws 
Exception {
+        if (isDuplicate(value, input, rowtimeIndex, keepLastRow)) {
+            return serializer.copy(input);
+        }
+        return value;
+    }
+
+    @Override
+    public void finishBundle(Map<RowData, RowData> buffer, Collector<RowData> 
out)
+            throws Exception {
+        for (Map.Entry<RowData, RowData> entry : buffer.entrySet()) {
+            RowData currentKey = entry.getKey();
+            RowData bufferedRow = entry.getValue();
+            ctx.setCurrentKey(currentKey);
+            RowData preRow = state.value();
+            checkInsertOnly(bufferedRow);
+            if (isDuplicate(preRow, bufferedRow, rowtimeIndex, keepLastRow)) {
+                updateDeduplicateResult(
+                        generateUpdateBefore, generateInsert, preRow, 
bufferedRow, out);
+                state.update(bufferedRow);
+            }
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeDeduplicateFunctionTest.java
 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeDeduplicateFunctionTest.java
index ed800b1..ef7a716 100644
--- 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeDeduplicateFunctionTest.java
+++ 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeDeduplicateFunctionTest.java
@@ -18,24 +18,13 @@
 
 package org.apache.flink.table.runtime.operators.deduplicate;
 
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
 import org.apache.flink.table.runtime.operators.bundle.KeyedMapBundleOperator;
 import 
org.apache.flink.table.runtime.operators.bundle.trigger.CountBundleTrigger;
-import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
-import org.apache.flink.table.runtime.util.GenericRowRecordSortComparator;
-import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
-import org.apache.flink.table.types.logical.BigIntType;
-import org.apache.flink.table.types.logical.IntType;
-import org.apache.flink.table.types.logical.VarCharType;
-import org.apache.flink.table.utils.HandwrittenSelectorUtil;
 import org.apache.flink.types.RowKind;
 
 import org.junit.Test;
@@ -55,24 +44,7 @@ import static 
org.apache.flink.table.runtime.util.StreamRecordUtils.record;
  * RowTimeMiniBatchDeduplicateFunction}.
  */
 @RunWith(Parameterized.class)
-public class RowTimeDeduplicateFunctionTest {
-
-    private final long miniBatchSize = 4L;
-    private Time minTtlTime = Time.milliseconds(10);
-    private InternalTypeInfo inputRowType =
-            InternalTypeInfo.ofFields(
-                    new VarCharType(VarCharType.MAX_LENGTH), new IntType(), 
new BigIntType());
-    private TypeSerializer<RowData> serializer = inputRowType.toSerializer();
-    private int rowTimeIndex = 2;
-    private int rowKeyIndex = 0;
-    private RowDataKeySelector rowKeySelector =
-            HandwrittenSelectorUtil.getRowDataSelector(
-                    new int[] {rowKeyIndex}, inputRowType.toRowFieldTypes());
-    private RowDataHarnessAssertor assertor =
-            new RowDataHarnessAssertor(
-                    inputRowType.toRowFieldTypes(),
-                    new GenericRowRecordSortComparator(
-                            rowKeyIndex, 
inputRowType.toRowFieldTypes()[rowKeyIndex]));
+public class RowTimeDeduplicateFunctionTest extends 
RowTimeDeduplicateFunctionTestBase {
 
     private final boolean miniBatchEnable;
 
@@ -328,18 +300,6 @@ public class RowTimeDeduplicateFunctionTest {
         testHarness.close();
     }
 
-    private OneInputStreamOperatorTestHarness<RowData, RowData> 
createTestHarness(
-            KeyedProcessOperator<RowData, RowData, RowData> operator) throws 
Exception {
-        return new KeyedOneInputStreamOperatorTestHarness<>(
-                operator, rowKeySelector, rowKeySelector.getProducedType());
-    }
-
-    private OneInputStreamOperatorTestHarness<RowData, RowData> 
createTestHarness(
-            KeyedMapBundleOperator<RowData, RowData, RowData, RowData> 
operator) throws Exception {
-        return new KeyedOneInputStreamOperatorTestHarness<>(
-                operator, rowKeySelector, rowKeySelector.getProducedType());
-    }
-
     @Parameterized.Parameters(name = "miniBatchEnable = {0}")
     public static Collection<Boolean[]> runMode() {
         return Arrays.asList(new Boolean[] {false}, new Boolean[] {true});
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeDeduplicateFunctionTestBase.java
 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeDeduplicateFunctionTestBase.java
new file mode 100644
index 0000000..2a6cc8f
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeDeduplicateFunctionTestBase.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.deduplicate;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import org.apache.flink.table.runtime.operators.bundle.KeyedMapBundleOperator;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.runtime.util.GenericRowRecordSortComparator;
+import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.utils.HandwrittenSelectorUtil;
+
+/** Base class of tests for all kinds of row-time DeduplicateFunction. */
+abstract class RowTimeDeduplicateFunctionTestBase {
+
+    protected final long miniBatchSize = 4L;
+    protected Time minTtlTime = Time.milliseconds(10);
+    protected InternalTypeInfo inputRowType =
+            InternalTypeInfo.ofFields(
+                    new VarCharType(VarCharType.MAX_LENGTH), new IntType(), 
new BigIntType());
+    protected TypeSerializer<RowData> serializer = inputRowType.toSerializer();
+    protected int rowTimeIndex = 2;
+    protected int rowKeyIndex = 0;
+    protected RowDataKeySelector rowKeySelector =
+            HandwrittenSelectorUtil.getRowDataSelector(
+                    new int[] {rowKeyIndex}, inputRowType.toRowFieldTypes());
+    protected RowDataHarnessAssertor assertor =
+            new RowDataHarnessAssertor(
+                    inputRowType.toRowFieldTypes(),
+                    new GenericRowRecordSortComparator(
+                            rowKeyIndex, 
inputRowType.toRowFieldTypes()[rowKeyIndex]));
+
+    protected OneInputStreamOperatorTestHarness<RowData, RowData> 
createTestHarness(
+            KeyedProcessOperator<RowData, RowData, RowData> operator) throws 
Exception {
+        return new KeyedOneInputStreamOperatorTestHarness<>(
+                operator, rowKeySelector, rowKeySelector.getProducedType());
+    }
+
+    protected OneInputStreamOperatorTestHarness<RowData, RowData> 
createTestHarness(
+            KeyedMapBundleOperator<RowData, RowData, RowData, RowData> 
operator) throws Exception {
+        return new KeyedOneInputStreamOperatorTestHarness<>(
+                operator, rowKeySelector, rowKeySelector.getProducedType());
+    }
+}
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeMiniBatchLatestChangeDeduplicateFunctionTest.java
 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeMiniBatchLatestChangeDeduplicateFunctionTest.java
new file mode 100644
index 0000000..233ee39
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeMiniBatchLatestChangeDeduplicateFunctionTest.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.deduplicate;
+
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.bundle.KeyedMapBundleOperator;
+import 
org.apache.flink.table.runtime.operators.bundle.trigger.CountBundleTrigger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static 
org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
+import static 
org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfterRecord;
+import static 
org.apache.flink.table.runtime.util.StreamRecordUtils.updateBeforeRecord;
+
+/** Harness tests for {@link RowTimeMiniBatchLatestChangeDeduplicateFunction}. 
*/
+public class RowTimeMiniBatchLatestChangeDeduplicateFunctionTest
+        extends RowTimeDeduplicateFunctionTestBase {
+
+    @Test
+    public void 
testKeepLastRowWithoutGenerateUpdateBeforeAndWithGenerateInsert() throws 
Exception {
+        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+                createTestHarness(false, true, true);
+        testHarness.open();
+        testHarness.processElement(insertRecord("book", 10, 1L));
+        testHarness.processElement(insertRecord("book", 11, 2L));
+        testHarness.processElement(insertRecord("book", 13, 1L));
+        // output is empty because bundle not trigger yet.
+        Assert.assertTrue(testHarness.getOutput().isEmpty());
+        // bundle trigger emit.
+        testHarness.processElement(insertRecord("book", 12, 1L));
+        List<Object> expectedOutput = new ArrayList<>();
+        expectedOutput.add(insertRecord("book", 11, 2L));
+        assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, 
testHarness.getOutput());
+
+        testHarness.processElement(insertRecord("book", 14, 3L));
+        testHarness.processElement(insertRecord("book", 15, 1L));
+        // watermark trigger emit.
+        testHarness.processWatermark(new Watermark(3L));
+        expectedOutput.add(updateAfterRecord("book", 14, 3L));
+        expectedOutput.add(new Watermark(3L));
+        testHarness.close();
+        assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, 
testHarness.getOutput());
+    }
+
+    @Test
+    public void 
testKeepLastRowWithoutGenerateUpdateBeforeAndWithoutGenerateInsert()
+            throws Exception {
+        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+                createTestHarness(false, false, true);
+        testHarness.open();
+        testHarness.processElement(insertRecord("book", 10, 1L));
+        testHarness.processElement(insertRecord("book", 11, 2L));
+        testHarness.processElement(insertRecord("book", 13, 1L));
+        // output is empty because bundle not trigger yet.
+        Assert.assertTrue(testHarness.getOutput().isEmpty());
+        // bundle trigger emit.
+        testHarness.processElement(insertRecord("book", 12, 1L));
+        List<Object> expectedOutput = new ArrayList<>();
+        expectedOutput.add(updateAfterRecord("book", 11, 2L));
+        assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, 
testHarness.getOutput());
+
+        testHarness.processElement(insertRecord("book", 14, 3L));
+        testHarness.processElement(insertRecord("book", 15, 1L));
+        // watermark trigger emit.
+        testHarness.processWatermark(new Watermark(3L));
+        expectedOutput.add(updateAfterRecord("book", 14, 3L));
+        expectedOutput.add(new Watermark(3L));
+        testHarness.close();
+        assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, 
testHarness.getOutput());
+    }
+
+    @Test
+    public void testKeepLastRowWithGenerateUpdateBeforeAndWithGenerateInsert() 
throws Exception {
+        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+                createTestHarness(true, true, true);
+        testHarness.open();
+        testHarness.processElement(insertRecord("book", 10, 1L));
+        testHarness.processElement(insertRecord("book", 11, 2L));
+        testHarness.processElement(insertRecord("book", 13, 1L));
+        // output is empty because bundle not trigger yet.
+        Assert.assertTrue(testHarness.getOutput().isEmpty());
+        // bundle trigger emit.
+        testHarness.processElement(insertRecord("book", 12, 1L));
+        List<Object> expectedOutput = new ArrayList<>();
+        expectedOutput.add(insertRecord("book", 11, 2L));
+        assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, 
testHarness.getOutput());
+
+        testHarness.processElement(insertRecord("book", 14, 3L));
+        testHarness.processElement(insertRecord("book", 15, 1L));
+        // watermark trigger emit.
+        testHarness.processWatermark(new Watermark(3L));
+        expectedOutput.add(updateBeforeRecord("book", 11, 2L));
+        expectedOutput.add(updateAfterRecord("book", 14, 3L));
+        expectedOutput.add(new Watermark(3L));
+        testHarness.close();
+        assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, 
testHarness.getOutput());
+    }
+
+    @Test
+    public void 
testKeepLastRowWithGenerateUpdateBeforeAndWithoutGenerateInsert() throws 
Exception {
+        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+                createTestHarness(true, false, true);
+        testHarness.open();
+        testHarness.processElement(insertRecord("book", 10, 1L));
+        testHarness.processElement(insertRecord("book", 11, 2L));
+        testHarness.processElement(insertRecord("book", 13, 1L));
+        // output is empty because bundle not trigger yet.
+        Assert.assertTrue(testHarness.getOutput().isEmpty());
+        // bundle trigger emit.
+        testHarness.processElement(insertRecord("book", 12, 1L));
+        List<Object> expectedOutput = new ArrayList<>();
+        expectedOutput.add(insertRecord("book", 11, 2L));
+        assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, 
testHarness.getOutput());
+
+        testHarness.processElement(insertRecord("book", 14, 3L));
+        testHarness.processElement(insertRecord("book", 15, 1L));
+        // watermark trigger emit.
+        testHarness.processWatermark(new Watermark(3L));
+        expectedOutput.add(updateBeforeRecord("book", 11, 2L));
+        expectedOutput.add(updateAfterRecord("book", 14, 3L));
+        expectedOutput.add(new Watermark(3L));
+        testHarness.close();
+        assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, 
testHarness.getOutput());
+    }
+
+    @Test
+    public void 
testKeepLastRowWithGenerateUpdateBeforeAndWithGenerateInsertAndStateTtl()
+            throws Exception {
+        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+                createTestHarness(true, true, true);
+        testHarness.open();
+        testHarness.processElement(insertRecord("book", 10, 1L));
+        testHarness.processElement(insertRecord("book", 11, 2L));
+        testHarness.processElement(insertRecord("book", 13, 1L));
+        // output is empty because bundle not trigger yet.
+        Assert.assertTrue(testHarness.getOutput().isEmpty());
+        // bundle trigger emit.
+        testHarness.processElement(insertRecord("book", 12, 1L));
+        List<Object> expectedOutput = new ArrayList<>();
+        expectedOutput.add(insertRecord("book", 11, 2L));
+        assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, 
testHarness.getOutput());
+
+        // clear state.
+        testHarness.setStateTtlProcessingTime(10);
+
+        testHarness.processElement(insertRecord("book", 14, 3L));
+        testHarness.processElement(insertRecord("book", 15, 1L));
+        // watermark trigger emit.
+        testHarness.processWatermark(new Watermark(3L));
+        expectedOutput.add(insertRecord("book", 14, 3L));
+        expectedOutput.add(new Watermark(3L));
+        testHarness.close();
+        assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, 
testHarness.getOutput());
+    }
+
+    @Test
+    public void 
testKeepFirstRowWithoutGenerateUpdateBeforeAndWithGenerateInsert()
+            throws Exception {
+        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+                createTestHarness(false, true, false);
+        testHarness.open();
+        testHarness.processElement(insertRecord("book", 10, 1L));
+        testHarness.processElement(insertRecord("book", 11, 2L));
+        testHarness.processElement(insertRecord("book", 13, 1L));
+        // output is empty because bundle not trigger yet.
+        Assert.assertTrue(testHarness.getOutput().isEmpty());
+        // bundle trigger emit.
+        testHarness.processElement(insertRecord("book", 12, 1L));
+        List<Object> expectedOutput = new ArrayList<>();
+        expectedOutput.add(insertRecord("book", 10, 1L));
+        assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, 
testHarness.getOutput());
+
+        testHarness.processElement(insertRecord("book", 14, 3L));
+        testHarness.processElement(insertRecord("book", 15, 1L));
+        // watermark trigger emit.
+        testHarness.processWatermark(new Watermark(3L));
+        expectedOutput.add(new Watermark(3L));
+        testHarness.close();
+        assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, 
testHarness.getOutput());
+    }
+
+    @Test
+    public void 
testKeepFirstRowWithoutGenerateUpdateBeforeAndWithoutGenerateInsert()
+            throws Exception {
+        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+                createTestHarness(false, false, false);
+        testHarness.open();
+        testHarness.processElement(insertRecord("book", 10, 1L));
+        testHarness.processElement(insertRecord("book", 11, 2L));
+        testHarness.processElement(insertRecord("book", 13, 1L));
+        // output is empty because bundle not trigger yet.
+        Assert.assertTrue(testHarness.getOutput().isEmpty());
+        // bundle trigger emit.
+        testHarness.processElement(insertRecord("book", 12, 1L));
+        List<Object> expectedOutput = new ArrayList<>();
+        expectedOutput.add(updateAfterRecord("book", 10, 1L));
+        assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, 
testHarness.getOutput());
+
+        testHarness.processElement(insertRecord("book", 14, 3L));
+        testHarness.processElement(insertRecord("book", 15, 1L));
+        // watermark trigger emit.
+        testHarness.processWatermark(new Watermark(3L));
+        expectedOutput.add(new Watermark(3L));
+        testHarness.close();
+        assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, 
testHarness.getOutput());
+    }
+
+    @Test
+    public void 
testKeepFirstRowWithGenerateUpdateBeforeAndWithGenerateInsert() throws 
Exception {
+        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+                createTestHarness(true, true, false);
+        testHarness.open();
+        testHarness.processElement(insertRecord("book", 10, 1L));
+        testHarness.processElement(insertRecord("book", 11, 2L));
+        testHarness.processElement(insertRecord("book", 13, 1L));
+        // output is empty because bundle not trigger yet.
+        Assert.assertTrue(testHarness.getOutput().isEmpty());
+        // bundle trigger emit.
+        testHarness.processElement(insertRecord("book", 12, 1L));
+        List<Object> expectedOutput = new ArrayList<>();
+        expectedOutput.add(insertRecord("book", 10, 1L));
+        assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, 
testHarness.getOutput());
+
+        testHarness.processElement(insertRecord("book", 14, 3L));
+        testHarness.processElement(insertRecord("book", 15, 1L));
+        // watermark trigger emit.
+        testHarness.processWatermark(new Watermark(3L));
+        expectedOutput.add(new Watermark(3L));
+        testHarness.close();
+        assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, 
testHarness.getOutput());
+    }
+
+    @Test
+    public void 
testKeepFirstRowWithGenerateUpdateBeforeAndWithoutGenerateInsert()
+            throws Exception {
+        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+                createTestHarness(true, false, false);
+        testHarness.open();
+        testHarness.processElement(insertRecord("book", 10, 1L));
+        testHarness.processElement(insertRecord("book", 11, 2L));
+        testHarness.processElement(insertRecord("book", 13, 1L));
+        // output is empty because bundle not trigger yet.
+        Assert.assertTrue(testHarness.getOutput().isEmpty());
+        // bundle trigger emit.
+        testHarness.processElement(insertRecord("book", 12, 1L));
+        List<Object> expectedOutput = new ArrayList<>();
+        expectedOutput.add(insertRecord("book", 10, 1L));
+        assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, 
testHarness.getOutput());
+
+        testHarness.processElement(insertRecord("book", 14, 3L));
+        testHarness.processElement(insertRecord("book", 15, 1L));
+        // watermark trigger emit.
+        testHarness.processWatermark(new Watermark(3L));
+        expectedOutput.add(new Watermark(3L));
+        testHarness.close();
+        assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, 
testHarness.getOutput());
+    }
+
+    @Test
+    public void 
testKeepFirstRowWithGenerateUpdateBeforeAndWithGenerateInsertAndStateTtl()
+            throws Exception {
+        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+                createTestHarness(true, true, false);
+        testHarness.open();
+        testHarness.processElement(insertRecord("book", 10, 1L));
+        testHarness.processElement(insertRecord("book", 11, 2L));
+        testHarness.processElement(insertRecord("book", 13, 1L));
+        // output is empty because bundle not trigger yet.
+        Assert.assertTrue(testHarness.getOutput().isEmpty());
+        // bundle trigger emit.
+        testHarness.processElement(insertRecord("book", 12, 1L));
+        List<Object> expectedOutput = new ArrayList<>();
+        expectedOutput.add(insertRecord("book", 10, 1L));
+        assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, 
testHarness.getOutput());
+
+        // clear state.
+        testHarness.setStateTtlProcessingTime(10);
+
+        testHarness.processElement(insertRecord("book", 14, 3L));
+        testHarness.processElement(insertRecord("book", 15, 1L));
+        // watermark trigger emit.
+        testHarness.processWatermark(new Watermark(3L));
+        expectedOutput.add(insertRecord("book", 15, 1L));
+        expectedOutput.add(new Watermark(3L));
+        testHarness.close();
+        assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, 
testHarness.getOutput());
+    }
+
+    private OneInputStreamOperatorTestHarness<RowData, RowData> 
createTestHarness(
+            boolean generateUpdateBefore, boolean generateInsert, boolean 
keepLastRow)
+            throws Exception {
+        RowTimeMiniBatchLatestChangeDeduplicateFunction func =
+                new RowTimeMiniBatchLatestChangeDeduplicateFunction(
+                        inputRowType,
+                        serializer,
+                        minTtlTime.toMilliseconds(),
+                        rowTimeIndex,
+                        generateUpdateBefore,
+                        generateInsert,
+                        keepLastRow);
+        CountBundleTrigger trigger = new 
CountBundleTrigger<RowData>(miniBatchSize);
+        KeyedMapBundleOperator<RowData, RowData, RowData, RowData> 
keyedMapBundleOperator =
+                new KeyedMapBundleOperator(func, trigger);
+        return createTestHarness(keyedMapBundleOperator);
+    }
+}

Reply via email to