This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 4d8157f  [FLINK-23531][table] Supports compact-changes in row-time deduplicate mini-batch
4d8157f is described below
commit 4d8157f6d56e2ce8b76ed99ff8afaf2c2fff4fa6
Author: zhangmang <[email protected]>
AuthorDate: Wed Aug 4 16:26:48 2021 +0800
[FLINK-23531][table] Supports compact-changes in row-time deduplicate mini-batch
This closes #16630
---
.../nodes/exec/stream/StreamExecDeduplicate.java | 50 +++-
.../runtime/stream/sql/DeduplicateITCase.scala | 76 ++++-
...meMiniBatchLatestChangeDeduplicateFunction.java | 89 ++++++
.../RowTimeDeduplicateFunctionTest.java | 42 +--
.../RowTimeDeduplicateFunctionTestBase.java | 68 +++++
...niBatchLatestChangeDeduplicateFunctionTest.java | 328 +++++++++++++++++++++
6 files changed, 600 insertions(+), 53 deletions(-)
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecDeduplicate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecDeduplicate.java
index 715adc1..309fce0 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecDeduplicate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecDeduplicate.java
@@ -40,6 +40,7 @@ import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.bundle.KeyedMapBundleOperator;
+import org.apache.flink.table.runtime.operators.bundle.MapBundleFunction;
import org.apache.flink.table.runtime.operators.bundle.trigger.CountBundleTrigger;
import org.apache.flink.table.runtime.operators.deduplicate.ProcTimeDeduplicateKeepFirstRowFunction;
import org.apache.flink.table.runtime.operators.deduplicate.ProcTimeDeduplicateKeepLastRowFunction;
@@ -47,6 +48,7 @@ import org.apache.flink.table.runtime.operators.deduplicate.ProcTimeMiniBatchDed
import org.apache.flink.table.runtime.operators.deduplicate.ProcTimeMiniBatchDeduplicateKeepLastRowFunction;
import org.apache.flink.table.runtime.operators.deduplicate.RowTimeDeduplicateFunction;
import org.apache.flink.table.runtime.operators.deduplicate.RowTimeMiniBatchDeduplicateFunction;
+import org.apache.flink.table.runtime.operators.deduplicate.RowTimeMiniBatchLatestChangeDeduplicateFunction;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.typeutils.TypeCheckUtils;
import org.apache.flink.table.types.logical.RowType;
@@ -89,6 +91,17 @@ public class StreamExecDeduplicate extends ExecNodeBase<RowData>
+ "but there will be additional overhead."
+ "Default is true.");
+ @Experimental
+ public static final ConfigOption<Boolean> TABLE_EXEC_DEDUPLICATE_MINIBATCH_COMPACT_CHANGES =
+ ConfigOptions.key("table.exec.deduplicate.mini-batch.compact-changes")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Set whether to compact the changes sent downstream in row-time mini-batch. "
+ + "If true, Flink will compact changes and send only the latest change downstream. "
+ + "Note: If the downstream needs the details of versioned data, this optimization cannot be applied. "
+ + "If false, Flink will send all changes downstream, just as when mini-batch is not enabled.");
+
@JsonProperty(FIELD_NAME_UNIQUE_KEYS)
private final int[] uniqueKeys;
@@ -223,6 +236,12 @@ public class StreamExecDeduplicate extends ExecNodeBase<RowData>
.getBoolean(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED);
}
+ protected boolean isCompactChanges() {
+ return tableConfig
+ .getConfiguration()
+ .getBoolean(TABLE_EXEC_DEDUPLICATE_MINIBATCH_COMPACT_CHANGES);
+ }
+
protected long getMinRetentionTime() {
return tableConfig.getMinIdleStateRetentionTime();
}
@@ -275,15 +294,28 @@ public class StreamExecDeduplicate extends ExecNodeBase<RowData>
checkArgument(rowtimeIndex >= 0);
if (isMiniBatchEnabled()) {
CountBundleTrigger<RowData> trigger = new CountBundleTrigger<>(getMiniBatchSize());
- RowTimeMiniBatchDeduplicateFunction processFunction =
- new RowTimeMiniBatchDeduplicateFunction(
- rowTypeInfo,
- typeSerializer,
- getMinRetentionTime(),
- rowtimeIndex,
- generateUpdateBefore,
- generateInsert(),
- keepLastRow);
+ MapBundleFunction processFunction;
+ if (isCompactChanges()) {
+ processFunction =
+ new RowTimeMiniBatchLatestChangeDeduplicateFunction(
+ rowTypeInfo,
+ typeSerializer,
+ getMinRetentionTime(),
+ rowtimeIndex,
+ generateUpdateBefore,
+ generateInsert(),
+ keepLastRow);
+ } else {
+ processFunction =
+ new RowTimeMiniBatchDeduplicateFunction(
+ rowTypeInfo,
+ typeSerializer,
+ getMinRetentionTime(),
+ rowtimeIndex,
+ generateUpdateBefore,
+ generateInsert(),
+ keepLastRow);
+ }
return new KeyedMapBundleOperator<>(processFunction, trigger);
} else {
RowTimeDeduplicateFunction processFunction =
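
For readers who want to try this out, a minimal sketch of enabling the new option together with the existing mini-batch options is shown below; the TableEnvironment setup, latency, and batch size are illustrative assumptions, not values from this commit.

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    // Sketch only: compact-changes takes effect only when mini-batch execution is enabled,
    // so the regular mini-batch options are set alongside the new option.
    public class CompactChangesExample {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
            Configuration conf = tEnv.getConfig().getConfiguration();
            conf.setString("table.exec.mini-batch.enabled", "true");
            conf.setString("table.exec.mini-batch.allow-latency", "2 s");
            conf.setString("table.exec.mini-batch.size", "5000");
            // option added by this commit; defaults to false
            conf.setString("table.exec.deduplicate.mini-batch.compact-changes", "true");
        }
    }
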
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/DeduplicateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/DeduplicateITCase.scala
index efe1cfc..7b0171d 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/DeduplicateITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/DeduplicateITCase.scala
@@ -23,14 +23,14 @@ import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExt
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.planner.factories.TestValuesTableFactory
-import org.apache.flink.table.planner.runtime.utils.StreamingWithMiniBatchTestBase.MiniBatchMode
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecDeduplicate
+import org.apache.flink.table.planner.runtime.utils.StreamingWithMiniBatchTestBase.{MiniBatchMode, MiniBatchOn}
import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode
import org.apache.flink.table.planner.runtime.utils._
import org.apache.flink.table.utils.LegacyRowResource
import org.apache.flink.types.Row
-
import org.junit.Assert._
-import org.junit.{Rule, Test}
+import org.junit.{Assume, Rule, Test}
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
@@ -190,6 +190,40 @@ class DeduplicateITCase(miniBatch: MiniBatchMode, mode: StateBackendMode)
}
@Test
+ def testFirstRowWithoutAllChangelogOnRowtime(): Unit = {
+ Assume.assumeTrue("Compacting changes is only supported in mini-batch mode.", miniBatch == MiniBatchOn)
+ tEnv.getConfig.getConfiguration.setBoolean(
+ StreamExecDeduplicate.TABLE_EXEC_DEDUPLICATE_MINIBATCH_COMPACT_CHANGES, true)
+ val t = env.fromCollection(rowtimeTestData)
+ .assignTimestampsAndWatermarks(new RowtimeExtractor)
+ .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime())
+ tEnv.registerTable("T", t)
+ createSinkTable("rowtime_sink")
+
+ val sql =
+ """
+ |INSERT INTO rowtime_sink
+ | SELECT a, b, c, rowtime
+ | FROM (
+ | SELECT *,
+ | ROW_NUMBER() OVER (PARTITION BY a ORDER BY rowtime) as rowNum
+ | FROM T
+ | )
+ | WHERE rowNum = 1
+ """.stripMargin
+
+ tEnv.executeSql(sql).await()
+ val rawResult = TestValuesTableFactory.getRawResults("rowtime_sink")
+
+ val expected = List(
+ "+I(1,1,Hi,1970-01-01T00:00:00.001)",
+ "+I(2,3,I am fine.,1970-01-01T00:00:00.003)",
+ "+I(3,4,Comment#2,1970-01-01T00:00:00.004)",
+ "+I(4,4,Comment#3,1970-01-01T00:00:00.004)")
+ assertEquals(expected.sorted, rawResult.sorted)
+ }
+
+ @Test
def testFirstRowOnRowTimeFollowedByUnboundedAgg(): Unit = {
val t = env.fromCollection(rowtimeTestData)
.assignTimestampsAndWatermarks(new RowtimeExtractor)
@@ -263,6 +297,42 @@ class DeduplicateITCase(miniBatch: MiniBatchMode, mode: StateBackendMode)
}
@Test
+ def testLastRowWithoutAllChangelogOnRowtime(): Unit = {
+ Assume.assumeTrue("Compacting changes is only supported in mini-batch mode.", miniBatch == MiniBatchOn)
+ tEnv.getConfig.getConfiguration.setBoolean(
+ StreamExecDeduplicate.TABLE_EXEC_DEDUPLICATE_MINIBATCH_COMPACT_CHANGES, true)
+ val t = env.fromCollection(rowtimeTestData)
+ .assignTimestampsAndWatermarks(new RowtimeExtractor)
+ .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime())
+ tEnv.registerTable("T", t)
+ createSinkTable("rowtime_sink")
+
+ val sql =
+ """
+ |INSERT INTO rowtime_sink
+ | SELECT a, b, c, rowtime
+ | FROM (
+ | SELECT *,
+ | ROW_NUMBER() OVER (PARTITION BY b ORDER BY rowtime DESC) as rowNum
+ | FROM T
+ | )
+ | WHERE rowNum = 1
+ """.stripMargin
+
+ tEnv.executeSql(sql).await()
+ val rawResult = TestValuesTableFactory.getRawResults("rowtime_sink")
+
+ val expected = List(
+ "+I(1,1,Hi,1970-01-01T00:00:00.001)",
+ "+I(1,2,Hello world,1970-01-01T00:00:00.002)",
+ "+I(2,3,I am fine.,1970-01-01T00:00:00.003)",
+ "+I(2,6,Comment#1,1970-01-01T00:00:00.006)",
+ "+I(3,5,Comment#2,1970-01-01T00:00:00.005)",
+ "+I(4,4,Comment#3,1970-01-01T00:00:00.004)")
+ assertEquals(expected.sorted, rawResult.sorted)
+ }
+
+ @Test
def testLastRowOnRowTimeFollowedByUnboundedAgg(): Unit = {
val t = env.fromCollection(rowtimeTestData)
.assignTimestampsAndWatermarks(new RowtimeExtractor)
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeMiniBatchLatestChangeDeduplicateFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeMiniBatchLatestChangeDeduplicateFunction.java
new file mode 100644
index 0000000..a46aed4
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeMiniBatchLatestChangeDeduplicateFunction.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.deduplicate;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.util.Collector;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+
+import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.checkInsertOnly;
+import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.isDuplicate;
+import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.updateDeduplicateResult;
+
+/**
+ * This function is used to get the first or last row for every key partition in mini-batch mode,
+ * but it only sends the latest change to the downstream operator.
+ */
+public class RowTimeMiniBatchLatestChangeDeduplicateFunction
+ extends MiniBatchDeduplicateFunctionBase<RowData, RowData, RowData, RowData, RowData> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final TypeSerializer<RowData> serializer;
+ private final boolean generateUpdateBefore;
+ private final boolean generateInsert;
+ private final int rowtimeIndex;
+ private final boolean keepLastRow;
+
+ public RowTimeMiniBatchLatestChangeDeduplicateFunction(
+ InternalTypeInfo<RowData> typeInfo,
+ TypeSerializer<RowData> serializer,
+ long minRetentionTime,
+ int rowtimeIndex,
+ boolean generateUpdateBefore,
+ boolean generateInsert,
+ boolean keepLastRow) {
+ super(typeInfo, minRetentionTime);
+ this.serializer = serializer;
+ this.generateUpdateBefore = generateUpdateBefore;
+ this.generateInsert = generateInsert;
+ this.rowtimeIndex = rowtimeIndex;
+ this.keepLastRow = keepLastRow;
+ }
+
+ @Override
+ public RowData addInput(@Nullable RowData value, RowData input) throws Exception {
+ if (isDuplicate(value, input, rowtimeIndex, keepLastRow)) {
+ return serializer.copy(input);
+ }
+ return value;
+ }
+
+ @Override
+ public void finishBundle(Map<RowData, RowData> buffer, Collector<RowData> out) throws Exception {
+ for (Map.Entry<RowData, RowData> entry : buffer.entrySet()) {
+ RowData currentKey = entry.getKey();
+ RowData bufferedRow = entry.getValue();
+ ctx.setCurrentKey(currentKey);
+ RowData preRow = state.value();
+ checkInsertOnly(bufferedRow);
+ if (isDuplicate(preRow, bufferedRow, rowtimeIndex, keepLastRow)) {
+ updateDeduplicateResult(
+ generateUpdateBefore, generateInsert, preRow, bufferedRow, out);
+ state.update(bufferedRow);
+ }
+ }
+ }
+}
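
For reference, the sketch below shows how this function is assembled into a mini-batch operator, mirroring the isCompactChanges() branch in StreamExecDeduplicate above; the row type, rowtime index, retention time, and bundle size are placeholder values, not values from this commit.

    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.runtime.operators.bundle.KeyedMapBundleOperator;
    import org.apache.flink.table.runtime.operators.bundle.trigger.CountBundleTrigger;
    import org.apache.flink.table.runtime.operators.deduplicate.RowTimeMiniBatchLatestChangeDeduplicateFunction;
    import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
    import org.apache.flink.table.types.logical.BigIntType;
    import org.apache.flink.table.types.logical.IntType;
    import org.apache.flink.table.types.logical.VarCharType;

    // Sketch only: builds the keyed bundle operator that deduplicates per key and, because this
    // function compacts the bundle, emits only the latest change per key (plus an optional
    // UPDATE_BEFORE) when the bundle fires.
    public class LatestChangeOperatorSketch {
        public static KeyedMapBundleOperator<RowData, RowData, RowData, RowData> create() {
            InternalTypeInfo<RowData> rowTypeInfo =
                    InternalTypeInfo.ofFields(
                            new VarCharType(VarCharType.MAX_LENGTH), new IntType(), new BigIntType());
            RowTimeMiniBatchLatestChangeDeduplicateFunction processFunction =
                    new RowTimeMiniBatchLatestChangeDeduplicateFunction(
                            rowTypeInfo,
                            rowTypeInfo.toSerializer(),
                            60_000L, // minRetentionTime (placeholder)
                            2,       // rowtimeIndex (placeholder)
                            true,    // generateUpdateBefore
                            true,    // generateInsert
                            true);   // keepLastRow
            CountBundleTrigger<RowData> trigger = new CountBundleTrigger<>(1000L);
            return new KeyedMapBundleOperator<>(processFunction, trigger);
        }
    }
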
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeDeduplicateFunctionTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeDeduplicateFunctionTest.java
index ed800b1..ef7a716 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeDeduplicateFunctionTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeDeduplicateFunctionTest.java
@@ -18,24 +18,13 @@
package org.apache.flink.table.runtime.operators.deduplicate;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.bundle.KeyedMapBundleOperator;
import org.apache.flink.table.runtime.operators.bundle.trigger.CountBundleTrigger;
-import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
-import org.apache.flink.table.runtime.util.GenericRowRecordSortComparator;
-import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
-import org.apache.flink.table.types.logical.BigIntType;
-import org.apache.flink.table.types.logical.IntType;
-import org.apache.flink.table.types.logical.VarCharType;
-import org.apache.flink.table.utils.HandwrittenSelectorUtil;
import org.apache.flink.types.RowKind;
import org.junit.Test;
@@ -55,24 +44,7 @@ import static org.apache.flink.table.runtime.util.StreamRecordUtils.record;
* RowTimeMiniBatchDeduplicateFunction}.
*/
@RunWith(Parameterized.class)
-public class RowTimeDeduplicateFunctionTest {
-
- private final long miniBatchSize = 4L;
- private Time minTtlTime = Time.milliseconds(10);
- private InternalTypeInfo inputRowType =
- InternalTypeInfo.ofFields(
- new VarCharType(VarCharType.MAX_LENGTH), new IntType(), new BigIntType());
- private TypeSerializer<RowData> serializer = inputRowType.toSerializer();
- private int rowTimeIndex = 2;
- private int rowKeyIndex = 0;
- private RowDataKeySelector rowKeySelector =
- HandwrittenSelectorUtil.getRowDataSelector(
- new int[] {rowKeyIndex}, inputRowType.toRowFieldTypes());
- private RowDataHarnessAssertor assertor =
- new RowDataHarnessAssertor(
- inputRowType.toRowFieldTypes(),
- new GenericRowRecordSortComparator(
- rowKeyIndex, inputRowType.toRowFieldTypes()[rowKeyIndex]));
+public class RowTimeDeduplicateFunctionTest extends RowTimeDeduplicateFunctionTestBase {
private final boolean miniBatchEnable;
@@ -328,18 +300,6 @@ public class RowTimeDeduplicateFunctionTest {
testHarness.close();
}
- private OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness(
- KeyedProcessOperator<RowData, RowData, RowData> operator) throws Exception {
- return new KeyedOneInputStreamOperatorTestHarness<>(
- operator, rowKeySelector, rowKeySelector.getProducedType());
- }
-
- private OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness(
- KeyedMapBundleOperator<RowData, RowData, RowData, RowData> operator) throws Exception {
- return new KeyedOneInputStreamOperatorTestHarness<>(
- operator, rowKeySelector, rowKeySelector.getProducedType());
- }
-
@Parameterized.Parameters(name = "miniBatchEnable = {0}")
public static Collection<Boolean[]> runMode() {
return Arrays.asList(new Boolean[] {false}, new Boolean[] {true});
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeDeduplicateFunctionTestBase.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeDeduplicateFunctionTestBase.java
new file mode 100644
index 0000000..2a6cc8f
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeDeduplicateFunctionTestBase.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.deduplicate;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import org.apache.flink.table.runtime.operators.bundle.KeyedMapBundleOperator;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.runtime.util.GenericRowRecordSortComparator;
+import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.utils.HandwrittenSelectorUtil;
+
+/** Base class of tests for all kinds of row-time DeduplicateFunction. */
+abstract class RowTimeDeduplicateFunctionTestBase {
+
+ protected final long miniBatchSize = 4L;
+ protected Time minTtlTime = Time.milliseconds(10);
+ protected InternalTypeInfo inputRowType =
+ InternalTypeInfo.ofFields(
+ new VarCharType(VarCharType.MAX_LENGTH), new IntType(), new BigIntType());
+ protected TypeSerializer<RowData> serializer = inputRowType.toSerializer();
+ protected int rowTimeIndex = 2;
+ protected int rowKeyIndex = 0;
+ protected RowDataKeySelector rowKeySelector =
+ HandwrittenSelectorUtil.getRowDataSelector(
+ new int[] {rowKeyIndex}, inputRowType.toRowFieldTypes());
+ protected RowDataHarnessAssertor assertor =
+ new RowDataHarnessAssertor(
+ inputRowType.toRowFieldTypes(),
+ new GenericRowRecordSortComparator(
+ rowKeyIndex, inputRowType.toRowFieldTypes()[rowKeyIndex]));
+
+ protected OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness(
+ KeyedProcessOperator<RowData, RowData, RowData> operator) throws Exception {
+ return new KeyedOneInputStreamOperatorTestHarness<>(
+ operator, rowKeySelector, rowKeySelector.getProducedType());
+ }
+
+ protected OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness(
+ KeyedMapBundleOperator<RowData, RowData, RowData, RowData> operator) throws Exception {
+ return new KeyedOneInputStreamOperatorTestHarness<>(
+ operator, rowKeySelector, rowKeySelector.getProducedType());
+ }
+}
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeMiniBatchLatestChangeDeduplicateFunctionTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeMiniBatchLatestChangeDeduplicateFunctionTest.java
new file mode 100644
index 0000000..233ee39
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeMiniBatchLatestChangeDeduplicateFunctionTest.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.deduplicate;
+
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.bundle.KeyedMapBundleOperator;
+import org.apache.flink.table.runtime.operators.bundle.trigger.CountBundleTrigger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfterRecord;
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateBeforeRecord;
+
+/** Harness tests for {@link RowTimeMiniBatchLatestChangeDeduplicateFunction}. */
+public class RowTimeMiniBatchLatestChangeDeduplicateFunctionTest
+ extends RowTimeDeduplicateFunctionTestBase {
+
+ @Test
+ public void testKeepLastRowWithoutGenerateUpdateBeforeAndWithGenerateInsert() throws Exception {
+ OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+ createTestHarness(false, true, true);
+ testHarness.open();
+ testHarness.processElement(insertRecord("book", 10, 1L));
+ testHarness.processElement(insertRecord("book", 11, 2L));
+ testHarness.processElement(insertRecord("book", 13, 1L));
+ // output is empty because bundle not trigger yet.
+ Assert.assertTrue(testHarness.getOutput().isEmpty());
+ // bundle trigger emit.
+ testHarness.processElement(insertRecord("book", 12, 1L));
+ List<Object> expectedOutput = new ArrayList<>();
+ expectedOutput.add(insertRecord("book", 11, 2L));
+ assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, testHarness.getOutput());
+
+ testHarness.processElement(insertRecord("book", 14, 3L));
+ testHarness.processElement(insertRecord("book", 15, 1L));
+ // watermark trigger emit.
+ testHarness.processWatermark(new Watermark(3L));
+ expectedOutput.add(updateAfterRecord("book", 14, 3L));
+ expectedOutput.add(new Watermark(3L));
+ testHarness.close();
+ assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, testHarness.getOutput());
+ }
+
+ @Test
+ public void testKeepLastRowWithoutGenerateUpdateBeforeAndWithoutGenerateInsert()
+ throws Exception {
+ OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+ createTestHarness(false, false, true);
+ testHarness.open();
+ testHarness.processElement(insertRecord("book", 10, 1L));
+ testHarness.processElement(insertRecord("book", 11, 2L));
+ testHarness.processElement(insertRecord("book", 13, 1L));
+ // output is empty because bundle not trigger yet.
+ Assert.assertTrue(testHarness.getOutput().isEmpty());
+ // bundle trigger emit.
+ testHarness.processElement(insertRecord("book", 12, 1L));
+ List<Object> expectedOutput = new ArrayList<>();
+ expectedOutput.add(updateAfterRecord("book", 11, 2L));
+ assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, testHarness.getOutput());
+
+ testHarness.processElement(insertRecord("book", 14, 3L));
+ testHarness.processElement(insertRecord("book", 15, 1L));
+ // watermark trigger emit.
+ testHarness.processWatermark(new Watermark(3L));
+ expectedOutput.add(updateAfterRecord("book", 14, 3L));
+ expectedOutput.add(new Watermark(3L));
+ testHarness.close();
+ assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, testHarness.getOutput());
+ }
+
+ @Test
+ public void testKeepLastRowWithGenerateUpdateBeforeAndWithGenerateInsert() throws Exception {
+ OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+ createTestHarness(true, true, true);
+ testHarness.open();
+ testHarness.processElement(insertRecord("book", 10, 1L));
+ testHarness.processElement(insertRecord("book", 11, 2L));
+ testHarness.processElement(insertRecord("book", 13, 1L));
+ // output is empty because bundle not trigger yet.
+ Assert.assertTrue(testHarness.getOutput().isEmpty());
+ // bundle trigger emit.
+ testHarness.processElement(insertRecord("book", 12, 1L));
+ List<Object> expectedOutput = new ArrayList<>();
+ expectedOutput.add(insertRecord("book", 11, 2L));
+ assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, testHarness.getOutput());
+
+ testHarness.processElement(insertRecord("book", 14, 3L));
+ testHarness.processElement(insertRecord("book", 15, 1L));
+ // watermark trigger emit.
+ testHarness.processWatermark(new Watermark(3L));
+ expectedOutput.add(updateBeforeRecord("book", 11, 2L));
+ expectedOutput.add(updateAfterRecord("book", 14, 3L));
+ expectedOutput.add(new Watermark(3L));
+ testHarness.close();
+ assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, testHarness.getOutput());
+ }
+
+ @Test
+ public void testKeepLastRowWithGenerateUpdateBeforeAndWithoutGenerateInsert() throws Exception {
+ OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+ createTestHarness(true, false, true);
+ testHarness.open();
+ testHarness.processElement(insertRecord("book", 10, 1L));
+ testHarness.processElement(insertRecord("book", 11, 2L));
+ testHarness.processElement(insertRecord("book", 13, 1L));
+ // output is empty because bundle not trigger yet.
+ Assert.assertTrue(testHarness.getOutput().isEmpty());
+ // bundle trigger emit.
+ testHarness.processElement(insertRecord("book", 12, 1L));
+ List<Object> expectedOutput = new ArrayList<>();
+ expectedOutput.add(insertRecord("book", 11, 2L));
+ assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, testHarness.getOutput());
+
+ testHarness.processElement(insertRecord("book", 14, 3L));
+ testHarness.processElement(insertRecord("book", 15, 1L));
+ // watermark trigger emit.
+ testHarness.processWatermark(new Watermark(3L));
+ expectedOutput.add(updateBeforeRecord("book", 11, 2L));
+ expectedOutput.add(updateAfterRecord("book", 14, 3L));
+ expectedOutput.add(new Watermark(3L));
+ testHarness.close();
+ assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, testHarness.getOutput());
+ }
+
+ @Test
+ public void testKeepLastRowWithGenerateUpdateBeforeAndWithGenerateInsertAndStateTtl()
+ throws Exception {
+ OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+ createTestHarness(true, true, true);
+ testHarness.open();
+ testHarness.processElement(insertRecord("book", 10, 1L));
+ testHarness.processElement(insertRecord("book", 11, 2L));
+ testHarness.processElement(insertRecord("book", 13, 1L));
+ // output is empty because bundle not trigger yet.
+ Assert.assertTrue(testHarness.getOutput().isEmpty());
+ // bundle trigger emit.
+ testHarness.processElement(insertRecord("book", 12, 1L));
+ List<Object> expectedOutput = new ArrayList<>();
+ expectedOutput.add(insertRecord("book", 11, 2L));
+ assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, testHarness.getOutput());
+
+ // clear state.
+ testHarness.setStateTtlProcessingTime(10);
+
+ testHarness.processElement(insertRecord("book", 14, 3L));
+ testHarness.processElement(insertRecord("book", 15, 1L));
+ // watermark trigger emit.
+ testHarness.processWatermark(new Watermark(3L));
+ expectedOutput.add(insertRecord("book", 14, 3L));
+ expectedOutput.add(new Watermark(3L));
+ testHarness.close();
+ assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, testHarness.getOutput());
+ }
+
+ @Test
+ public void testKeepFirstRowWithoutGenerateUpdateBeforeAndWithGenerateInsert()
+ throws Exception {
+ OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+ createTestHarness(false, true, false);
+ testHarness.open();
+ testHarness.processElement(insertRecord("book", 10, 1L));
+ testHarness.processElement(insertRecord("book", 11, 2L));
+ testHarness.processElement(insertRecord("book", 13, 1L));
+ // output is empty because bundle not trigger yet.
+ Assert.assertTrue(testHarness.getOutput().isEmpty());
+ // bundle trigger emit.
+ testHarness.processElement(insertRecord("book", 12, 1L));
+ List<Object> expectedOutput = new ArrayList<>();
+ expectedOutput.add(insertRecord("book", 10, 1L));
+ assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, testHarness.getOutput());
+
+ testHarness.processElement(insertRecord("book", 14, 3L));
+ testHarness.processElement(insertRecord("book", 15, 1L));
+ // watermark trigger emit.
+ testHarness.processWatermark(new Watermark(3L));
+ expectedOutput.add(new Watermark(3L));
+ testHarness.close();
+ assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, testHarness.getOutput());
+ }
+
+ @Test
+ public void testKeepFirstRowWithoutGenerateUpdateBeforeAndWithoutGenerateInsert()
+ throws Exception {
+ OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+ createTestHarness(false, false, false);
+ testHarness.open();
+ testHarness.processElement(insertRecord("book", 10, 1L));
+ testHarness.processElement(insertRecord("book", 11, 2L));
+ testHarness.processElement(insertRecord("book", 13, 1L));
+ // output is empty because bundle not trigger yet.
+ Assert.assertTrue(testHarness.getOutput().isEmpty());
+ // bundle trigger emit.
+ testHarness.processElement(insertRecord("book", 12, 1L));
+ List<Object> expectedOutput = new ArrayList<>();
+ expectedOutput.add(updateAfterRecord("book", 10, 1L));
+ assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, testHarness.getOutput());
+
+ testHarness.processElement(insertRecord("book", 14, 3L));
+ testHarness.processElement(insertRecord("book", 15, 1L));
+ // watermark trigger emit.
+ testHarness.processWatermark(new Watermark(3L));
+ expectedOutput.add(new Watermark(3L));
+ testHarness.close();
+ assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, testHarness.getOutput());
+ }
+
+ @Test
+ public void testKeepFirstRowWithGenerateUpdateBeforeAndWithGenerateInsert() throws Exception {
+ OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+ createTestHarness(true, true, false);
+ testHarness.open();
+ testHarness.processElement(insertRecord("book", 10, 1L));
+ testHarness.processElement(insertRecord("book", 11, 2L));
+ testHarness.processElement(insertRecord("book", 13, 1L));
+ // output is empty because bundle not trigger yet.
+ Assert.assertTrue(testHarness.getOutput().isEmpty());
+ // bundle trigger emit.
+ testHarness.processElement(insertRecord("book", 12, 1L));
+ List<Object> expectedOutput = new ArrayList<>();
+ expectedOutput.add(insertRecord("book", 10, 1L));
+ assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, testHarness.getOutput());
+
+ testHarness.processElement(insertRecord("book", 14, 3L));
+ testHarness.processElement(insertRecord("book", 15, 1L));
+ // watermark trigger emit.
+ testHarness.processWatermark(new Watermark(3L));
+ expectedOutput.add(new Watermark(3L));
+ testHarness.close();
+ assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, testHarness.getOutput());
+ }
+
+ @Test
+ public void testKeepFirstRowWithGenerateUpdateBeforeAndWithoutGenerateInsert()
+ throws Exception {
+ OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+ createTestHarness(true, false, false);
+ testHarness.open();
+ testHarness.processElement(insertRecord("book", 10, 1L));
+ testHarness.processElement(insertRecord("book", 11, 2L));
+ testHarness.processElement(insertRecord("book", 13, 1L));
+ // output is empty because bundle not trigger yet.
+ Assert.assertTrue(testHarness.getOutput().isEmpty());
+ // bundle trigger emit.
+ testHarness.processElement(insertRecord("book", 12, 1L));
+ List<Object> expectedOutput = new ArrayList<>();
+ expectedOutput.add(insertRecord("book", 10, 1L));
+ assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, testHarness.getOutput());
+
+ testHarness.processElement(insertRecord("book", 14, 3L));
+ testHarness.processElement(insertRecord("book", 15, 1L));
+ // watermark trigger emit.
+ testHarness.processWatermark(new Watermark(3L));
+ expectedOutput.add(new Watermark(3L));
+ testHarness.close();
+ assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, testHarness.getOutput());
+ }
+
+ @Test
+ public void testKeepFirstRowWithGenerateUpdateBeforeAndWithGenerateInsertAndStateTtl()
+ throws Exception {
+ OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+ createTestHarness(true, true, false);
+ testHarness.open();
+ testHarness.processElement(insertRecord("book", 10, 1L));
+ testHarness.processElement(insertRecord("book", 11, 2L));
+ testHarness.processElement(insertRecord("book", 13, 1L));
+ // output is empty because bundle not trigger yet.
+ Assert.assertTrue(testHarness.getOutput().isEmpty());
+ // bundle trigger emit.
+ testHarness.processElement(insertRecord("book", 12, 1L));
+ List<Object> expectedOutput = new ArrayList<>();
+ expectedOutput.add(insertRecord("book", 10, 1L));
+ assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, testHarness.getOutput());
+
+ // clear state.
+ testHarness.setStateTtlProcessingTime(10);
+
+ testHarness.processElement(insertRecord("book", 14, 3L));
+ testHarness.processElement(insertRecord("book", 15, 1L));
+ // watermark trigger emit.
+ testHarness.processWatermark(new Watermark(3L));
+ expectedOutput.add(insertRecord("book", 15, 1L));
+ expectedOutput.add(new Watermark(3L));
+ testHarness.close();
+ assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, testHarness.getOutput());
+ }
+
+ private OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness(
+ boolean generateUpdateBefore, boolean generateInsert, boolean keepLastRow)
+ throws Exception {
+ RowTimeMiniBatchLatestChangeDeduplicateFunction func =
+ new RowTimeMiniBatchLatestChangeDeduplicateFunction(
+ inputRowType,
+ serializer,
+ minTtlTime.toMilliseconds(),
+ rowTimeIndex,
+ generateUpdateBefore,
+ generateInsert,
+ keepLastRow);
+ CountBundleTrigger trigger = new CountBundleTrigger<RowData>(miniBatchSize);
+ KeyedMapBundleOperator<RowData, RowData, RowData, RowData> keyedMapBundleOperator =
+ new KeyedMapBundleOperator(func, trigger);
+ return createTestHarness(keyedMapBundleOperator);
+ }
+}