This is an automated email from the ASF dual-hosted git repository. lincoln pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 26d342601a5 [FLINK-36837][table] Introduce RowTimeDeduplicateFunction with Async State API 26d342601a5 is described below commit 26d342601a545293b6d9f447a033c0559ba86d14 Author: Xuyang <xyzhong...@163.com> AuthorDate: Fri Jan 17 20:30:25 2025 +0800 [FLINK-36837][table] Introduce RowTimeDeduplicateFunction with Async State API This closes #25728 --- .../nodes/exec/stream/StreamExecDeduplicate.java | 36 +++- .../runtime/stream/sql/DeduplicateITCase.scala | 36 +++- .../deduplicate/DeduplicateFunctionBase.java | 23 +-- .../ProcTimeDeduplicateKeepFirstRowFunction.java | 4 +- .../ProcTimeDeduplicateKeepLastRowFunction.java | 6 +- ...meMiniBatchDeduplicateKeepFirstRowFunction.java | 2 +- ...imeMiniBatchDeduplicateKeepLastRowFunction.java | 4 +- .../deduplicate/RowTimeDeduplicateFunction.java | 55 +++--- .../RowTimeMiniBatchDeduplicateFunction.java | 6 +- ...meMiniBatchLatestChangeDeduplicateFunction.java | 6 +- ....java => SyncStateDeduplicateFunctionBase.java} | 17 +- .../AsyncStateDeduplicateFunctionBase.java} | 27 ++- .../AsyncStateRowTimeDeduplicateFunction.java | 85 ++++++++++ .../{ => utils}/DeduplicateFunctionHelper.java | 19 ++- .../RowTimeDeduplicateFunctionHelper.java} | 62 +++---- .../RowTimeDeduplicateRecordsCombiner.java | 2 +- .../RowTimeDeduplicateFunctionTest.java | 186 ++++++++++++++------- .../RowTimeDeduplicateFunctionTestBase.java | 22 +-- 18 files changed, 371 insertions(+), 227 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecDeduplicate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecDeduplicate.java index 812ab1b5cbf..e5ec2d3518a 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecDeduplicate.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecDeduplicate.java @@ -22,6 +22,7 @@ import org.apache.flink.FlinkVersion; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.dag.Transformation; import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.runtime.asyncprocessing.operators.AsyncKeyedProcessOperator; import org.apache.flink.streaming.api.operators.KeyedProcessOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.transformations.OneInputTransformation; @@ -51,6 +52,7 @@ import org.apache.flink.table.runtime.operators.deduplicate.ProcTimeMiniBatchDed import org.apache.flink.table.runtime.operators.deduplicate.RowTimeDeduplicateFunction; import org.apache.flink.table.runtime.operators.deduplicate.RowTimeMiniBatchDeduplicateFunction; import org.apache.flink.table.runtime.operators.deduplicate.RowTimeMiniBatchLatestChangeDeduplicateFunction; +import org.apache.flink.table.runtime.operators.deduplicate.asyncprocessing.AsyncStateRowTimeDeduplicateFunction; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; import org.apache.flink.table.runtime.typeutils.TypeCheckUtils; import org.apache.flink.table.types.logical.RowType; @@ -256,6 +258,10 @@ public class StreamExecDeduplicate extends ExecNodeBase<RowData> return config.get(TABLE_EXEC_DEDUPLICATE_MINIBATCH_COMPACT_CHANGES_ENABLED); } + protected boolean isAsyncStateEnabled() { + return config.get(ExecutionConfigOptions.TABLE_EXEC_ASYNC_STATE_ENABLED); + } + protected long getMiniBatchSize() { if (isMiniBatchEnabled()) { long size = config.get(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE); @@ -332,15 +338,27 @@ public class StreamExecDeduplicate extends ExecNodeBase<RowData> trigger); } } else { - RowTimeDeduplicateFunction processFunction = - new RowTimeDeduplicateFunction( - rowTypeInfo, - stateRetentionTime, - rowtimeIndex, - generateUpdateBefore, - generateInsert(), - keepLastRow); - return new KeyedProcessOperator<>(processFunction); + if (isAsyncStateEnabled()) { + AsyncStateRowTimeDeduplicateFunction processFunction = + new AsyncStateRowTimeDeduplicateFunction( + rowTypeInfo, + stateRetentionTime, + rowtimeIndex, + generateUpdateBefore, + generateInsert(), + keepLastRow); + return new AsyncKeyedProcessOperator<>(processFunction); + } else { + RowTimeDeduplicateFunction processFunction = + new RowTimeDeduplicateFunction( + rowTypeInfo, + stateRetentionTime, + rowtimeIndex, + generateUpdateBefore, + generateInsert(), + keepLastRow); + return new KeyedProcessOperator<>(processFunction); + } } } } diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/DeduplicateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/DeduplicateITCase.scala index dccaf33bcb1..450270c47df 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/DeduplicateITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/DeduplicateITCase.scala @@ -17,29 +17,31 @@ */ package org.apache.flink.table.planner.runtime.stream.sql -import org.apache.flink.api.common.eventtime.{AscendingTimestampsWatermarks, TimestampAssigner, TimestampAssignerSupplier, WatermarkGenerator, WatermarkGeneratorSupplier, WatermarkStrategy} +import org.apache.flink.api.common.eventtime._ import org.apache.flink.core.testutils.EachCallbackWrapper import org.apache.flink.table.api._ import org.apache.flink.table.api.bridge.scala._ import org.apache.flink.table.api.config.ExecutionConfigOptions import org.apache.flink.table.planner.factories.TestValuesTableFactory import org.apache.flink.table.planner.runtime.utils._ -import org.apache.flink.table.planner.runtime.utils.StreamingWithMiniBatchTestBase.{MiniBatchMode, MiniBatchOn} -import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode +import org.apache.flink.table.planner.runtime.utils.StreamingWithMiniBatchTestBase.{MiniBatchMode, MiniBatchOff, MiniBatchOn} +import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.{HEAP_BACKEND, ROCKSDB_BACKEND, StateBackendMode} import org.apache.flink.table.utils.LegacyRowExtension -import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension +import org.apache.flink.testutils.junit.extensions.parameterized.{ParameterizedTestExtension, Parameters} import org.apache.flink.types.Row import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assumptions.assumeThat -import org.junit.jupiter.api.TestTemplate +import org.junit.jupiter.api.{BeforeEach, TestTemplate} import org.junit.jupiter.api.extension.{ExtendWith, RegisterExtension} +import java.util + import scala.collection.JavaConversions._ import scala.collection.mutable @ExtendWith(Array(classOf[ParameterizedTestExtension])) -class DeduplicateITCase(miniBatch: MiniBatchMode, mode: StateBackendMode) +class DeduplicateITCase(miniBatch: MiniBatchMode, mode: StateBackendMode, enableAsyncState: Boolean) extends StreamingWithMiniBatchTestBase(miniBatch, mode) { @RegisterExtension private val _: EachCallbackWrapper[LegacyRowExtension] = @@ -55,6 +57,14 @@ class DeduplicateITCase(miniBatch: MiniBatchMode, mode: StateBackendMode) rowtimeTestData.+=((3, 4L, "Comment#2")) rowtimeTestData.+=((4, 4L, "Comment#3")) + @BeforeEach + override def before(): Unit = { + super.before() + tEnv.getConfig.set( + ExecutionConfigOptions.TABLE_EXEC_ASYNC_STATE_ENABLED, + Boolean.box(enableAsyncState)) + } + @TestTemplate def testFirstRowOnProctime(): Unit = { val t = failingDataSource(TestData.tupleData3) @@ -425,3 +435,17 @@ class RowtimeExtractor extends WatermarkStrategy[(Int, Long, String)] { (e: (Int, Long, String), _: Long) => e._2 } } + +object DeduplicateITCase { + + @Parameters(name = "{0}, StateBackend={1}, EnableAsyncState={2}") + def parameters(): util.Collection[Array[java.lang.Object]] = { + Seq[Array[AnyRef]]( + Array(MiniBatchOff, HEAP_BACKEND, Boolean.box(false)), + Array(MiniBatchOff, ROCKSDB_BACKEND, Boolean.box(false)), + Array(MiniBatchOn, HEAP_BACKEND, Boolean.box(false)), + Array(MiniBatchOn, ROCKSDB_BACKEND, Boolean.box(false)), + Array(MiniBatchOff, HEAP_BACKEND, Boolean.box(true)) + ) + } +} diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionBase.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionBase.java index b87a02e7d9c..d1eb00a5a56 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionBase.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionBase.java @@ -18,16 +18,10 @@ package org.apache.flink.table.runtime.operators.deduplicate; -import org.apache.flink.api.common.functions.OpenContext; -import org.apache.flink.api.common.state.StateTtlConfig; -import org.apache.flink.api.common.state.ValueState; -import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; -import static org.apache.flink.table.runtime.util.StateConfigUtil.createTtlConfig; - /** * Base class for deduplicate function. * @@ -36,7 +30,8 @@ import static org.apache.flink.table.runtime.util.StateConfigUtil.createTtlConfi * @param <IN> Type of the input elements. * @param <OUT> Type of the returned elements. */ -abstract class DeduplicateFunctionBase<T, K, IN, OUT> extends KeyedProcessFunction<K, IN, OUT> { +public abstract class DeduplicateFunctionBase<T, K, IN, OUT> + extends KeyedProcessFunction<K, IN, OUT> { private static final long serialVersionUID = 1L; @@ -44,8 +39,6 @@ abstract class DeduplicateFunctionBase<T, K, IN, OUT> extends KeyedProcessFuncti protected final TypeInformation<T> typeInfo; protected final long stateRetentionTime; protected final TypeSerializer<OUT> serializer; - // state stores previous message under the key. - protected ValueState<T> state; public DeduplicateFunctionBase( TypeInformation<T> typeInfo, TypeSerializer<OUT> serializer, long stateRetentionTime) { @@ -53,16 +46,4 @@ abstract class DeduplicateFunctionBase<T, K, IN, OUT> extends KeyedProcessFuncti this.stateRetentionTime = stateRetentionTime; this.serializer = serializer; } - - @Override - public void open(OpenContext openContext) throws Exception { - super.open(openContext); - ValueStateDescriptor<T> stateDesc = - new ValueStateDescriptor<>("deduplicate-state", typeInfo); - StateTtlConfig ttlConfig = createTtlConfig(stateRetentionTime); - if (ttlConfig.isEnabled()) { - stateDesc.enableTimeToLive(ttlConfig); - } - state = getRuntimeContext().getState(stateDesc); - } } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateKeepFirstRowFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateKeepFirstRowFunction.java index a01ae4a6886..d522b996540 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateKeepFirstRowFunction.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateKeepFirstRowFunction.java @@ -22,11 +22,11 @@ import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.table.data.RowData; import org.apache.flink.util.Collector; -import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.processFirstRowOnProcTime; +import static org.apache.flink.table.runtime.operators.deduplicate.utils.DeduplicateFunctionHelper.processFirstRowOnProcTime; /** This function is used to deduplicate on keys and keeps only first row. */ public class ProcTimeDeduplicateKeepFirstRowFunction - extends DeduplicateFunctionBase<Boolean, RowData, RowData, RowData> { + extends SyncStateDeduplicateFunctionBase<Boolean, RowData, RowData, RowData> { private static final long serialVersionUID = 5865777137707602549L; diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateKeepLastRowFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateKeepLastRowFunction.java index 5843f274d75..f6eaf03e584 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateKeepLastRowFunction.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateKeepLastRowFunction.java @@ -25,12 +25,12 @@ import org.apache.flink.table.runtime.generated.RecordEqualiser; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; import org.apache.flink.util.Collector; -import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.processLastRowOnChangelog; -import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.processLastRowOnProcTime; +import static org.apache.flink.table.runtime.operators.deduplicate.utils.DeduplicateFunctionHelper.processLastRowOnChangelog; +import static org.apache.flink.table.runtime.operators.deduplicate.utils.DeduplicateFunctionHelper.processLastRowOnProcTime; /** This function is used to deduplicate on keys and keeps only last row. */ public class ProcTimeDeduplicateKeepLastRowFunction - extends DeduplicateFunctionBase<RowData, RowData, RowData, RowData> { + extends SyncStateDeduplicateFunctionBase<RowData, RowData, RowData, RowData> { private static final long serialVersionUID = -291348892087180350L; private final boolean generateUpdateBefore; diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeMiniBatchDeduplicateKeepFirstRowFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeMiniBatchDeduplicateKeepFirstRowFunction.java index 70630ed9f04..dc6f018140e 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeMiniBatchDeduplicateKeepFirstRowFunction.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeMiniBatchDeduplicateKeepFirstRowFunction.java @@ -27,7 +27,7 @@ import javax.annotation.Nullable; import java.util.Map; -import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.processFirstRowOnProcTime; +import static org.apache.flink.table.runtime.operators.deduplicate.utils.DeduplicateFunctionHelper.processFirstRowOnProcTime; /** * This function is used to get the first row for every key partition in miniBatch mode. diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeMiniBatchDeduplicateKeepLastRowFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeMiniBatchDeduplicateKeepLastRowFunction.java index eb51008e08e..3b71620aedb 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeMiniBatchDeduplicateKeepLastRowFunction.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeMiniBatchDeduplicateKeepLastRowFunction.java @@ -30,8 +30,8 @@ import javax.annotation.Nullable; import java.util.Map; -import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.processLastRowOnChangelog; -import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.processLastRowOnProcTime; +import static org.apache.flink.table.runtime.operators.deduplicate.utils.DeduplicateFunctionHelper.processLastRowOnChangelog; +import static org.apache.flink.table.runtime.operators.deduplicate.utils.DeduplicateFunctionHelper.processLastRowOnProcTime; /** This function is used to get the last row for every key partition in miniBatch mode. */ public class ProcTimeMiniBatchDeduplicateKeepLastRowFunction diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeDeduplicateFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeDeduplicateFunction.java index 2d53d023102..03f8d0238af 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeDeduplicateFunction.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeDeduplicateFunction.java @@ -18,18 +18,15 @@ package org.apache.flink.table.runtime.operators.deduplicate; -import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.functions.OpenContext; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.operators.deduplicate.utils.RowTimeDeduplicateFunctionHelper; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; import org.apache.flink.util.Collector; -import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.checkInsertOnly; -import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.isDuplicate; -import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.updateDeduplicateResult; - /** This function is used to deduplicate on keys and keeps only first or last row on row time. */ public class RowTimeDeduplicateFunction - extends DeduplicateFunctionBase<RowData, RowData, RowData, RowData> { + extends SyncStateDeduplicateFunctionBase<RowData, RowData, RowData, RowData> { private static final long serialVersionUID = 1L; @@ -38,6 +35,8 @@ public class RowTimeDeduplicateFunction private final int rowtimeIndex; private final boolean keepLastRow; + private transient SyncStateRowTimeDeduplicateFunctionHelper helper; + public RowTimeDeduplicateFunction( InternalTypeInfo<RowData> typeInfo, long minRetentionTime, @@ -52,39 +51,29 @@ public class RowTimeDeduplicateFunction this.keepLastRow = keepLastRow; } + @Override + public void open(OpenContext openContext) throws Exception { + super.open(openContext); + + helper = new SyncStateRowTimeDeduplicateFunctionHelper(); + } + @Override public void processElement(RowData input, Context ctx, Collector<RowData> out) throws Exception { - deduplicateOnRowTime( - state, input, out, generateUpdateBefore, generateInsert, rowtimeIndex, keepLastRow); + RowData prevRow = state.value(); + helper.deduplicateOnRowTime(input, prevRow, out); } - /** - * Processes element to deduplicate on keys with row time semantic, sends current element if it - * is last or first row, retracts previous element if needed. - * - * @param state state of function - * @param currentRow latest row received by deduplicate function - * @param out underlying collector - * @param generateUpdateBefore flag to generate UPDATE_BEFORE message or not - * @param generateInsert flag to gennerate INSERT message or not - * @param rowtimeIndex the index of rowtime field - * @param keepLastRow flag to keep last row or keep first row - */ - public static void deduplicateOnRowTime( - ValueState<RowData> state, - RowData currentRow, - Collector<RowData> out, - boolean generateUpdateBefore, - boolean generateInsert, - int rowtimeIndex, - boolean keepLastRow) - throws Exception { - checkInsertOnly(currentRow); - RowData preRow = state.value(); + private class SyncStateRowTimeDeduplicateFunctionHelper + extends RowTimeDeduplicateFunctionHelper { + + public SyncStateRowTimeDeduplicateFunctionHelper() { + super(generateUpdateBefore, generateInsert, rowtimeIndex, keepLastRow); + } - if (isDuplicate(preRow, currentRow, rowtimeIndex, keepLastRow)) { - updateDeduplicateResult(generateUpdateBefore, generateInsert, preRow, currentRow, out); + @Override + protected void updateState(RowData currentRow) throws Exception { state.update(currentRow); } } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeMiniBatchDeduplicateFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeMiniBatchDeduplicateFunction.java index 73d8f28335a..6b342e8a9fc 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeMiniBatchDeduplicateFunction.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeMiniBatchDeduplicateFunction.java @@ -30,9 +30,9 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; -import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.checkInsertOnly; -import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.isDuplicate; -import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.updateDeduplicateResult; +import static org.apache.flink.table.runtime.operators.deduplicate.utils.DeduplicateFunctionHelper.checkInsertOnly; +import static org.apache.flink.table.runtime.operators.deduplicate.utils.DeduplicateFunctionHelper.isDuplicate; +import static org.apache.flink.table.runtime.operators.deduplicate.utils.DeduplicateFunctionHelper.updateDeduplicateResult; /** This function is used to get the first or last row for every key partition in miniBatch mode. */ public class RowTimeMiniBatchDeduplicateFunction diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeMiniBatchLatestChangeDeduplicateFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeMiniBatchLatestChangeDeduplicateFunction.java index a46aed4f66a..f8d8192b006 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeMiniBatchLatestChangeDeduplicateFunction.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeMiniBatchLatestChangeDeduplicateFunction.java @@ -27,9 +27,9 @@ import javax.annotation.Nullable; import java.util.Map; -import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.checkInsertOnly; -import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.isDuplicate; -import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.updateDeduplicateResult; +import static org.apache.flink.table.runtime.operators.deduplicate.utils.DeduplicateFunctionHelper.checkInsertOnly; +import static org.apache.flink.table.runtime.operators.deduplicate.utils.DeduplicateFunctionHelper.isDuplicate; +import static org.apache.flink.table.runtime.operators.deduplicate.utils.DeduplicateFunctionHelper.updateDeduplicateResult; /** * This function is used to get the first or last row for every key partition in miniBatch mode. But diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionBase.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/SyncStateDeduplicateFunctionBase.java similarity index 79% copy from flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionBase.java copy to flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/SyncStateDeduplicateFunctionBase.java index b87a02e7d9c..a1d05512724 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionBase.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/SyncStateDeduplicateFunctionBase.java @@ -24,39 +24,34 @@ import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import static org.apache.flink.table.runtime.util.StateConfigUtil.createTtlConfig; /** - * Base class for deduplicate function. + * Base class for deduplicate function with sync state api. * * @param <T> Type of the value in the state. * @param <K> Type of the key. * @param <IN> Type of the input elements. * @param <OUT> Type of the returned elements. */ -abstract class DeduplicateFunctionBase<T, K, IN, OUT> extends KeyedProcessFunction<K, IN, OUT> { +abstract class SyncStateDeduplicateFunctionBase<T, K, IN, OUT> + extends DeduplicateFunctionBase<T, K, IN, OUT> { private static final long serialVersionUID = 1L; - // the TypeInformation of the values in the state. - protected final TypeInformation<T> typeInfo; - protected final long stateRetentionTime; - protected final TypeSerializer<OUT> serializer; // state stores previous message under the key. protected ValueState<T> state; - public DeduplicateFunctionBase( + public SyncStateDeduplicateFunctionBase( TypeInformation<T> typeInfo, TypeSerializer<OUT> serializer, long stateRetentionTime) { - this.typeInfo = typeInfo; - this.stateRetentionTime = stateRetentionTime; - this.serializer = serializer; + super(typeInfo, serializer, stateRetentionTime); } @Override public void open(OpenContext openContext) throws Exception { super.open(openContext); + ValueStateDescriptor<T> stateDesc = new ValueStateDescriptor<>("deduplicate-state", typeInfo); StateTtlConfig ttlConfig = createTtlConfig(stateRetentionTime); diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionBase.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/asyncprocessing/AsyncStateDeduplicateFunctionBase.java similarity index 70% copy from flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionBase.java copy to flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/asyncprocessing/AsyncStateDeduplicateFunctionBase.java index b87a02e7d9c..f3af0b883d4 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionBase.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/asyncprocessing/AsyncStateDeduplicateFunctionBase.java @@ -16,53 +16,50 @@ * limitations under the License. */ -package org.apache.flink.table.runtime.operators.deduplicate; +package org.apache.flink.table.runtime.operators.deduplicate.asyncprocessing; import org.apache.flink.api.common.functions.OpenContext; import org.apache.flink.api.common.state.StateTtlConfig; -import org.apache.flink.api.common.state.ValueState; -import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.state.v2.ValueState; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.streaming.api.functions.KeyedProcessFunction; +import org.apache.flink.runtime.state.v2.ValueStateDescriptor; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionBase; import static org.apache.flink.table.runtime.util.StateConfigUtil.createTtlConfig; /** - * Base class for deduplicate function. + * Base class for deduplicate function with async state api. * * @param <T> Type of the value in the state. * @param <K> Type of the key. * @param <IN> Type of the input elements. * @param <OUT> Type of the returned elements. */ -abstract class DeduplicateFunctionBase<T, K, IN, OUT> extends KeyedProcessFunction<K, IN, OUT> { +abstract class AsyncStateDeduplicateFunctionBase<T, K, IN, OUT> + extends DeduplicateFunctionBase<T, K, IN, OUT> { private static final long serialVersionUID = 1L; - // the TypeInformation of the values in the state. - protected final TypeInformation<T> typeInfo; - protected final long stateRetentionTime; - protected final TypeSerializer<OUT> serializer; // state stores previous message under the key. protected ValueState<T> state; - public DeduplicateFunctionBase( + public AsyncStateDeduplicateFunctionBase( TypeInformation<T> typeInfo, TypeSerializer<OUT> serializer, long stateRetentionTime) { - this.typeInfo = typeInfo; - this.stateRetentionTime = stateRetentionTime; - this.serializer = serializer; + super(typeInfo, serializer, stateRetentionTime); } @Override public void open(OpenContext openContext) throws Exception { super.open(openContext); + ValueStateDescriptor<T> stateDesc = new ValueStateDescriptor<>("deduplicate-state", typeInfo); StateTtlConfig ttlConfig = createTtlConfig(stateRetentionTime); if (ttlConfig.isEnabled()) { stateDesc.enableTimeToLive(ttlConfig); } - state = getRuntimeContext().getState(stateDesc); + state = ((StreamingRuntimeContext) getRuntimeContext()).getValueState(stateDesc); } } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/asyncprocessing/AsyncStateRowTimeDeduplicateFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/asyncprocessing/AsyncStateRowTimeDeduplicateFunction.java new file mode 100644 index 00000000000..c1f1f8bdad4 --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/asyncprocessing/AsyncStateRowTimeDeduplicateFunction.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.deduplicate.asyncprocessing; + +import org.apache.flink.api.common.functions.OpenContext; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.operators.deduplicate.RowTimeDeduplicateFunction; +import org.apache.flink.table.runtime.operators.deduplicate.utils.RowTimeDeduplicateFunctionHelper; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.util.Collector; + +/** + * This function is used to deduplicate on keys and keeps only first or last row on row time. + * + * <p>Different with {@link RowTimeDeduplicateFunction}, this function is based on async state api. + */ +public class AsyncStateRowTimeDeduplicateFunction + extends AsyncStateDeduplicateFunctionBase<RowData, RowData, RowData, RowData> { + + private static final long serialVersionUID = 1L; + + private final boolean generateUpdateBefore; + private final boolean generateInsert; + private final int rowtimeIndex; + private final boolean keepLastRow; + + private transient RowTimeDeduplicateFunctionHelper helper; + + public AsyncStateRowTimeDeduplicateFunction( + InternalTypeInfo<RowData> typeInfo, + long minRetentionTime, + int rowtimeIndex, + boolean generateUpdateBefore, + boolean generateInsert, + boolean keepLastRow) { + super(typeInfo, null, minRetentionTime); + this.generateUpdateBefore = generateUpdateBefore; + this.generateInsert = generateInsert; + this.rowtimeIndex = rowtimeIndex; + this.keepLastRow = keepLastRow; + } + + @Override + public void open(OpenContext openContext) throws Exception { + super.open(openContext); + + helper = new AsyncStateRowTimeDeduplicateFunctionHelper(); + } + + @Override + public void processElement(RowData input, Context ctx, Collector<RowData> out) + throws Exception { + state.asyncValue().thenAccept(prevRow -> helper.deduplicateOnRowTime(input, prevRow, out)); + } + + private class AsyncStateRowTimeDeduplicateFunctionHelper + extends RowTimeDeduplicateFunctionHelper { + + public AsyncStateRowTimeDeduplicateFunctionHelper() { + super(generateUpdateBefore, generateInsert, rowtimeIndex, keepLastRow); + } + + @Override + protected void updateState(RowData currentRow) throws Exception { + // no need to wait this async request to end + state.asyncUpdate(currentRow); + } + } +} diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionHelper.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/utils/DeduplicateFunctionHelper.java similarity index 94% rename from flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionHelper.java rename to flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/utils/DeduplicateFunctionHelper.java index b1776c8ca74..e9c7f81b51e 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionHelper.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/utils/DeduplicateFunctionHelper.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.table.runtime.operators.deduplicate; +package org.apache.flink.table.runtime.operators.deduplicate.utils; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.table.data.RowData; @@ -25,7 +25,12 @@ import org.apache.flink.types.RowKind; import org.apache.flink.util.Collector; import org.apache.flink.util.Preconditions; -/** Utility for deduplicate function. */ +/** + * Utility for deduplicate function. + * + * <p>TODO utilize the respective helper classes that inherit from an abstract deduplicate function + * helper in each deduplicate function. + */ public class DeduplicateFunctionHelper { /** @@ -39,7 +44,7 @@ public class DeduplicateFunctionHelper { * @param isStateTtlEnabled whether state ttl is disabled * @param equaliser the record equaliser used to equal RowData. */ - static void processLastRowOnProcTime( + public static void processLastRowOnProcTime( RowData currentRow, boolean generateUpdateBefore, boolean generateInsert, @@ -95,7 +100,7 @@ public class DeduplicateFunctionHelper { * @param state state of function * @param out underlying collector */ - static void processLastRowOnChangelog( + public static void processLastRowOnChangelog( RowData currentRow, boolean generateUpdateBefore, ValueState<RowData> state, @@ -155,7 +160,7 @@ public class DeduplicateFunctionHelper { * @param state state of function * @param out underlying collector */ - static void processFirstRowOnProcTime( + public static void processFirstRowOnProcTime( RowData currentRow, ValueState<Boolean> state, Collector<RowData> out) throws Exception { @@ -178,7 +183,7 @@ public class DeduplicateFunctionHelper { * @param currentRow current row under the key which is the duplicate row * @param out underlying collector */ - static void updateDeduplicateResult( + public static void updateDeduplicateResult( boolean generateUpdateBefore, boolean generateInsert, RowData preRow, @@ -223,7 +228,7 @@ public class DeduplicateFunctionHelper { } /** check message should be insert only. */ - static void checkInsertOnly(RowData currentRow) { + public static void checkInsertOnly(RowData currentRow) { Preconditions.checkArgument(currentRow.getRowKind() == RowKind.INSERT); } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeDeduplicateFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/utils/RowTimeDeduplicateFunctionHelper.java similarity index 53% copy from flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeDeduplicateFunction.java copy to flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/utils/RowTimeDeduplicateFunctionHelper.java index 2d53d023102..56867fd4dc8 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeDeduplicateFunction.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/utils/RowTimeDeduplicateFunctionHelper.java @@ -16,76 +16,60 @@ * limitations under the License. */ -package org.apache.flink.table.runtime.operators.deduplicate; +package org.apache.flink.table.runtime.operators.deduplicate.utils; -import org.apache.flink.api.common.state.ValueState; import org.apache.flink.table.data.RowData; -import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.runtime.operators.deduplicate.RowTimeDeduplicateFunction; +import org.apache.flink.table.runtime.operators.deduplicate.asyncprocessing.AsyncStateRowTimeDeduplicateFunction; import org.apache.flink.util.Collector; -import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.checkInsertOnly; -import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.isDuplicate; -import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.updateDeduplicateResult; +import javax.annotation.Nullable; -/** This function is used to deduplicate on keys and keeps only first or last row on row time. */ -public class RowTimeDeduplicateFunction - extends DeduplicateFunctionBase<RowData, RowData, RowData, RowData> { +import static org.apache.flink.table.runtime.operators.deduplicate.utils.DeduplicateFunctionHelper.checkInsertOnly; +import static org.apache.flink.table.runtime.operators.deduplicate.utils.DeduplicateFunctionHelper.isDuplicate; +import static org.apache.flink.table.runtime.operators.deduplicate.utils.DeduplicateFunctionHelper.updateDeduplicateResult; - private static final long serialVersionUID = 1L; +/** + * A helper to deduplicate data with row time in {@link RowTimeDeduplicateFunction} and {@link + * AsyncStateRowTimeDeduplicateFunction}. + */ +public abstract class RowTimeDeduplicateFunctionHelper { private final boolean generateUpdateBefore; private final boolean generateInsert; private final int rowtimeIndex; private final boolean keepLastRow; - public RowTimeDeduplicateFunction( - InternalTypeInfo<RowData> typeInfo, - long minRetentionTime, - int rowtimeIndex, + public RowTimeDeduplicateFunctionHelper( boolean generateUpdateBefore, boolean generateInsert, + int rowtimeIndex, boolean keepLastRow) { - super(typeInfo, null, minRetentionTime); this.generateUpdateBefore = generateUpdateBefore; this.generateInsert = generateInsert; this.rowtimeIndex = rowtimeIndex; this.keepLastRow = keepLastRow; } - @Override - public void processElement(RowData input, Context ctx, Collector<RowData> out) - throws Exception { - deduplicateOnRowTime( - state, input, out, generateUpdateBefore, generateInsert, rowtimeIndex, keepLastRow); - } - /** * Processes element to deduplicate on keys with row time semantic, sends current element if it * is last or first row, retracts previous element if needed. * - * @param state state of function * @param currentRow latest row received by deduplicate function + * @param prevRow previous row received by deduplicate function. `null` if current row is the + * first row * @param out underlying collector - * @param generateUpdateBefore flag to generate UPDATE_BEFORE message or not - * @param generateInsert flag to gennerate INSERT message or not - * @param rowtimeIndex the index of rowtime field - * @param keepLastRow flag to keep last row or keep first row */ - public static void deduplicateOnRowTime( - ValueState<RowData> state, - RowData currentRow, - Collector<RowData> out, - boolean generateUpdateBefore, - boolean generateInsert, - int rowtimeIndex, - boolean keepLastRow) + public void deduplicateOnRowTime( + RowData currentRow, @Nullable RowData prevRow, Collector<RowData> out) throws Exception { checkInsertOnly(currentRow); - RowData preRow = state.value(); - if (isDuplicate(preRow, currentRow, rowtimeIndex, keepLastRow)) { - updateDeduplicateResult(generateUpdateBefore, generateInsert, preRow, currentRow, out); - state.update(currentRow); + if (isDuplicate(prevRow, currentRow, rowtimeIndex, keepLastRow)) { + updateDeduplicateResult(generateUpdateBefore, generateInsert, prevRow, currentRow, out); + updateState(currentRow); } } + + protected abstract void updateState(RowData currentRow) throws Exception; } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/window/combines/RowTimeDeduplicateRecordsCombiner.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/window/combines/RowTimeDeduplicateRecordsCombiner.java index ebb91688791..229c150cca2 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/window/combines/RowTimeDeduplicateRecordsCombiner.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/window/combines/RowTimeDeduplicateRecordsCombiner.java @@ -32,7 +32,7 @@ import org.apache.flink.table.runtime.util.WindowKey; import java.util.Iterator; import static org.apache.flink.table.data.util.RowDataUtil.isAccumulateMsg; -import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.isDuplicate; +import static org.apache.flink.table.runtime.operators.deduplicate.utils.DeduplicateFunctionHelper.isDuplicate; /** * An implementation of {@link RecordsCombiner} that stores the first/last records of incremental diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeDeduplicateFunctionTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeDeduplicateFunctionTest.java index fc56f5d5e8a..cefe67157b2 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeDeduplicateFunctionTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeDeduplicateFunctionTest.java @@ -18,13 +18,16 @@ package org.apache.flink.table.runtime.operators.deduplicate; +import org.apache.flink.runtime.asyncprocessing.operators.AsyncKeyedProcessOperator; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.streaming.api.operators.KeyedProcessOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.operators.bundle.KeyedMapBundleOperator; import org.apache.flink.table.runtime.operators.bundle.trigger.CountBundleTrigger; +import org.apache.flink.table.runtime.operators.deduplicate.asyncprocessing.AsyncStateRowTimeDeduplicateFunction; import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; import org.apache.flink.testutils.junit.extensions.parameterized.Parameters; import org.apache.flink.types.RowKind; @@ -41,21 +44,26 @@ import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord import static org.apache.flink.table.runtime.util.StreamRecordUtils.record; /** - * Harness tests for {@link RowTimeDeduplicateFunction} and {@link - * RowTimeMiniBatchDeduplicateFunction}. + * Harness tests for {@link RowTimeDeduplicateFunction}, {@link + * AsyncStateRowTimeDeduplicateFunction} and {@link RowTimeMiniBatchDeduplicateFunction}. */ @ExtendWith(ParameterizedTestExtension.class) class RowTimeDeduplicateFunctionTest extends RowTimeDeduplicateFunctionTestBase { private final boolean miniBatchEnable; + private final boolean enableAsyncState; - RowTimeDeduplicateFunctionTest(boolean miniBacthEnable) { - this.miniBatchEnable = miniBacthEnable; + RowTimeDeduplicateFunctionTest(boolean miniBatchEnable, boolean enableAsyncState) { + this.enableAsyncState = enableAsyncState; + this.miniBatchEnable = miniBatchEnable; } - @Parameters(name = "miniBatchEnable = {0}") + @Parameters(name = "miniBatchEnable = {0}, enableAsyncState = {1}") private static Collection<Boolean[]> runMode() { - return Arrays.asList(new Boolean[] {false}, new Boolean[] {true}); + return Arrays.asList( + new Boolean[] {false, false}, + new Boolean[] {false, true}, + new Boolean[] {true, false}); } @TestTemplate @@ -66,8 +74,11 @@ class RowTimeDeduplicateFunctionTest extends RowTimeDeduplicateFunctionTestBase expectedOutput.add(new Watermark(102)); expectedOutput.add(record(RowKind.INSERT, "key3", 5, 299L)); expectedOutput.add(new Watermark(302)); - expectedOutput.add(record(RowKind.INSERT, "key1", 12, 400L)); - expectedOutput.add(record(RowKind.INSERT, "key2", 11, 401L)); + // async state does not support set ttl + if (!enableAsyncState) { + expectedOutput.add(record(RowKind.INSERT, "key1", 12, 400L)); + expectedOutput.add(record(RowKind.INSERT, "key2", 11, 401L)); + } expectedOutput.add(new Watermark(402)); // generateUpdateBefore: true, generateInsert: true @@ -86,8 +97,11 @@ class RowTimeDeduplicateFunctionTest extends RowTimeDeduplicateFunctionTestBase expectedOutput.add(new Watermark(102)); expectedOutput.add(record(RowKind.UPDATE_AFTER, "key3", 5, 299L)); expectedOutput.add(new Watermark(302)); - expectedOutput.add(record(RowKind.UPDATE_AFTER, "key1", 12, 400L)); - expectedOutput.add(record(RowKind.UPDATE_AFTER, "key2", 11, 401L)); + // async state does not support set ttl + if (!enableAsyncState) { + expectedOutput.add(record(RowKind.UPDATE_AFTER, "key1", 12, 400L)); + expectedOutput.add(record(RowKind.UPDATE_AFTER, "key2", 11, 401L)); + } expectedOutput.add(new Watermark(402)); testRowTimeDeduplicateKeepFirstRow(false, false, expectedOutput); } @@ -106,8 +120,17 @@ class RowTimeDeduplicateFunctionTest extends RowTimeDeduplicateFunctionTestBase expectedOutput.add(record(RowKind.UPDATE_AFTER, "key2", 11, 301L)); expectedOutput.add(record(RowKind.INSERT, "key3", 5, 299L)); expectedOutput.add(new Watermark(302)); - expectedOutput.add(record(RowKind.INSERT, "key1", 12, 400L)); - expectedOutput.add(record(RowKind.INSERT, "key2", 11, 401L)); + if (enableAsyncState) { + // async state does not support set ttl + expectedOutput.add(record(RowKind.UPDATE_BEFORE, "key1", 12, 300L)); + expectedOutput.add(record(RowKind.UPDATE_AFTER, "key1", 12, 400L)); + expectedOutput.add(record(RowKind.UPDATE_BEFORE, "key2", 11, 301L)); + expectedOutput.add(record(RowKind.UPDATE_AFTER, "key2", 11, 401L)); + } else { + expectedOutput.add(record(RowKind.INSERT, "key1", 12, 400L)); + expectedOutput.add(record(RowKind.INSERT, "key2", 11, 401L)); + } + expectedOutput.add(new Watermark(402)); // generateUpdateBefore: true, generateInsert: true @@ -126,8 +149,14 @@ class RowTimeDeduplicateFunctionTest extends RowTimeDeduplicateFunctionTestBase expectedOutput.add(record(RowKind.UPDATE_AFTER, "key2", 11, 301L)); expectedOutput.add(record(RowKind.INSERT, "key3", 5, 299L)); expectedOutput.add(new Watermark(302)); - expectedOutput.add(record(RowKind.INSERT, "key1", 12, 400L)); - expectedOutput.add(record(RowKind.INSERT, "key2", 11, 401L)); + if (enableAsyncState) { + // async state does not support set ttl + expectedOutput.add(record(RowKind.UPDATE_AFTER, "key1", 12, 400L)); + expectedOutput.add(record(RowKind.UPDATE_AFTER, "key2", 11, 401L)); + } else { + expectedOutput.add(record(RowKind.INSERT, "key1", 12, 400L)); + expectedOutput.add(record(RowKind.INSERT, "key2", 11, 401L)); + } expectedOutput.add(new Watermark(402)); testRowTimeDeduplicateKeepLastRow(false, true, expectedOutput); @@ -153,30 +182,47 @@ class RowTimeDeduplicateFunctionTest extends RowTimeDeduplicateFunctionTestBase final boolean keepLastRow = false; OneInputStreamOperatorTestHarness<RowData, RowData> testHarness; KeyedMapBundleOperator<RowData, RowData, RowData, RowData> keyedMapBundleOperator = null; - KeyedProcessOperator keyedProcessOperator = null; + OneInputStreamOperator<RowData, RowData> keyedProcessOperator = null; if (miniBatchEnable) { - RowTimeMiniBatchDeduplicateFunction func = - new RowTimeMiniBatchDeduplicateFunction( - inputRowType, - serializer, - minTtlTime.toMillis(), - rowTimeIndex, - generateUpdateBefore, - generateInsert, - keepLastRow); - CountBundleTrigger trigger = new CountBundleTrigger<RowData>(miniBatchSize); - keyedMapBundleOperator = new KeyedMapBundleOperator(func, trigger); - testHarness = createTestHarness(keyedMapBundleOperator); + if (enableAsyncState) { + throw new UnsupportedOperationException( + "Mini-batch deduplicate op is not supported async state api"); + } else { + RowTimeMiniBatchDeduplicateFunction func = + new RowTimeMiniBatchDeduplicateFunction( + inputRowType, + serializer, + minTtlTime.toMillis(), + rowTimeIndex, + generateUpdateBefore, + generateInsert, + keepLastRow); + CountBundleTrigger<RowData> trigger = new CountBundleTrigger<>(miniBatchSize); + keyedMapBundleOperator = new KeyedMapBundleOperator(func, trigger); + testHarness = createTestHarness(keyedMapBundleOperator); + } } else { - RowTimeDeduplicateFunction func = - new RowTimeDeduplicateFunction( - inputRowType, - minTtlTime.toMillis(), - rowTimeIndex, - generateUpdateBefore, - generateInsert, - keepLastRow); - keyedProcessOperator = new KeyedProcessOperator<>(func); + if (enableAsyncState) { + AsyncStateRowTimeDeduplicateFunction func = + new AsyncStateRowTimeDeduplicateFunction( + inputRowType, + minTtlTime.toMillis(), + rowTimeIndex, + generateUpdateBefore, + generateInsert, + keepLastRow); + keyedProcessOperator = new AsyncKeyedProcessOperator<>(func); + } else { + RowTimeDeduplicateFunction func = + new RowTimeDeduplicateFunction( + inputRowType, + minTtlTime.toMillis(), + rowTimeIndex, + generateUpdateBefore, + generateInsert, + keepLastRow); + keyedProcessOperator = new KeyedProcessOperator<>(func); + } testHarness = createTestHarness(keyedProcessOperator); } @@ -233,30 +279,47 @@ class RowTimeDeduplicateFunctionTest extends RowTimeDeduplicateFunctionTestBase final boolean keepLastRow = true; OneInputStreamOperatorTestHarness<RowData, RowData> testHarness; KeyedMapBundleOperator<RowData, RowData, RowData, RowData> keyedMapBundleOperator = null; - KeyedProcessOperator keyedProcessOperator = null; + OneInputStreamOperator<RowData, RowData> keyedProcessOperator = null; if (miniBatchEnable) { - RowTimeMiniBatchDeduplicateFunction func = - new RowTimeMiniBatchDeduplicateFunction( - inputRowType, - serializer, - minTtlTime.toMillis(), - rowTimeIndex, - generateUpdateBefore, - generateInsert, - keepLastRow); - CountBundleTrigger trigger = new CountBundleTrigger<RowData>(miniBatchSize); - keyedMapBundleOperator = new KeyedMapBundleOperator(func, trigger); - testHarness = createTestHarness(keyedMapBundleOperator); + if (enableAsyncState) { + throw new UnsupportedOperationException( + "Mini-batch deduplicate op is not supported async state api"); + } else { + RowTimeMiniBatchDeduplicateFunction func = + new RowTimeMiniBatchDeduplicateFunction( + inputRowType, + serializer, + minTtlTime.toMillis(), + rowTimeIndex, + generateUpdateBefore, + generateInsert, + keepLastRow); + CountBundleTrigger<RowData> trigger = new CountBundleTrigger<>(miniBatchSize); + keyedMapBundleOperator = new KeyedMapBundleOperator(func, trigger); + testHarness = createTestHarness(keyedMapBundleOperator); + } } else { - RowTimeDeduplicateFunction func = - new RowTimeDeduplicateFunction( - inputRowType, - minTtlTime.toMillis(), - rowTimeIndex, - generateUpdateBefore, - generateInsert, - true); - keyedProcessOperator = new KeyedProcessOperator<>(func); + if (enableAsyncState) { + AsyncStateRowTimeDeduplicateFunction func = + new AsyncStateRowTimeDeduplicateFunction( + inputRowType, + minTtlTime.toMillis(), + rowTimeIndex, + generateUpdateBefore, + generateInsert, + true); + keyedProcessOperator = new AsyncKeyedProcessOperator<>(func); + } else { + RowTimeDeduplicateFunction func = + new RowTimeDeduplicateFunction( + inputRowType, + minTtlTime.toMillis(), + rowTimeIndex, + generateUpdateBefore, + generateInsert, + true); + keyedProcessOperator = new KeyedProcessOperator<>(func); + } testHarness = createTestHarness(keyedProcessOperator); } @@ -298,8 +361,11 @@ class RowTimeDeduplicateFunctionTest extends RowTimeDeduplicateFunctionTestBase testHarness.processElement(insertRecord("key2", 11, 401L)); testHarness.processWatermark(402); - // all state has expired, so the record ("key1", 12, 400L), ("key2", 12, 401L) will be - // INSERT message + // all state has expired with sync state api, so the record ("key1", 12, 400L), + // ("key2", 12, 401L) will be INSERT message; + // the state does not expire with async state api because async state api does not support + // set ttl , so the record ("key1", 12, 400L), ("key2", 12, 401L) will be output with + // UPDATE_AFTER message actualOutput.addAll(testHarness.getOutput()); assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, actualOutput); diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeDeduplicateFunctionTestBase.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeDeduplicateFunctionTestBase.java index 777c4269521..988019d4a99 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeDeduplicateFunctionTestBase.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeDeduplicateFunctionTestBase.java @@ -19,12 +19,13 @@ package org.apache.flink.table.runtime.operators.deduplicate; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.streaming.api.operators.KeyedProcessOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStateProcessingOperator; import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.asyncprocessing.AsyncKeyedOneInputStreamOperatorTestHarness; import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.keyselector.RowDataKeySelector; -import org.apache.flink.table.runtime.operators.bundle.KeyedMapBundleOperator; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; import org.apache.flink.table.runtime.util.GenericRowRecordSortComparator; import org.apache.flink.table.runtime.util.RowDataHarnessAssertor; @@ -55,14 +56,13 @@ abstract class RowTimeDeduplicateFunctionTestBase { rowKeyIndex, inputRowType.toRowFieldTypes()[rowKeyIndex])); protected OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness( - KeyedProcessOperator<RowData, RowData, RowData> operator) throws Exception { - return new KeyedOneInputStreamOperatorTestHarness<>( - operator, rowKeySelector, rowKeySelector.getProducedType()); - } - - protected OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness( - KeyedMapBundleOperator<RowData, RowData, RowData, RowData> operator) throws Exception { - return new KeyedOneInputStreamOperatorTestHarness<>( - operator, rowKeySelector, rowKeySelector.getProducedType()); + OneInputStreamOperator<RowData, RowData> operator) throws Exception { + if (operator instanceof AsyncStateProcessingOperator) { + return AsyncKeyedOneInputStreamOperatorTestHarness.create( + operator, rowKeySelector, rowKeySelector.getProducedType()); + } else { + return new KeyedOneInputStreamOperatorTestHarness<>( + operator, rowKeySelector, rowKeySelector.getProducedType()); + } } }