This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
     new 8ada0ba123e  [FLINK-36926][table] Introduce window join operator with async state api
8ada0ba123e is described below

commit 8ada0ba123e99a408e7bafc8bcf90057d38d20b7
Author:     Xuyang <xyzhong...@163.com>
AuthorDate: Tue Jan 14 22:20:13 2025 +0800

    [FLINK-36926][table] Introduce window join operator with async state api

    This closes #25815
---
 .../v2/adaptor/AsyncKeyedStateBackendAdaptor.java  |   6 +
 .../KeyedTwoInputStreamOperatorTestHarness.java    |   5 +-
 ...syncKeyedTwoInputStreamOperatorTestHarness.java |  56 ++-
 .../nodes/exec/stream/StreamExecWindowJoin.java    |  13 +-
 .../runtime/stream/sql/WindowJoinITCase.scala      |  19 +-
 .../operators/AsyncStateTableStreamOperator.java   | 122 ++++++
 .../operators/join/window/WindowJoinOperator.java  | 440 ++-------------------
 .../join/window/WindowJoinOperatorBuilder.java     |  90 ++---
 .../AsyncStateWindowJoinOperator.java              | 277 +++++++++++++
 .../join/window/utils/WindowJoinHelper.java        | 399 +++++++++++++++++++
 .../asyncprocessing/state/WindowAsyncState.java    |  33 ++
 .../state/WindowListAsyncState.java                |  61 +++
 .../join/window/WindowJoinOperatorTest.java        |  35 +-
 13 files changed, 1062 insertions(+), 494 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/AsyncKeyedStateBackendAdaptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/AsyncKeyedStateBackendAdaptor.java
index dae4a959d8d..87cffb259f7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/AsyncKeyedStateBackendAdaptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/AsyncKeyedStateBackendAdaptor.java
@@ -18,6 +18,7 @@ package org.apache.flink.runtime.state.v2.adaptor; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.state.CheckpointListener; import org.apache.flink.api.common.state.InternalCheckpointListener; import org.apache.flink.api.common.state.v2.State;
@@ -197,4 +198,9 @@ public class AsyncKeyedStateBackendAdaptor<K> implements AsyncKeyedStateBackend< public boolean isSafeToReuseKVState() { return keyedStateBackend.isSafeToReuseKVState(); } + + @VisibleForTesting + public CheckpointableKeyedStateBackend<K> getKeyedStateBackend() { + return keyedStateBackend; + } }
diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java b/flink-runtime/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
index b2966f36179..04db4fc6b05 100644
--- a/flink-runtime/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
+++ b/flink-runtime/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
@@ -69,7 +69,10 @@ public class KeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT> if (keyedStateBackend instanceof HeapKeyedStateBackend) { return ((HeapKeyedStateBackend) keyedStateBackend).numKeyValueStateEntries(); } else { - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException( + String.format( + "Unsupported keyed state backend: %s", + keyedStateBackend.getClass().getCanonicalName())); } } }
diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedTwoInputStreamOperatorTestHarness.java b/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedTwoInputStreamOperatorTestHarness.java
index 687ac8dce07..17eacf983dd
100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedTwoInputStreamOperatorTestHarness.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedTwoInputStreamOperatorTestHarness.java @@ -18,10 +18,13 @@ package org.apache.flink.streaming.util.asyncprocessing; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.ClosureCleaner; import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateStreamOperator; +import org.apache.flink.runtime.state.AsyncKeyedStateBackend; +import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend; +import org.apache.flink.runtime.state.v2.adaptor.AsyncKeyedStateBackendAdaptor; import org.apache.flink.streaming.api.operators.BoundedMultiInput; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; @@ -31,7 +34,7 @@ import org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStatePr import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; -import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness; import org.apache.flink.util.Preconditions; import org.apache.flink.util.function.FunctionWithException; import org.apache.flink.util.function.RunnableWithException; @@ -53,7 +56,7 @@ import static org.assertj.core.api.Assertions.fail; * async processing, please use methods of test harness instead of operator. 
*/ public class AsyncKeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT> - extends TwoInputStreamOperatorTestHarness<IN1, IN2, OUT> { + extends KeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT> { private final TwoInputStreamOperator<IN1, IN2, OUT> twoInputOperator; @@ -128,15 +131,14 @@ public class AsyncKeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT> int numSubtasks, int subtaskIndex) throws Exception { - super(operator, maxParallelism, numSubtasks, subtaskIndex); - - ClosureCleaner.clean(keySelector1, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, false); - ClosureCleaner.clean(keySelector2, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, false); - config.setStatePartitioner(0, keySelector1); - config.setStatePartitioner(1, keySelector2); - config.setStateKeySerializer( - keyType.createSerializer(executionConfig.getSerializerConfig())); - config.serializeAllConfigs(); + super( + operator, + keySelector1, + keySelector2, + keyType, + maxParallelism, + numSubtasks, + subtaskIndex); Preconditions.checkState( operator instanceof AsyncStateProcessingOperator, @@ -239,6 +241,34 @@ public class AsyncKeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT> executor.shutdown(); } + @Override + public int numKeyedStateEntries() { + AbstractAsyncStateStreamOperator<OUT> asyncOp = + (AbstractAsyncStateStreamOperator<OUT>) operator; + AsyncKeyedStateBackend<Object> asyncKeyedStateBackend = asyncOp.getAsyncKeyedStateBackend(); + KeyedStateBackend<?> keyedStateBackend; + if (asyncKeyedStateBackend instanceof AsyncKeyedStateBackendAdaptor) { + keyedStateBackend = + ((AsyncKeyedStateBackendAdaptor<?>) asyncKeyedStateBackend) + .getKeyedStateBackend(); + + } else { + throw new UnsupportedOperationException( + String.format( + "Unsupported async keyed state backend: %s", + asyncKeyedStateBackend.getClass().getCanonicalName())); + } + + if (keyedStateBackend instanceof HeapKeyedStateBackend) { + return ((HeapKeyedStateBackend) keyedStateBackend).numKeyValueStateEntries(); + } else { + throw new UnsupportedOperationException( + String.format( + "Unsupported keyed state backend: %s", + keyedStateBackend.getClass().getCanonicalName())); + } + } + private void executeAndGet(RunnableWithException runnable) throws Exception { try { execute( diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowJoin.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowJoin.java index 3f0bc5be8ea..3f4b94a18a2 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowJoin.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowJoin.java @@ -22,8 +22,10 @@ package org.apache.flink.table.planner.plan.nodes.exec.stream; import org.apache.flink.FlinkVersion; import org.apache.flink.api.dag.Transformation; import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.api.transformations.TwoInputTransformation; import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.config.ExecutionConfigOptions; import org.apache.flink.table.data.RowData; import org.apache.flink.table.planner.delegation.PlannerBase; import org.apache.flink.table.planner.plan.logical.WindowAttachedWindowingStrategy; @@ -42,7 +44,6 @@ import 
org.apache.flink.table.planner.plan.utils.KeySelectorUtil; import org.apache.flink.table.planner.utils.TableConfigUtils; import org.apache.flink.table.runtime.generated.GeneratedJoinCondition; import org.apache.flink.table.runtime.keyselector.RowDataKeySelector; -import org.apache.flink.table.runtime.operators.join.window.WindowJoinOperator; import org.apache.flink.table.runtime.operators.join.window.WindowJoinOperatorBuilder; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; import org.apache.flink.table.runtime.util.TimeWindowUtil; @@ -172,7 +173,7 @@ public class StreamExecWindowJoin extends ExecNodeBase<RowData> TimeWindowUtil.getShiftTimeZone( leftWindowing.getTimeAttributeType(), TableConfigUtils.getLocalTimeZone(config)); - WindowJoinOperator operator = + WindowJoinOperatorBuilder operatorBuilder = WindowJoinOperatorBuilder.builder() .leftSerializer(leftTypeInfo.toRowSerializer()) .rightSerializer(rightTypeInfo.toRowSerializer()) @@ -181,8 +182,12 @@ public class StreamExecWindowJoin extends ExecNodeBase<RowData> .rightWindowEndIndex(rightWindowEndIndex) .filterNullKeys(joinSpec.getFilterNulls()) .joinType(joinSpec.getJoinType()) - .withShiftTimezone(shiftTimeZone) - .build(); + .withShiftTimezone(shiftTimeZone); + if (config.get(ExecutionConfigOptions.TABLE_EXEC_ASYNC_STATE_ENABLED)) { + operatorBuilder.enableAsyncState(); + } + + TwoInputStreamOperator<RowData, RowData, RowData> operator = operatorBuilder.build(); final RowType returnType = (RowType) getOutputType(); final TwoInputTransformation<RowData, RowData, RowData> transform = diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowJoinITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowJoinITCase.scala index 163fd343fee..416da176395 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowJoinITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowJoinITCase.scala @@ -20,6 +20,7 @@ package org.apache.flink.table.planner.runtime.stream.sql import org.apache.flink.configuration.{Configuration, RestartStrategyOptions} import org.apache.flink.core.execution.CheckpointingMode import org.apache.flink.table.api.bridge.scala._ +import org.apache.flink.table.api.config.ExecutionConfigOptions import org.apache.flink.table.planner.factories.TestValuesTableFactory import org.apache.flink.table.planner.runtime.utils.{FailingCollectionSource, StreamingWithStateTestBase, TestData, TestingAppendSink} import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.{HEAP_BACKEND, ROCKSDB_BACKEND, StateBackendMode} @@ -35,7 +36,7 @@ import java.util import scala.collection.JavaConversions._ @ExtendWith(Array(classOf[ParameterizedTestExtension])) -class WindowJoinITCase(mode: StateBackendMode, useTimestampLtz: Boolean) +class WindowJoinITCase(mode: StateBackendMode, useTimestampLtz: Boolean, enableAsyncState: Boolean) extends StreamingWithStateTestBase(mode) { val SHANGHAI_ZONE = ZoneId.of("Asia/Shanghai") @@ -55,6 +56,10 @@ class WindowJoinITCase(mode: StateBackendMode, useTimestampLtz: Boolean) env.configure(configuration, Thread.currentThread.getContextClassLoader) FailingCollectionSource.reset() + tEnv.getConfig.set( + ExecutionConfigOptions.TABLE_EXEC_ASYNC_STATE_ENABLED, + Boolean.box(enableAsyncState)) + val dataId1 = 
TestValuesTableFactory.registerData(TestData.windowDataWithTimestamp) val dataIdWithLtz = TestValuesTableFactory.registerData(TestData.windowDataWithLtzInShanghai) tEnv.executeSql( @@ -1159,13 +1164,15 @@ class WindowJoinITCase(mode: StateBackendMode, useTimestampLtz: Boolean) object WindowJoinITCase { - @Parameters(name = "StateBackend={0}, UseTimestampLtz = {1}") + @Parameters(name = "StateBackend={0}, UseTimestampLtz = {1}, EnableAsyncState = {2}") def parameters(): util.Collection[Array[java.lang.Object]] = { Seq[Array[AnyRef]]( - Array(HEAP_BACKEND, java.lang.Boolean.TRUE), - Array(HEAP_BACKEND, java.lang.Boolean.FALSE), - Array(ROCKSDB_BACKEND, java.lang.Boolean.TRUE), - Array(ROCKSDB_BACKEND, java.lang.Boolean.FALSE) + Array(HEAP_BACKEND, java.lang.Boolean.TRUE, Boolean.box(false)), + Array(HEAP_BACKEND, java.lang.Boolean.FALSE, Boolean.box(false)), + Array(HEAP_BACKEND, java.lang.Boolean.TRUE, Boolean.box(true)), + Array(HEAP_BACKEND, java.lang.Boolean.FALSE, Boolean.box(true)), + Array(ROCKSDB_BACKEND, java.lang.Boolean.TRUE, Boolean.box(false)), + Array(ROCKSDB_BACKEND, java.lang.Boolean.FALSE, Boolean.box(false)) ) } } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/AsyncStateTableStreamOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/AsyncStateTableStreamOperator.java new file mode 100644 index 00000000000..9807352470f --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/AsyncStateTableStreamOperator.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators; + +import org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateStreamOperator; +import org.apache.flink.streaming.api.TimerService; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Table operator to invoke close always. This is a base class for both batch and stream operators + * without key. + * + * <p>This class is nearly identical with {@link TableStreamOperator}, but extending from {@link + * AbstractAsyncStateStreamOperator} to integrate with asynchronous state access. + */ +public abstract class AsyncStateTableStreamOperator<OUT> + extends AbstractAsyncStateStreamOperator<OUT> { + + /** We listen to this ourselves because we don't have an {@link InternalTimerService}. 
*/ + protected long currentWatermark = Long.MIN_VALUE; + + protected transient ContextImpl ctx; + + @Override + public void open() throws Exception { + super.open(); + this.ctx = new ContextImpl(getProcessingTimeService()); + } + + @Override + public boolean useSplittableTimers() { + return true; + } + + @Override + public Watermark preProcessWatermark(Watermark mark) throws Exception { + currentWatermark = mark.getTimestamp(); + return super.preProcessWatermark(mark); + } + + /** Information available in an invocation of processElement. */ + protected class ContextImpl implements TimerService { + + protected final ProcessingTimeService timerService; + + public StreamRecord<?> element; + + ContextImpl(ProcessingTimeService timerService) { + this.timerService = checkNotNull(timerService); + } + + public Long timestamp() { + checkState(element != null); + + if (element.hasTimestamp()) { + return element.getTimestamp(); + } else { + return null; + } + } + + @Override + public long currentProcessingTime() { + return timerService.getCurrentProcessingTime(); + } + + @Override + public long currentWatermark() { + return currentWatermark; + } + + @Override + public void registerProcessingTimeTimer(long time) { + throw new UnsupportedOperationException( + "Setting timers is only supported on a keyed streams."); + } + + @Override + public void registerEventTimeTimer(long time) { + throw new UnsupportedOperationException( + "Setting timers is only supported on a keyed streams."); + } + + @Override + public void deleteProcessingTimeTimer(long time) { + throw new UnsupportedOperationException( + "Delete timers is only supported on a keyed streams."); + } + + @Override + public void deleteEventTimeTimer(long time) { + throw new UnsupportedOperationException( + "Delete timers is only supported on a keyed streams."); + } + + public TimerService timerService() { + return this; + } + } +} diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator.java index 710feb54a54..a087cfdada7 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator.java @@ -23,10 +23,6 @@ import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.LongSerializer; -import org.apache.flink.metrics.Counter; -import org.apache.flink.metrics.Gauge; -import org.apache.flink.metrics.Meter; -import org.apache.flink.metrics.MeterView; import org.apache.flink.runtime.state.internal.InternalListState; import org.apache.flink.streaming.api.operators.InternalTimer; import org.apache.flink.streaming.api.operators.InternalTimerService; @@ -35,55 +31,41 @@ import org.apache.flink.streaming.api.operators.TimestampedCollector; import org.apache.flink.streaming.api.operators.Triggerable; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; -import org.apache.flink.table.data.util.RowDataUtil; -import 
org.apache.flink.table.data.utils.JoinedRowData; import org.apache.flink.table.runtime.generated.GeneratedJoinCondition; import org.apache.flink.table.runtime.generated.JoinCondition; import org.apache.flink.table.runtime.operators.TableStreamOperator; +import org.apache.flink.table.runtime.operators.join.FlinkJoinType; import org.apache.flink.table.runtime.operators.join.JoinConditionWithNullFilters; +import org.apache.flink.table.runtime.operators.join.window.utils.WindowJoinHelper; import org.apache.flink.table.runtime.operators.window.tvf.common.WindowTimerService; import org.apache.flink.table.runtime.operators.window.tvf.slicing.SlicingWindowTimerServiceImpl; import org.apache.flink.table.runtime.operators.window.tvf.state.WindowListState; import org.apache.flink.table.runtime.typeutils.RowDataSerializer; -import org.apache.flink.types.RowKind; import java.time.ZoneId; -import java.util.IdentityHashMap; import java.util.List; -import static org.apache.flink.table.runtime.util.TimeWindowUtil.isWindowFired; - /** - * Streaming window join operator. + * A streaming window join operator implemented by sync state api. * - * <p>Note: currently, {@link WindowJoinOperator} doesn't support early-fire and late-arrival. Thus + * <p>Note: currently, {@link WindowJoinOperator} doesn't support early-fire and late-arrival. Thus, * late elements (elements belong to emitted windows) will be simply dropped. * * <p>Note: currently, {@link WindowJoinOperator} doesn't support DELETE or UPDATE_BEFORE input row. */ -public abstract class WindowJoinOperator extends TableStreamOperator<RowData> +public class WindowJoinOperator extends TableStreamOperator<RowData> implements TwoInputStreamOperator<RowData, RowData, RowData>, Triggerable<RowData, Long>, KeyContext { private static final long serialVersionUID = 1L; - private static final String LEFT_LATE_ELEMENTS_DROPPED_METRIC_NAME = - "leftNumLateRecordsDropped"; - private static final String LEFT_LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME = - "leftLateRecordsDroppedRate"; - private static final String RIGHT_LATE_ELEMENTS_DROPPED_METRIC_NAME = - "rightNumLateRecordsDropped"; - private static final String RIGHT_LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME = - "rightLateRecordsDroppedRate"; - private static final String WATERMARK_LATENCY_METRIC_NAME = "watermarkLatency"; private static final String LEFT_RECORDS_STATE_NAME = "left-records"; private static final String RIGHT_RECORDS_STATE_NAME = "right-records"; - protected final RowDataSerializer leftSerializer; - protected final RowDataSerializer rightSerializer; + private final RowDataSerializer leftSerializer; + private final RowDataSerializer rightSerializer; private final GeneratedJoinCondition generatedJoinCondition; private final int leftWindowEndIndex; @@ -92,26 +74,19 @@ public abstract class WindowJoinOperator extends TableStreamOperator<RowData> private final boolean[] filterNullKeys; private final ZoneId shiftTimeZone; + private final FlinkJoinType joinType; + private transient WindowTimerService<Long> windowTimerService; - // ------------------------------------------------------------------------ - protected transient JoinConditionWithNullFilters joinCondition; + private transient JoinConditionWithNullFilters joinCondition; /** This is used for emitting elements with a given timestamp. 
*/ - protected transient TimestampedCollector<RowData> collector; + private transient TimestampedCollector<RowData> collector; private transient WindowListState<Long> leftWindowState; private transient WindowListState<Long> rightWindowState; - // ------------------------------------------------------------------------ - // Metrics - // ------------------------------------------------------------------------ - - private transient Counter leftNumLateRecordsDropped; - private transient Meter leftLateRecordsDroppedRate; - private transient Counter rightNumLateRecordsDropped; - private transient Meter rightLateRecordsDroppedRate; - private transient Gauge<Long> watermarkLatency; + private transient WindowJoinHelper helper; WindowJoinOperator( TypeSerializer<RowData> leftSerializer, @@ -120,7 +95,8 @@ public abstract class WindowJoinOperator extends TableStreamOperator<RowData> int leftWindowEndIndex, int rightWindowEndIndex, boolean[] filterNullKeys, - ZoneId shiftTimeZone) { + ZoneId shiftTimeZone, + FlinkJoinType joinType) { this.leftSerializer = (RowDataSerializer) leftSerializer; this.rightSerializer = (RowDataSerializer) rightSerializer; this.generatedJoinCondition = generatedJoinCondition; @@ -128,6 +104,7 @@ public abstract class WindowJoinOperator extends TableStreamOperator<RowData> this.rightWindowEndIndex = rightWindowEndIndex; this.filterNullKeys = filterNullKeys; this.shiftTimeZone = shiftTimeZone; + this.joinType = joinType; } @Override @@ -166,28 +143,8 @@ public abstract class WindowJoinOperator extends TableStreamOperator<RowData> this.rightWindowState = new WindowListState<>((InternalListState<RowData, Long, RowData>) rightListState); - // metrics - this.leftNumLateRecordsDropped = metrics.counter(LEFT_LATE_ELEMENTS_DROPPED_METRIC_NAME); - this.leftLateRecordsDroppedRate = - metrics.meter( - LEFT_LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME, - new MeterView(leftNumLateRecordsDropped)); - this.rightNumLateRecordsDropped = metrics.counter(RIGHT_LATE_ELEMENTS_DROPPED_METRIC_NAME); - this.rightLateRecordsDroppedRate = - metrics.meter( - RIGHT_LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME, - new MeterView(rightNumLateRecordsDropped)); - this.watermarkLatency = - metrics.gauge( - WATERMARK_LATENCY_METRIC_NAME, - () -> { - long watermark = windowTimerService.currentWatermark(); - if (watermark < 0) { - return 0L; - } else { - return windowTimerService.currentProcessingTime() - watermark; - } - }); + this.helper = new SyncStateWindowJoinHelper(); + this.helper.registerMetric(getRuntimeContext().getMetricGroup()); } @Override @@ -201,36 +158,20 @@ public abstract class WindowJoinOperator extends TableStreamOperator<RowData> @Override public void processElement1(StreamRecord<RowData> element) throws Exception { - processElement(element, leftWindowEndIndex, leftLateRecordsDroppedRate, leftWindowState); + helper.processElement( + element, + leftWindowEndIndex, + helper.getLeftLateRecordsDroppedRate(), + (windowEnd, rowData) -> leftWindowState.add(windowEnd, rowData)); } @Override public void processElement2(StreamRecord<RowData> element) throws Exception { - processElement(element, rightWindowEndIndex, rightLateRecordsDroppedRate, rightWindowState); - } - - private void processElement( - StreamRecord<RowData> element, - int windowEndIndex, - Meter lateRecordsDroppedRate, - WindowListState<Long> recordState) - throws Exception { - RowData inputRow = element.getValue(); - long windowEnd = inputRow.getLong(windowEndIndex); - if (isWindowFired(windowEnd, windowTimerService.currentWatermark(), 
shiftTimeZone)) { - // element is late and should be dropped - lateRecordsDroppedRate.markEvent(); - return; - } - if (RowDataUtil.isAccumulateMsg(inputRow)) { - recordState.add(windowEnd, inputRow); - } else { - // Window join could not handle retraction input stream - throw new UnsupportedOperationException( - "This is a bug and should not happen. Please file an issue."); - } - // always register time for every element - windowTimerService.registerEventTimeWindowTimer(windowEnd); + helper.processElement( + element, + rightWindowEndIndex, + helper.getRightLateRecordsDroppedRate(), + (windowEnd, rowData) -> rightWindowState.add(windowEnd, rowData)); } @Override @@ -247,329 +188,28 @@ public abstract class WindowJoinOperator extends TableStreamOperator<RowData> // join left records and right records List<RowData> leftData = leftWindowState.get(window); List<RowData> rightData = rightWindowState.get(window); - join(leftData, rightData); - // clear state - if (leftData != null) { - leftWindowState.clear(window); - } - if (rightData != null) { - rightWindowState.clear(window); - } + helper.joinAndClear(window, leftData, rightData); } - public abstract void join(Iterable<RowData> leftRecords, Iterable<RowData> rightRecords); - - static class SemiAntiJoinOperator extends WindowJoinOperator { - - private final boolean isAntiJoin; - - SemiAntiJoinOperator( - TypeSerializer<RowData> leftSerializer, - TypeSerializer<RowData> rightSerializer, - GeneratedJoinCondition generatedJoinCondition, - int leftWindowEndIndex, - int rightWindowEndIndex, - boolean[] filterNullKeys, - boolean isAntiJoin, - ZoneId shiftTimeZone) { - super( - leftSerializer, - rightSerializer, - generatedJoinCondition, - leftWindowEndIndex, - rightWindowEndIndex, - filterNullKeys, - shiftTimeZone); - this.isAntiJoin = isAntiJoin; - } - - @Override - public void join(Iterable<RowData> leftRecords, Iterable<RowData> rightRecords) { - if (leftRecords == null) { - return; - } - if (rightRecords == null) { - if (isAntiJoin) { - for (RowData leftRecord : leftRecords) { - collector.collect(leftRecord); - } - } - return; - } - for (RowData leftRecord : leftRecords) { - boolean matches = false; - for (RowData rightRecord : rightRecords) { - if (joinCondition.apply(leftRecord, rightRecord)) { - matches = true; - break; - } - } - if (matches) { - if (!isAntiJoin) { - // emit left record if there are matched rows on the other side - collector.collect(leftRecord); - } - } else { - if (isAntiJoin) { - // emit left record if there is no matched row on the other side - collector.collect(leftRecord); - } - } - } - } - } + private class SyncStateWindowJoinHelper extends WindowJoinHelper { - static class InnerJoinOperator extends WindowJoinOperator { - private transient JoinedRowData outRow; - - InnerJoinOperator( - TypeSerializer<RowData> leftSerializer, - TypeSerializer<RowData> rightSerializer, - GeneratedJoinCondition generatedJoinCondition, - int leftWindowEndIndex, - int rightWindowEndIndex, - boolean[] filterNullKeys, - ZoneId shiftTimeZone) { + public SyncStateWindowJoinHelper() { super( - leftSerializer, - rightSerializer, - generatedJoinCondition, - leftWindowEndIndex, - rightWindowEndIndex, - filterNullKeys, - shiftTimeZone); + WindowJoinOperator.this.leftSerializer, + WindowJoinOperator.this.rightSerializer, + WindowJoinOperator.this.shiftTimeZone, + WindowJoinOperator.this.windowTimerService, + WindowJoinOperator.this.joinCondition, + WindowJoinOperator.this.collector, + WindowJoinOperator.this.joinType); } @Override - public void open() 
throws Exception { - super.open(); - outRow = new JoinedRowData(); - } - - @Override - public void join(Iterable<RowData> leftRecords, Iterable<RowData> rightRecords) { - if (leftRecords == null || rightRecords == null) { - return; - } - for (RowData leftRecord : leftRecords) { - for (RowData rightRecord : rightRecords) { - if (joinCondition.apply(leftRecord, rightRecord)) { - outRow.setRowKind(RowKind.INSERT); - outRow.replace(leftRecord, rightRecord); - collector.collect(outRow); - } - } - } - } - } - - private abstract static class AbstractOuterJoinOperator extends WindowJoinOperator { - - private static final long serialVersionUID = 1L; - - private transient RowData leftNullRow; - private transient RowData rightNullRow; - private transient JoinedRowData outRow; - - AbstractOuterJoinOperator( - TypeSerializer<RowData> leftSerializer, - TypeSerializer<RowData> rightSerializer, - GeneratedJoinCondition generatedJoinCondition, - int leftWindowEndIndex, - int rightWindowEndIndex, - boolean[] filterNullKeys, - ZoneId shiftTimeZone) { - super( - leftSerializer, - rightSerializer, - generatedJoinCondition, - leftWindowEndIndex, - rightWindowEndIndex, - filterNullKeys, - shiftTimeZone); - } - - @Override - public void open() throws Exception { - super.open(); - leftNullRow = new GenericRowData(leftSerializer.getArity()); - rightNullRow = new GenericRowData(rightSerializer.getArity()); - outRow = new JoinedRowData(); - } - - protected void outputNullPadding(RowData row, boolean isLeft) { + public void clearState(long windowEnd, boolean isLeft) { if (isLeft) { - outRow.replace(row, rightNullRow); - } else { - outRow.replace(leftNullRow, row); - } - outRow.setRowKind(RowKind.INSERT); - collector.collect(outRow); - } - - protected void outputNullPadding(Iterable<RowData> rows, boolean isLeft) { - for (RowData row : rows) { - outputNullPadding(row, isLeft); - } - } - - protected void output(RowData inputRow, RowData otherRow, boolean inputIsLeft) { - if (inputIsLeft) { - outRow.replace(inputRow, otherRow); - } else { - outRow.replace(otherRow, inputRow); - } - outRow.setRowKind(RowKind.INSERT); - collector.collect(outRow); - } - } - - static class LeftOuterJoinOperator extends AbstractOuterJoinOperator { - - private static final long serialVersionUID = 1L; - - LeftOuterJoinOperator( - TypeSerializer<RowData> leftSerializer, - TypeSerializer<RowData> rightSerializer, - GeneratedJoinCondition generatedJoinCondition, - int leftWindowEndIndex, - int rightWindowEndIndex, - boolean[] filterNullKeys, - ZoneId shiftTimeZone) { - super( - leftSerializer, - rightSerializer, - generatedJoinCondition, - leftWindowEndIndex, - rightWindowEndIndex, - filterNullKeys, - shiftTimeZone); - } - - @Override - public void join(Iterable<RowData> leftRecords, Iterable<RowData> rightRecords) { - if (leftRecords == null) { - return; - } - if (rightRecords == null) { - outputNullPadding(leftRecords, true); - } else { - for (RowData leftRecord : leftRecords) { - boolean matches = false; - for (RowData rightRecord : rightRecords) { - if (joinCondition.apply(leftRecord, rightRecord)) { - output(leftRecord, rightRecord, true); - matches = true; - } - } - if (!matches) { - // padding null for left side - outputNullPadding(leftRecord, true); - } - } - } - } - } - - static class RightOuterJoinOperator extends AbstractOuterJoinOperator { - - private static final long serialVersionUID = 1L; - - RightOuterJoinOperator( - TypeSerializer<RowData> leftSerializer, - TypeSerializer<RowData> rightSerializer, - GeneratedJoinCondition 
generatedJoinCondition, - int leftWindowEndIndex, - int rightWindowEndIndex, - boolean[] filterNullKeys, - ZoneId shiftTimeZone) { - super( - leftSerializer, - rightSerializer, - generatedJoinCondition, - leftWindowEndIndex, - rightWindowEndIndex, - filterNullKeys, - shiftTimeZone); - } - - @Override - public void join(Iterable<RowData> leftRecords, Iterable<RowData> rightRecords) { - if (rightRecords == null) { - return; - } - if (leftRecords == null) { - outputNullPadding(rightRecords, false); - } else { - for (RowData rightRecord : rightRecords) { - boolean matches = false; - for (RowData leftRecord : leftRecords) { - if (joinCondition.apply(leftRecord, rightRecord)) { - output(leftRecord, rightRecord, true); - matches = true; - } - } - if (!matches) { - outputNullPadding(rightRecord, false); - } - } - } - } - } - - static class FullOuterJoinOperator extends AbstractOuterJoinOperator { - - private static final long serialVersionUID = 1L; - - FullOuterJoinOperator( - TypeSerializer<RowData> leftSerializer, - TypeSerializer<RowData> rightSerializer, - GeneratedJoinCondition generatedJoinCondition, - int leftWindowEndIndex, - int rightWindowEndIndex, - boolean[] filterNullKeys, - ZoneId shiftTimeZone) { - super( - leftSerializer, - rightSerializer, - generatedJoinCondition, - leftWindowEndIndex, - rightWindowEndIndex, - filterNullKeys, - shiftTimeZone); - } - - @Override - public void join(Iterable<RowData> leftRecords, Iterable<RowData> rightRecords) { - if (leftRecords == null && rightRecords == null) { - return; - } - if (rightRecords == null) { - outputNullPadding(leftRecords, true); - } else if (leftRecords == null) { - outputNullPadding(rightRecords, false); + leftWindowState.clear(windowEnd); } else { - IdentityHashMap<RowData, Boolean> emittedRightRecords = new IdentityHashMap<>(); - for (RowData leftRecord : leftRecords) { - boolean matches = false; - for (RowData rightRecord : rightRecords) { - if (joinCondition.apply(leftRecord, rightRecord)) { - output(leftRecord, rightRecord, true); - matches = true; - emittedRightRecords.put(rightRecord, Boolean.TRUE); - } - } - // padding null for left side - if (!matches) { - outputNullPadding(leftRecord, true); - } - } - // padding null for never emitted right side - for (RowData rightRecord : rightRecords) { - if (!emittedRightRecords.containsKey(rightRecord)) { - outputNullPadding(rightRecord, false); - } - } + rightWindowState.clear(windowEnd); } } } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorBuilder.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorBuilder.java index c0e566555e7..051abfc18b2 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorBuilder.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorBuilder.java @@ -19,9 +19,11 @@ package org.apache.flink.table.runtime.operators.join.window; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.generated.GeneratedJoinCondition; import org.apache.flink.table.runtime.operators.join.FlinkJoinType; +import org.apache.flink.table.runtime.operators.join.window.asyncprocessing.AsyncStateWindowJoinOperator; import 
java.time.ZoneId; @@ -41,6 +43,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * .rightWindowEndIndex(rightWindowEndIndex) * .filterNullKeys(filterNullKeys) * .joinType(joinType) + * .enableAsyncState() * .build(); * </pre> */ @@ -58,6 +61,7 @@ public class WindowJoinOperatorBuilder { private boolean[] filterNullKeys; private FlinkJoinType joinType; private ZoneId shiftTimeZone = ZoneId.of("UTC"); + private boolean enableAsyncState = false; public WindowJoinOperatorBuilder leftSerializer(TypeSerializer<RowData> leftSerializer) { this.leftSerializer = leftSerializer; @@ -105,7 +109,12 @@ public class WindowJoinOperatorBuilder { return this; } - public WindowJoinOperator build() { + public WindowJoinOperatorBuilder enableAsyncState() { + this.enableAsyncState = true; + return this; + } + + public TwoInputStreamOperator<RowData, RowData, RowData> build() { checkNotNull(leftSerializer); checkNotNull(rightSerializer); checkNotNull(generatedJoinCondition); @@ -123,65 +132,26 @@ public class WindowJoinOperatorBuilder { "Illegal window end index %s, it should not be negative!", rightWindowEndIndex)); - switch (joinType) { - case INNER: - return new WindowJoinOperator.InnerJoinOperator( - leftSerializer, - rightSerializer, - generatedJoinCondition, - leftWindowEndIndex, - rightWindowEndIndex, - filterNullKeys, - shiftTimeZone); - case SEMI: - return new WindowJoinOperator.SemiAntiJoinOperator( - leftSerializer, - rightSerializer, - generatedJoinCondition, - leftWindowEndIndex, - rightWindowEndIndex, - filterNullKeys, - false, - shiftTimeZone); - case ANTI: - return new WindowJoinOperator.SemiAntiJoinOperator( - leftSerializer, - rightSerializer, - generatedJoinCondition, - leftWindowEndIndex, - rightWindowEndIndex, - filterNullKeys, - true, - shiftTimeZone); - case LEFT: - return new WindowJoinOperator.LeftOuterJoinOperator( - leftSerializer, - rightSerializer, - generatedJoinCondition, - leftWindowEndIndex, - rightWindowEndIndex, - filterNullKeys, - shiftTimeZone); - case RIGHT: - return new WindowJoinOperator.RightOuterJoinOperator( - leftSerializer, - rightSerializer, - generatedJoinCondition, - leftWindowEndIndex, - rightWindowEndIndex, - filterNullKeys, - shiftTimeZone); - case FULL: - return new WindowJoinOperator.FullOuterJoinOperator( - leftSerializer, - rightSerializer, - generatedJoinCondition, - leftWindowEndIndex, - rightWindowEndIndex, - filterNullKeys, - shiftTimeZone); - default: - throw new IllegalArgumentException("Invalid join type: " + joinType); + if (enableAsyncState) { + return new AsyncStateWindowJoinOperator( + leftSerializer, + rightSerializer, + generatedJoinCondition, + leftWindowEndIndex, + rightWindowEndIndex, + filterNullKeys, + shiftTimeZone, + joinType); + } else { + return new WindowJoinOperator( + leftSerializer, + rightSerializer, + generatedJoinCondition, + leftWindowEndIndex, + rightWindowEndIndex, + filterNullKeys, + shiftTimeZone, + joinType); } } } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/window/asyncprocessing/AsyncStateWindowJoinOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/window/asyncprocessing/AsyncStateWindowJoinOperator.java new file mode 100644 index 00000000000..b0c5e037fb9 --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/window/asyncprocessing/AsyncStateWindowJoinOperator.java @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.join.window.asyncprocessing; + +import org.apache.flink.api.common.functions.DefaultOpenContext; +import org.apache.flink.api.common.state.v2.ListState; +import org.apache.flink.api.common.state.v2.ListStateDescriptor; +import org.apache.flink.api.common.state.v2.StateFuture; +import org.apache.flink.api.common.state.v2.StateIterator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.core.state.StateFutureUtils; +import org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateStreamOperator; +import org.apache.flink.runtime.state.v2.internal.InternalListState; +import org.apache.flink.streaming.api.operators.InternalTimer; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.operators.KeyContext; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.operators.Triggerable; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.generated.GeneratedJoinCondition; +import org.apache.flink.table.runtime.generated.JoinCondition; +import org.apache.flink.table.runtime.operators.AsyncStateTableStreamOperator; +import org.apache.flink.table.runtime.operators.join.FlinkJoinType; +import org.apache.flink.table.runtime.operators.join.JoinConditionWithNullFilters; +import org.apache.flink.table.runtime.operators.join.window.WindowJoinOperator; +import org.apache.flink.table.runtime.operators.join.window.utils.WindowJoinHelper; +import org.apache.flink.table.runtime.operators.window.tvf.asyncprocessing.state.WindowListAsyncState; +import org.apache.flink.table.runtime.operators.window.tvf.common.WindowTimerService; +import org.apache.flink.table.runtime.operators.window.tvf.slicing.SlicingWindowTimerServiceImpl; +import org.apache.flink.table.runtime.typeutils.RowDataSerializer; + +import java.time.ZoneId; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; + +/** + * A {@link AsyncStateWindowJoinOperator} implemented by async state api. + * + * <p>This class is nearly identical with {@link WindowJoinOperator}, but extending from {@link + * AbstractAsyncStateStreamOperator} to integrate with asynchronous state access. + * + * <p>Note: currently, {@link AsyncStateWindowJoinOperator} doesn't support early-fire and + * late-arrival. Thus, late elements (elements belong to emitted windows) will be simply dropped. 
+ * + * <p>Note: currently, {@link AsyncStateWindowJoinOperator} doesn't support DELETE or UPDATE_BEFORE + * input row. + */ +public class AsyncStateWindowJoinOperator extends AsyncStateTableStreamOperator<RowData> + implements TwoInputStreamOperator<RowData, RowData, RowData>, + Triggerable<RowData, Long>, + KeyContext { + + private static final long serialVersionUID = 1L; + + private static final String LEFT_RECORDS_STATE_NAME = "left-records"; + private static final String RIGHT_RECORDS_STATE_NAME = "right-records"; + + private final RowDataSerializer leftSerializer; + private final RowDataSerializer rightSerializer; + private final GeneratedJoinCondition generatedJoinCondition; + + private final int leftWindowEndIndex; + private final int rightWindowEndIndex; + + private final boolean[] filterNullKeys; + private final ZoneId shiftTimeZone; + + private final FlinkJoinType joinType; + + private transient WindowTimerService<Long> windowTimerService; + + private transient JoinConditionWithNullFilters joinCondition; + + /** This is used for emitting elements with a given timestamp. */ + private transient TimestampedCollector<RowData> collector; + + private transient WindowListAsyncState<Long> leftWindowState; + private transient WindowListAsyncState<Long> rightWindowState; + + private transient WindowJoinHelper helper; + + public AsyncStateWindowJoinOperator( + TypeSerializer<RowData> leftSerializer, + TypeSerializer<RowData> rightSerializer, + GeneratedJoinCondition generatedJoinCondition, + int leftWindowEndIndex, + int rightWindowEndIndex, + boolean[] filterNullKeys, + ZoneId shiftTimeZone, + FlinkJoinType joinType) { + this.leftSerializer = (RowDataSerializer) leftSerializer; + this.rightSerializer = (RowDataSerializer) rightSerializer; + this.generatedJoinCondition = generatedJoinCondition; + this.leftWindowEndIndex = leftWindowEndIndex; + this.rightWindowEndIndex = rightWindowEndIndex; + this.filterNullKeys = filterNullKeys; + this.shiftTimeZone = shiftTimeZone; + this.joinType = joinType; + } + + @Override + public void open() throws Exception { + super.open(); + + this.collector = new TimestampedCollector<>(output); + collector.eraseTimestamp(); + + final LongSerializer windowSerializer = LongSerializer.INSTANCE; + + InternalTimerService<Long> internalTimerService = + getInternalTimerService("window-timers", windowSerializer, this); + this.windowTimerService = + new SlicingWindowTimerServiceImpl(internalTimerService, shiftTimeZone); + + // init join condition + JoinCondition condition = + generatedJoinCondition.newInstance(getRuntimeContext().getUserCodeClassLoader()); + this.joinCondition = new JoinConditionWithNullFilters(condition, filterNullKeys, this); + this.joinCondition.setRuntimeContext(getRuntimeContext()); + this.joinCondition.open(DefaultOpenContext.INSTANCE); + + // init state + ListStateDescriptor<RowData> leftRecordStateDesc = + new ListStateDescriptor<>(LEFT_RECORDS_STATE_NAME, leftSerializer); + ListState<RowData> leftListState = + getOrCreateKeyedState(Long.MIN_VALUE, windowSerializer, leftRecordStateDesc); + this.leftWindowState = + new WindowListAsyncState<>( + (InternalListState<RowData, Long, RowData>) leftListState); + + ListStateDescriptor<RowData> rightRecordStateDesc = + new ListStateDescriptor<>(RIGHT_RECORDS_STATE_NAME, rightSerializer); + ListState<RowData> rightListState = + getOrCreateKeyedState(Long.MIN_VALUE, windowSerializer, rightRecordStateDesc); + this.rightWindowState = + new WindowListAsyncState<>( + (InternalListState<RowData, Long, RowData>) 
rightListState); + + this.helper = new AsyncStateWindowJoinHelper(); + this.helper.registerMetric(getRuntimeContext().getMetricGroup()); + } + + @Override + public void close() throws Exception { + super.close(); + collector = null; + if (joinCondition != null) { + joinCondition.close(); + } + } + + @Override + public void processElement1(StreamRecord<RowData> element) throws Exception { + helper.processElement( + element, + leftWindowEndIndex, + helper.getLeftLateRecordsDroppedRate(), + (windowEnd, rowData) -> leftWindowState.asyncAdd(windowEnd, rowData)); + } + + @Override + public void processElement2(StreamRecord<RowData> element) throws Exception { + helper.processElement( + element, + rightWindowEndIndex, + helper.getRightLateRecordsDroppedRate(), + (windowEnd, rowData) -> rightWindowState.asyncAdd(windowEnd, rowData)); + } + + @Override + public void onProcessingTime(InternalTimer<RowData, Long> timer) throws Exception { + // Window join only support event-time now + throw new UnsupportedOperationException( + "This is a bug and should not happen. Please file an issue."); + } + + @Override + public void onEventTime(InternalTimer<RowData, Long> timer) throws Exception { + asyncProcessWithKey(timer.getKey(), () -> triggerJoin(timer.getNamespace())); + } + + /** + * Currently, similar to the {@link WindowJoinOperator#onEventTime} that uses the sync state + * api, we directly load the list data from the state into memory to perform join operations. + * + * <p>Note: The order of data in the left and right side lists must be preserved to ensure the + * output data sequence is maintained. + */ + private void triggerJoin(long window) { + StateFuture<StateIterator<RowData>> leftDataFuture = leftWindowState.asyncGet(window); + StateFuture<StateIterator<RowData>> rightDataFuture = rightWindowState.asyncGet(window); + + // join left records and right records + AtomicReference<List<RowData>> leftDataRef = new AtomicReference<>(); + AtomicReference<List<RowData>> rightDataRef = new AtomicReference<>(); + leftDataFuture.thenCombine( + rightDataFuture, + (leftDataIterator, rightDataIterator) -> { + StateFuture<Void> leftLoadToMemFuture; + if (leftDataIterator == null) { + leftDataRef.set(null); + leftLoadToMemFuture = StateFutureUtils.completedVoidFuture(); + } else { + leftDataRef.set(new ArrayList<>()); + leftLoadToMemFuture = + leftDataIterator.onNext( + data -> { + leftDataRef.get().add(data); + }); + } + + StateFuture<Void> rightLoadToMemFuture; + if (rightDataIterator == null) { + rightDataRef.set(null); + rightLoadToMemFuture = StateFutureUtils.completedVoidFuture(); + } else { + rightDataRef.set(new ArrayList<>()); + rightLoadToMemFuture = + rightDataIterator.onNext( + data -> { + rightDataRef.get().add(data); + }); + } + + return leftLoadToMemFuture.thenCombine( + rightLoadToMemFuture, + (VOID1, VOID2) -> { + helper.joinAndClear(window, leftDataRef.get(), rightDataRef.get()); + return null; + }); + }); + } + + private class AsyncStateWindowJoinHelper extends WindowJoinHelper { + + public AsyncStateWindowJoinHelper() { + super( + AsyncStateWindowJoinOperator.this.leftSerializer, + AsyncStateWindowJoinOperator.this.rightSerializer, + AsyncStateWindowJoinOperator.this.shiftTimeZone, + AsyncStateWindowJoinOperator.this.windowTimerService, + AsyncStateWindowJoinOperator.this.joinCondition, + AsyncStateWindowJoinOperator.this.collector, + AsyncStateWindowJoinOperator.this.joinType); + } + + @Override + public void clearState(long windowEnd, boolean isLeft) { + // no need to wait these async 
requests to end + if (isLeft) { + leftWindowState.asyncClear(windowEnd); + } else { + rightWindowState.asyncClear(windowEnd); + } + } + } +} diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/window/utils/WindowJoinHelper.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/window/utils/WindowJoinHelper.java new file mode 100644 index 00000000000..27dc5c4dbb2 --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/window/utils/WindowJoinHelper.java @@ -0,0 +1,399 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.join.window.utils; + +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Meter; +import org.apache.flink.metrics.MeterView; +import org.apache.flink.metrics.groups.OperatorMetricGroup; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.util.RowDataUtil; +import org.apache.flink.table.data.utils.JoinedRowData; +import org.apache.flink.table.runtime.operators.join.FlinkJoinType; +import org.apache.flink.table.runtime.operators.join.JoinConditionWithNullFilters; +import org.apache.flink.table.runtime.operators.join.window.WindowJoinOperator; +import org.apache.flink.table.runtime.operators.join.window.asyncprocessing.AsyncStateWindowJoinOperator; +import org.apache.flink.table.runtime.operators.window.tvf.common.WindowTimerService; +import org.apache.flink.table.runtime.typeutils.RowDataSerializer; +import org.apache.flink.types.RowKind; +import org.apache.flink.util.function.BiConsumerWithException; + +import javax.annotation.Nullable; + +import java.time.ZoneId; +import java.util.IdentityHashMap; + +import static org.apache.flink.table.runtime.util.TimeWindowUtil.isWindowFired; + +/** + * A helper to do the window join operations for {@link WindowJoinOperator} and {@link + * AsyncStateWindowJoinOperator}. 
+ */ +public abstract class WindowJoinHelper { + + private static final String LEFT_LATE_ELEMENTS_DROPPED_METRIC_NAME = + "leftNumLateRecordsDropped"; + private static final String LEFT_LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME = + "leftLateRecordsDroppedRate"; + private static final String RIGHT_LATE_ELEMENTS_DROPPED_METRIC_NAME = + "rightNumLateRecordsDropped"; + private static final String RIGHT_LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME = + "rightLateRecordsDroppedRate"; + private static final String WATERMARK_LATENCY_METRIC_NAME = "watermarkLatency"; + + private final ZoneId shiftTimeZone; + + private final WindowTimerService<Long> windowTimerService; + + protected final RowDataSerializer leftSerializer; + + protected final RowDataSerializer rightSerializer; + + protected final JoinConditionWithNullFilters joinCondition; + + /** This is used for emitting elements with a given timestamp. */ + protected final TimestampedCollector<RowData> collector; + + private final WindowJoinProcessor windowJoinProcessor; + + // ------------------------------------------------------------------------ + // Metrics + // ------------------------------------------------------------------------ + + private Meter leftLateRecordsDroppedRate; + private Meter rightLateRecordsDroppedRate; + + public WindowJoinHelper( + RowDataSerializer leftSerializer, + RowDataSerializer rightSerializer, + ZoneId shiftTimeZone, + WindowTimerService<Long> windowTimerService, + JoinConditionWithNullFilters joinCondition, + TimestampedCollector<RowData> collector, + FlinkJoinType joinType) { + this.leftSerializer = leftSerializer; + this.rightSerializer = rightSerializer; + this.shiftTimeZone = shiftTimeZone; + this.windowTimerService = windowTimerService; + this.joinCondition = joinCondition; + this.collector = collector; + + this.windowJoinProcessor = getWindowJoinProcessor(joinType); + } + + public void registerMetric(OperatorMetricGroup metrics) { + // register metrics + Counter leftNumLateRecordsDropped = metrics.counter(LEFT_LATE_ELEMENTS_DROPPED_METRIC_NAME); + this.leftLateRecordsDroppedRate = + metrics.meter( + LEFT_LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME, + new MeterView(leftNumLateRecordsDropped)); + Counter rightNumLateRecordsDropped = + metrics.counter(RIGHT_LATE_ELEMENTS_DROPPED_METRIC_NAME); + this.rightLateRecordsDroppedRate = + metrics.meter( + RIGHT_LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME, + new MeterView(rightNumLateRecordsDropped)); + metrics.gauge( + WATERMARK_LATENCY_METRIC_NAME, + () -> { + long watermark = windowTimerService.currentWatermark(); + if (watermark < 0) { + return 0L; + } else { + return windowTimerService.currentProcessingTime() - watermark; + } + }); + } + + public void processElement( + StreamRecord<RowData> element, + int windowEndIndex, + Meter lateRecordsDroppedRate, + BiConsumerWithException<Long, RowData, Exception> accStateConsumer) + throws Exception { + RowData inputRow = element.getValue(); + long windowEnd = inputRow.getLong(windowEndIndex); + if (isWindowFired(windowEnd, windowTimerService.currentWatermark(), shiftTimeZone)) { + // element is late and should be dropped + lateRecordsDroppedRate.markEvent(); + return; + } + if (RowDataUtil.isAccumulateMsg(inputRow)) { + accStateConsumer.accept(windowEnd, inputRow); + } else { + // Window join could not handle retraction input stream + throw new UnsupportedOperationException( + "This is a bug and should not happen. 
Please file an issue."); + } + // always register time for every element + windowTimerService.registerEventTimeWindowTimer(windowEnd); + } + + public void joinAndClear( + long windowEnd, + @Nullable Iterable<RowData> leftRecords, + @Nullable Iterable<RowData> rightRecords) + throws Exception { + windowJoinProcessor.doJoin(leftRecords, rightRecords); + // clear state + if (leftRecords != null) { + clearState(windowEnd, true); + } + if (rightRecords != null) { + clearState(windowEnd, false); + } + } + + public Meter getLeftLateRecordsDroppedRate() { + return leftLateRecordsDroppedRate; + } + + public Meter getRightLateRecordsDroppedRate() { + return rightLateRecordsDroppedRate; + } + + public abstract void clearState(long windowEnd, boolean isLeft) throws Exception; + + private WindowJoinProcessor getWindowJoinProcessor(FlinkJoinType joinType) { + switch (joinType) { + case INNER: + return new InnerWindowJoinProcessor(); + case SEMI: + return new SemiAntiWindowJoinProcessor(false); + case ANTI: + return new SemiAntiWindowJoinProcessor(true); + case LEFT: + return new LeftOuterWindowJoinProcessor(); + case RIGHT: + return new RightOuterWindowJoinProcessor(); + case FULL: + return new FullOuterWindowJoinProcessor(); + default: + throw new IllegalArgumentException("Invalid join type: " + joinType); + } + } + + /** + * A processor to do the different logic for different {@link FlinkJoinType}. + * + * <p>TODO FLINK-37106 consider extracting common methods in different WindowJoinProcessor + */ + private interface WindowJoinProcessor { + + void doJoin( + @Nullable Iterable<RowData> leftRecords, @Nullable Iterable<RowData> rightRecords); + } + + private class SemiAntiWindowJoinProcessor implements WindowJoinProcessor { + + private final boolean isAntiJoin; + + public SemiAntiWindowJoinProcessor(boolean isAntiJoin) { + this.isAntiJoin = isAntiJoin; + } + + @Override + public void doJoin( + @Nullable Iterable<RowData> leftRecords, @Nullable Iterable<RowData> rightRecords) { + if (leftRecords == null) { + return; + } + if (rightRecords == null) { + if (isAntiJoin) { + for (RowData leftRecord : leftRecords) { + collector.collect(leftRecord); + } + } + return; + } + for (RowData leftRecord : leftRecords) { + boolean matches = false; + for (RowData rightRecord : rightRecords) { + if (joinCondition.apply(leftRecord, rightRecord)) { + matches = true; + break; + } + } + if (matches) { + if (!isAntiJoin) { + // emit left record if there are matched rows on the other side + collector.collect(leftRecord); + } + } else { + if (isAntiJoin) { + // emit left record if there is no matched row on the other side + collector.collect(leftRecord); + } + } + } + } + } + + private class InnerWindowJoinProcessor implements WindowJoinProcessor { + + private final JoinedRowData outRow = new JoinedRowData(); + + @Override + public void doJoin( + @Nullable Iterable<RowData> leftRecords, @Nullable Iterable<RowData> rightRecords) { + if (leftRecords == null || rightRecords == null) { + return; + } + for (RowData leftRecord : leftRecords) { + for (RowData rightRecord : rightRecords) { + if (joinCondition.apply(leftRecord, rightRecord)) { + outRow.setRowKind(RowKind.INSERT); + outRow.replace(leftRecord, rightRecord); + collector.collect(outRow); + } + } + } + } + } + + private abstract class AbstractOuterWindowJoinProcessor implements WindowJoinProcessor { + + private final RowData leftNullRow = new GenericRowData(leftSerializer.getArity()); + private final RowData rightNullRow = new GenericRowData(rightSerializer.getArity()); + 
private final JoinedRowData outRow = new JoinedRowData(); + + protected void outputNullPadding(RowData row, boolean isLeft) { + if (isLeft) { + outRow.replace(row, rightNullRow); + } else { + outRow.replace(leftNullRow, row); + } + outRow.setRowKind(RowKind.INSERT); + collector.collect(outRow); + } + + protected void outputNullPadding(Iterable<RowData> rows, boolean isLeft) { + for (RowData row : rows) { + outputNullPadding(row, isLeft); + } + } + + protected void output(RowData inputRow, RowData otherRow, boolean inputIsLeft) { + if (inputIsLeft) { + outRow.replace(inputRow, otherRow); + } else { + outRow.replace(otherRow, inputRow); + } + outRow.setRowKind(RowKind.INSERT); + collector.collect(outRow); + } + } + + private class LeftOuterWindowJoinProcessor extends AbstractOuterWindowJoinProcessor { + + @Override + public void doJoin( + @Nullable Iterable<RowData> leftRecords, @Nullable Iterable<RowData> rightRecords) { + if (leftRecords == null) { + return; + } + if (rightRecords == null) { + outputNullPadding(leftRecords, true); + } else { + for (RowData leftRecord : leftRecords) { + boolean matches = false; + for (RowData rightRecord : rightRecords) { + if (joinCondition.apply(leftRecord, rightRecord)) { + output(leftRecord, rightRecord, true); + matches = true; + } + } + if (!matches) { + // padding null for left side + outputNullPadding(leftRecord, true); + } + } + } + } + } + + private class RightOuterWindowJoinProcessor extends AbstractOuterWindowJoinProcessor { + + @Override + public void doJoin( + @Nullable Iterable<RowData> leftRecords, @Nullable Iterable<RowData> rightRecords) { + if (rightRecords == null) { + return; + } + if (leftRecords == null) { + outputNullPadding(rightRecords, false); + } else { + for (RowData rightRecord : rightRecords) { + boolean matches = false; + for (RowData leftRecord : leftRecords) { + if (joinCondition.apply(leftRecord, rightRecord)) { + output(leftRecord, rightRecord, true); + matches = true; + } + } + if (!matches) { + outputNullPadding(rightRecord, false); + } + } + } + } + } + + private class FullOuterWindowJoinProcessor extends AbstractOuterWindowJoinProcessor { + + @Override + public void doJoin( + @Nullable Iterable<RowData> leftRecords, @Nullable Iterable<RowData> rightRecords) { + if (leftRecords == null && rightRecords == null) { + return; + } + if (rightRecords == null) { + outputNullPadding(leftRecords, true); + } else if (leftRecords == null) { + outputNullPadding(rightRecords, false); + } else { + IdentityHashMap<RowData, Boolean> emittedRightRecords = new IdentityHashMap<>(); + for (RowData leftRecord : leftRecords) { + boolean matches = false; + for (RowData rightRecord : rightRecords) { + if (joinCondition.apply(leftRecord, rightRecord)) { + output(leftRecord, rightRecord, true); + matches = true; + emittedRightRecords.put(rightRecord, Boolean.TRUE); + } + } + // padding null for left side + if (!matches) { + outputNullPadding(leftRecord, true); + } + } + // padding null for never emitted right side + for (RowData rightRecord : rightRecords) { + if (!emittedRightRecords.containsKey(rightRecord)) { + outputNullPadding(rightRecord, false); + } + } + } + } + } +} diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/asyncprocessing/state/WindowAsyncState.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/asyncprocessing/state/WindowAsyncState.java new file mode 100644 index 00000000000..0ee9ded008a --- /dev/null +++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/asyncprocessing/state/WindowAsyncState.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.window.tvf.asyncprocessing.state; + +import org.apache.flink.api.common.state.v2.StateFuture; +import org.apache.flink.table.runtime.operators.window.tvf.state.WindowState; + +/** + * A base interface for manipulating state with a window namespace. + * + * <p>Different from {@link WindowState}, this interface is based on the async state API. + */ +public interface WindowAsyncState<W> { + + /** Removes the value mapped under the current key and the given window. */ + StateFuture<Void> asyncClear(W window); +} diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/asyncprocessing/state/WindowListAsyncState.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/asyncprocessing/state/WindowListAsyncState.java new file mode 100644 index 00000000000..233c68c6782 --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/asyncprocessing/state/WindowListAsyncState.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.window.tvf.asyncprocessing.state; + +import org.apache.flink.api.common.state.v2.ListState; +import org.apache.flink.api.common.state.v2.StateFuture; +import org.apache.flink.api.common.state.v2.StateIterator; +import org.apache.flink.runtime.state.v2.internal.InternalListState; +import org.apache.flink.table.data.RowData; + +/** A wrapper of {@link ListState} which is easier to update based on the window namespace.
*/ +public final class WindowListAsyncState<W> implements WindowAsyncState<W> { + + private final InternalListState<RowData, W, RowData> windowState; + + public WindowListAsyncState(InternalListState<RowData, W, RowData> windowState) { + this.windowState = windowState; + } + + @Override + public StateFuture<Void> asyncClear(W window) { + windowState.setCurrentNamespace(window); + return windowState.asyncClear(); + } + + public StateFuture<StateIterator<RowData>> asyncGet(W window) { + windowState.setCurrentNamespace(window); + return windowState.asyncGet(); + } + + /** + * Updates the operator state accessible by {@link #asyncGet(W)} by adding the given value to + * the list of values. The next time {@link #asyncGet(W)} is called (for the same state + * partition) the returned state will represent the updated list. + * + * <p>If null is passed in, the state value will remain unchanged. + * + * @param window The namespace for the state. + * @param value The new value for the state. + */ + public StateFuture<Void> asyncAdd(W window, RowData value) { + windowState.setCurrentNamespace(window); + return windowState.asyncAdd(value); + } +} diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorTest.java index 9071f80d3b2..b1fe275087c 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorTest.java @@ -19,8 +19,10 @@ package org.apache.flink.table.runtime.operators.join.window; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.asyncprocessing.AsyncKeyedTwoInputStreamOperatorTestHarness; import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.generated.GeneratedJoinCondition; import org.apache.flink.table.runtime.keyselector.RowDataKeySelector; @@ -72,13 +74,20 @@ class WindowJoinOperatorTest { private final ZoneId shiftTimeZone; - WindowJoinOperatorTest(ZoneId shiftTimeZone) { + private final boolean enableAsyncState; + + WindowJoinOperatorTest(ZoneId shiftTimeZone, boolean enableAsyncState) { this.shiftTimeZone = shiftTimeZone; + this.enableAsyncState = enableAsyncState; } - @Parameters(name = "TimeZone = {0}") + @Parameters(name = "TimeZone = {0}, EnableAsyncState = {1}") private static Collection<Object[]> runMode() { - return Arrays.asList(new Object[] {UTC_ZONE_ID}, new Object[] {SHANGHAI_ZONE_ID}); + return Arrays.asList( + new Object[] {UTC_ZONE_ID, false}, + new Object[] {UTC_ZONE_ID, true}, + new Object[] {SHANGHAI_ZONE_ID, false}, + new Object[] {SHANGHAI_ZONE_ID, true}); } @TestTemplate @@ -516,7 +525,8 @@ class WindowJoinOperatorTest { HandwrittenSelectorUtil.getRowDataSelector( new int[] {keyIdx}, INPUT_ROW_TYPE.toRowFieldTypes()); TypeInformation<RowData> keyType = InternalTypeInfo.ofFields(); - WindowJoinOperator operator = + + WindowJoinOperatorBuilder operatorBuilder = WindowJoinOperatorBuilder.builder() .leftSerializer(INPUT_ROW_TYPE.toRowSerializer()) 
.rightSerializer(INPUT_ROW_TYPE.toRowSerializer()) @@ -525,11 +535,16 @@ class WindowJoinOperatorTest { .rightWindowEndIndex(0) .filterNullKeys(new boolean[] {true}) .joinType(joinType) - .withShiftTimezone(shiftTimeZone) - .build(); - KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> testHarness = - new KeyedTwoInputStreamOperatorTestHarness<>( - operator, keySelector, keySelector, keyType); - return testHarness; + .withShiftTimezone(shiftTimeZone); + if (enableAsyncState) { + operatorBuilder.enableAsyncState(); + TwoInputStreamOperator<RowData, RowData, RowData> operator = operatorBuilder.build(); + return AsyncKeyedTwoInputStreamOperatorTestHarness.create( + operator, keySelector, keySelector, keyType, 1, 1, 0); + } else { + TwoInputStreamOperator<RowData, RowData, RowData> operator = operatorBuilder.build(); + return new KeyedTwoInputStreamOperatorTestHarness<>( + operator, keySelector, keySelector, keyType); + } } }
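A minimal usage sketch (not part of the committed patch) of how a caller might obtain either operator variant through the extended builder, mirroring the test harness changes above. The holder class name, the constant join type/time zone, and the omitted builder calls (join condition, left window-end index) are illustrative assumptions only.

    import java.time.ZoneId;

    import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
    import org.apache.flink.table.runtime.operators.join.window.WindowJoinOperatorBuilder;
    import org.apache.flink.table.runtime.typeutils.RowDataSerializer;

    /** Hypothetical holder class; illustrates the sync/async switch on the builder. */
    class WindowJoinOperatorFactorySketch {

        TwoInputStreamOperator<RowData, RowData, RowData> create(
                RowDataSerializer leftSerializer,
                RowDataSerializer rightSerializer,
                boolean useAsyncState) {
            WindowJoinOperatorBuilder builder =
                    WindowJoinOperatorBuilder.builder()
                            .leftSerializer(leftSerializer)
                            .rightSerializer(rightSerializer)
                            // ... join condition and left window-end index as in the test above
                            .rightWindowEndIndex(0)
                            .filterNullKeys(new boolean[] {true})
                            .joinType(FlinkJoinType.INNER)
                            .withShiftTimezone(ZoneId.of("UTC"));
            if (useAsyncState) {
                // with async state enabled, build() is expected to produce the
                // AsyncStateWindowJoinOperator introduced by this commit
                builder.enableAsyncState();
            }
            return builder.build();
        }
    }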
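Likewise, a hedged sketch of the expected life cycle of WindowListAsyncState inside the async operator: records are appended under the window-end namespace as elements arrive, read back when the window fires, and cleared afterwards via WindowJoinHelper#joinAndClear / clearState. The holder class, field, and callback names are hypothetical; only asyncAdd/asyncGet/asyncClear come from the class above, and the StateFuture callback is assumed to follow the async state API's thenAccept style.

    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.runtime.operators.window.tvf.asyncprocessing.state.WindowListAsyncState;

    /** Hypothetical holder class; shows one input side of the async window join. */
    class WindowListAsyncStateUsageSketch {

        private final WindowListAsyncState<Long> leftWindowState; // one instance per input side

        WindowListAsyncStateUsageSketch(WindowListAsyncState<Long> leftWindowState) {
            this.leftWindowState = leftWindowState;
        }

        void onLeftElement(long windowEnd, RowData row) {
            // append the record under the window-end namespace of the current key;
            // the returned StateFuture completes once the async backend applied the add
            leftWindowState.asyncAdd(windowEnd, row);
        }

        void onWindowFired(long windowEnd) {
            // read the accumulated records; the join itself and the final
            // clearState(windowEnd, true) (i.e. asyncClear) are elided here,
            // see AsyncStateWindowJoinOperator in this commit
            leftWindowState
                    .asyncGet(windowEnd)
                    .thenAccept(leftIterator -> {
                        // drain the iterator and hand both sides to WindowJoinHelper#joinAndClear
                    });
        }
    }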