This is an automated email from the ASF dual-hosted git repository. ron pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9a22ff7e961cb5d8b52401e567e59954bff35d50 Author: xuyang <[email protected]> AuthorDate: Fri Jan 19 17:48:46 2024 +0800 [FLINK-34100][table] Introduce UnalignedWindowTableFunctionOperator for unaligned window This closes #24162 --- .../processors/UnsliceWindowAggProcessor.java | 10 +- .../internal/MergingWindowProcessFunction.java | 84 ++- .../groupwindow/operator/WindowOperator.java | 16 +- .../AlignedWindowTableFunctionOperator.java | 1 + .../UnalignedWindowTableFunctionOperator.java | 606 +++++++++++++++++++++ .../operator/WindowTableFunctionOperatorBase.java | 20 + .../AlignedWindowTableFunctionOperatorTest.java | 79 +-- .../UnalignedWindowTableFunctionOperatorTest.java | 336 ++++++++++++ .../WindowTableFunctionOperatorTestBase.java | 102 ++++ 9 files changed, 1175 insertions(+), 79 deletions(-) diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/UnsliceWindowAggProcessor.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/UnsliceWindowAggProcessor.java index 33be17388db..00eda35222e 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/UnsliceWindowAggProcessor.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/UnsliceWindowAggProcessor.java @@ -31,7 +31,6 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.util.RowDataUtil; import org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction; import org.apache.flink.table.runtime.operators.window.TimeWindow; -import org.apache.flink.table.runtime.operators.window.groupwindow.internal.InternalWindowProcessFunction; import org.apache.flink.table.runtime.operators.window.groupwindow.internal.MergingWindowProcessFunction; import 
org.apache.flink.table.runtime.operators.window.groupwindow.triggers.EventTimeTriggers; import org.apache.flink.table.runtime.operators.window.groupwindow.triggers.ProcessingTimeTriggers; @@ -41,6 +40,7 @@ import org.apache.flink.table.runtime.operators.window.tvf.unslicing.UnsliceAssi import org.apache.flink.table.runtime.operators.window.tvf.unslicing.UnslicingWindowProcessor; import org.apache.flink.table.runtime.operators.window.tvf.unslicing.UnslicingWindowTimerServiceImpl; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.BiConsumerWithException; import java.time.ZoneId; import java.util.Collection; @@ -198,7 +198,7 @@ public class UnsliceWindowAggProcessor extends AbstractWindowAggProcessor<TimeWi } private class WindowContextImpl - implements InternalWindowProcessFunction.Context<RowData, TimeWindow> { + implements MergingWindowProcessFunction.MergingContext<RowData, TimeWindow> { @Override public long currentProcessingTime() { @@ -277,6 +277,12 @@ public class UnsliceWindowAggProcessor extends AbstractWindowAggProcessor<TimeWi public RowData currentKey() { return ctx.getKeyedStateBackend().getCurrentKey(); } + + @Override + public BiConsumerWithException<TimeWindow, Collection<TimeWindow>, Throwable> + getWindowStateMergingConsumer() { + return new MergingWindowProcessFunction.DefaultAccMergingConsumer<>(this, aggregator); + } } private class TriggerContextImpl implements Trigger.OnMergeContext { diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/groupwindow/internal/MergingWindowProcessFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/groupwindow/internal/MergingWindowProcessFunction.java index 3ac9dd8f725..b89041d38e5 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/groupwindow/internal/MergingWindowProcessFunction.java +++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/groupwindow/internal/MergingWindowProcessFunction.java @@ -25,12 +25,14 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunctionBase; import org.apache.flink.table.runtime.operators.window.Window; import org.apache.flink.table.runtime.operators.window.groupwindow.assigners.MergingWindowAssigner; +import org.apache.flink.util.function.BiConsumerWithException; import java.util.ArrayList; import java.util.Collection; import java.util.List; import static org.apache.flink.table.runtime.util.TimeWindowUtil.toEpochMillsForTimer; +import static org.apache.flink.util.Preconditions.checkArgument; /** * The implementation of {@link InternalWindowProcessFunction} for {@link MergingWindowAssigner}. @@ -61,13 +63,15 @@ public class MergingWindowProcessFunction<K, W extends Window> @Override public void open(Context<K, W> ctx) throws Exception { + checkArgument(ctx instanceof MergingContext); super.open(ctx); MapStateDescriptor<W, W> mappingStateDescriptor = new MapStateDescriptor<>( "session-window-mapping", windowSerializer, windowSerializer); MapState<W, W> windowMapping = ctx.getPartitionedState(mappingStateDescriptor); this.mergingWindows = new MergingWindowSet<>(windowAssigner, windowMapping); - this.mergingFunction = new MergingFunctionImpl(); + this.mergingFunction = + new MergingFunctionImpl(((MergingContext) ctx).getWindowStateMergingConsumer()); } @Override @@ -124,8 +128,20 @@ public class MergingWindowProcessFunction<K, W extends Window> } } + /** Get the state window as the namespace stored acc in the data about this actual window. 
*/ + public W getStateWindow(W window) throws Exception { + return mergingWindows.getStateWindow(window); + } + private class MergingFunctionImpl implements MergingWindowSet.MergeFunction<W> { + private final BiConsumerWithException<W, Collection<W>, Throwable> accMergingConsumer; + + public MergingFunctionImpl( + BiConsumerWithException<W, Collection<W>, Throwable> accMergingConsumer) { + this.accMergingConsumer = accMergingConsumer; + } + @Override public void merge( W mergeResult, @@ -166,23 +182,61 @@ public class MergingWindowProcessFunction<K, W extends Window> // merge the merged state windows into the newly resulting state window if (!stateWindowsToBeMerged.isEmpty()) { - RowData targetAcc = ctx.getWindowAccumulators(stateWindowResult); - if (targetAcc == null) { - targetAcc = windowAggregator.createAccumulators(); + try { + accMergingConsumer.accept(stateWindowResult, stateWindowsToBeMerged); + } catch (Throwable e) { + throw new RuntimeException("Should not happen", e); } - windowAggregator.setAccumulators(stateWindowResult, targetAcc); - for (W w : stateWindowsToBeMerged) { - RowData acc = ctx.getWindowAccumulators(w); - if (acc != null) { - windowAggregator.merge(w, acc); - } - // clear merged window - ctx.clearWindowState(w); - ctx.clearPreviousState(w); + } + } + } + + /** + * A default merging consumer that merges the accumulators in state windows that are waiting to + * be merged into the target state window. + * + * <p>The first parameter is the target state window, the second parameter is the windows that + * need to be merged. 
+ */ + public static class DefaultAccMergingConsumer<W extends Window> + implements BiConsumerWithException<W, Collection<W>, Throwable> { + + private final Context<?, W> ctx; + + private final NamespaceAggsHandleFunctionBase<W> windowAggregator; + + public DefaultAccMergingConsumer( + Context<?, W> ctx, NamespaceAggsHandleFunctionBase<W> windowAggregator) { + this.ctx = ctx; + this.windowAggregator = windowAggregator; + } + + @Override + public void accept(W stateWindowResult, Collection<W> stateWindowsToBeMerged) + throws Throwable { + RowData targetAcc = ctx.getWindowAccumulators(stateWindowResult); + if (targetAcc == null) { + targetAcc = windowAggregator.createAccumulators(); + } + windowAggregator.setAccumulators(stateWindowResult, targetAcc); + for (W w : stateWindowsToBeMerged) { + RowData acc = ctx.getWindowAccumulators(w); + if (acc != null) { + windowAggregator.merge(w, acc); } - targetAcc = windowAggregator.getAccumulators(); - ctx.setWindowAccumulators(stateWindowResult, targetAcc); + // clear merged window + ctx.clearWindowState(w); + ctx.clearPreviousState(w); } + targetAcc = windowAggregator.getAccumulators(); + ctx.setWindowAccumulators(stateWindowResult, targetAcc); } } + + /** A {@link Context} used for {@link MergingWindowProcessFunction}. */ + public interface MergingContext<K, W extends Window> extends Context<K, W> { + + /** Returns the consumer used to merge window state. 
*/ + BiConsumerWithException<W, Collection<W>, Throwable> getWindowStateMergingConsumer(); + } } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/groupwindow/operator/WindowOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/groupwindow/operator/WindowOperator.java index 68f6f7c6c3e..8e7c26c67d4 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/groupwindow/operator/WindowOperator.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/groupwindow/operator/WindowOperator.java @@ -57,6 +57,7 @@ import org.apache.flink.table.runtime.operators.window.groupwindow.triggers.Trig import org.apache.flink.table.runtime.typeutils.RowDataSerializer; import org.apache.flink.table.runtime.util.TimeWindowUtil; import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.util.function.BiConsumerWithException; import org.apache.commons.lang3.ArrayUtils; @@ -274,12 +275,13 @@ public abstract class WindowOperator<K, W extends Window> extends AbstractStream compileGeneratedCode(); - WindowContext windowContext = new WindowContext(); windowAggregator.open( new PerWindowStateDataViewStore( getKeyedStateBackend(), windowSerializer, getRuntimeContext())); + WindowContext windowContext; if (windowAssigner instanceof MergingWindowAssigner) { + windowContext = new MergingWindowContext(); this.windowFunction = new MergingWindowProcessFunction<>( (MergingWindowAssigner<W>) windowAssigner, @@ -287,12 +289,14 @@ public abstract class WindowOperator<K, W extends Window> extends AbstractStream windowSerializer, allowedLateness); } else if (windowAssigner instanceof PanedWindowAssigner) { + windowContext = new WindowContext(); this.windowFunction = new PanedWindowProcessFunction<>( (PanedWindowAssigner<W>) windowAssigner, windowAggregator, allowedLateness); } 
else { + windowContext = new WindowContext(); this.windowFunction = new GeneralWindowProcessFunction<>( windowAssigner, windowAggregator, allowedLateness); @@ -541,6 +545,16 @@ public abstract class WindowOperator<K, W extends Window> extends AbstractStream } } + private class MergingWindowContext extends WindowContext + implements MergingWindowProcessFunction.MergingContext<K, W> { + @Override + public BiConsumerWithException<W, Collection<W>, Throwable> + getWindowStateMergingConsumer() { + return new MergingWindowProcessFunction.DefaultAccMergingConsumer<>( + this, windowAggregator); + } + } + /** * {@code TriggerContext} is a utility for handling {@code Trigger} invocations. It can be * reused by setting the {@code key} and {@code window} fields. No internal state must be kept diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/AlignedWindowTableFunctionOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/AlignedWindowTableFunctionOperator.java index deb2c59200c..17c5a4b5125 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/AlignedWindowTableFunctionOperator.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/AlignedWindowTableFunctionOperator.java @@ -57,6 +57,7 @@ public class AlignedWindowTableFunctionOperator extends WindowTableFunctionOpera if (windowAssigner.isEventTime()) { if (inputRow.isNullAt(rowtimeIndex)) { // null timestamp would be dropped + numNullRowTimeRecordsDropped.inc(); return; } timestamp = inputRow.getTimestamp(rowtimeIndex, 3).getMillisecond(); diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/UnalignedWindowTableFunctionOperator.java 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/UnalignedWindowTableFunctionOperator.java new file mode 100644 index 00000000000..6f93df0af78 --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/UnalignedWindowTableFunctionOperator.java @@ -0,0 +1,606 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.operators.window.tvf.operator; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.MergingState; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Meter; +import org.apache.flink.metrics.MeterView; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.state.internal.InternalMapState; +import org.apache.flink.runtime.state.internal.InternalMergingState; +import org.apache.flink.streaming.api.operators.InternalTimer; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.operators.Triggerable; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.dataview.PerWindowStateDataViewStore; +import org.apache.flink.table.runtime.dataview.StateDataViewStore; +import org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunctionBase; +import org.apache.flink.table.runtime.operators.window.TimeWindow; +import org.apache.flink.table.runtime.operators.window.groupwindow.assigners.GroupWindowAssigner; +import org.apache.flink.table.runtime.operators.window.groupwindow.assigners.MergingWindowAssigner; +import org.apache.flink.table.runtime.operators.window.groupwindow.internal.MergingWindowProcessFunction; +import org.apache.flink.table.runtime.operators.window.groupwindow.triggers.EventTimeTriggers; +import 
org.apache.flink.table.runtime.operators.window.groupwindow.triggers.ProcessingTimeTriggers; +import org.apache.flink.table.runtime.operators.window.groupwindow.triggers.Trigger; +import org.apache.flink.table.runtime.util.TimeWindowUtil; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.BiConsumerWithException; + +import java.time.ZoneId; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; +import java.util.TreeMap; + +import static java.util.Objects.requireNonNull; +import static org.apache.flink.table.runtime.util.TimeWindowUtil.toEpochMillsForTimer; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * The operator for unaligned window table function. + * + * <p>See more details about aligned window and unaligned window in {@link + * org.apache.flink.table.runtime.operators.window.tvf.common.WindowOperatorBase}. + * + * <p>Note: The operator only applies for Window TVF with set semantics (e.g SESSION) instead of row + * semantics (e.g TUMBLE/HOP/CUMULATE). + * + * <p>This operator emits result at the end of window instead of per record. + * + * <p>This operator will not compact changelog records. + * + * <p>This operator will keep the original order of input records when outputting. 
+ */ +public class UnalignedWindowTableFunctionOperator extends WindowTableFunctionOperatorBase + implements Triggerable<RowData, TimeWindow> { + + private static final long serialVersionUID = 1L; + + private static final String LATE_ELEMENTS_DROPPED_METRIC_NAME = "numLateRecordsDropped"; + private static final String LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME = "lateRecordsDroppedRate"; + private static final String WATERMARK_LATENCY_METRIC_NAME = "watermarkLatency"; + + private final Trigger<TimeWindow> trigger; + + private final TypeSerializer<RowData> inputSerializer; + + private final TypeSerializer<TimeWindow> windowSerializer; + + private transient InternalTimerService<TimeWindow> internalTimerService; + + // a counter to tag the order of all input streams when entering the operator + private transient ValueState<Long> counterState; + + private transient InternalMapState<RowData, TimeWindow, Long, RowData> windowState; + + private transient TriggerContextImpl triggerContext; + + private transient MergingWindowProcessFunction<RowData, TimeWindow> windowFunction; + + private transient NamespaceAggsHandleFunctionBase<TimeWindow> windowAggregator; + + // ------------------------------------------------------------------------ + // Metrics + // ------------------------------------------------------------------------ + + private transient Counter numLateRecordsDropped; + private transient Meter lateRecordsDroppedRate; + private transient Gauge<Long> watermarkLatency; + + public UnalignedWindowTableFunctionOperator( + GroupWindowAssigner<TimeWindow> windowAssigner, + TypeSerializer<TimeWindow> windowSerializer, + TypeSerializer<RowData> inputSerializer, + int rowtimeIndex, + ZoneId shiftTimeZone) { + super(windowAssigner, rowtimeIndex, shiftTimeZone); + this.trigger = createTrigger(windowAssigner); + this.windowSerializer = checkNotNull(windowSerializer); + this.inputSerializer = checkNotNull(inputSerializer); + } + + @Override + public void open() throws Exception { + 
super.open(); + + internalTimerService = + getInternalTimerService("session-window-tvf-timers", windowSerializer, this); + + triggerContext = new TriggerContextImpl(); + triggerContext.open(); + + ValueStateDescriptor<Long> counterStateDescriptor = + new ValueStateDescriptor<>("session-window-tvf-counter", LongSerializer.INSTANCE); + counterState = getRuntimeContext().getState(counterStateDescriptor); + + MapStateDescriptor<Long, RowData> windowStateDescriptor = + new MapStateDescriptor<>( + "session-window-tvf-acc", LongSerializer.INSTANCE, inputSerializer); + + windowState = + (InternalMapState<RowData, TimeWindow, Long, RowData>) + getOrCreateKeyedState(windowSerializer, windowStateDescriptor); + + windowAggregator = new DummyWindowAggregator(); + windowAggregator.open( + new PerWindowStateDataViewStore( + getKeyedStateBackend(), windowSerializer, getRuntimeContext())); + + WindowContextImpl windowContext = new WindowContextImpl(); + + windowFunction = + new MergingWindowProcessFunction<>( + (MergingWindowAssigner<TimeWindow>) windowAssigner, + windowAggregator, + windowSerializer, + 0); + + windowFunction.open(windowContext); + + this.numLateRecordsDropped = metrics.counter(LATE_ELEMENTS_DROPPED_METRIC_NAME); + this.lateRecordsDroppedRate = + metrics.meter( + LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME, + new MeterView(numLateRecordsDropped)); + this.watermarkLatency = + metrics.gauge( + WATERMARK_LATENCY_METRIC_NAME, + () -> { + long watermark = internalTimerService.currentWatermark(); + if (watermark < 0) { + return 0L; + } else { + return internalTimerService.currentProcessingTime() - watermark; + } + }); + } + + @Override + public void close() throws Exception { + super.close(); + if (windowAggregator != null) { + windowAggregator.close(); + } + } + + @Override + public void processElement(StreamRecord<RowData> element) throws Exception { + RowData inputRow = element.getValue(); + + long timestamp; + if (windowAssigner.isEventTime()) { + if 
(inputRow.isNullAt(rowtimeIndex)) { + // null timestamp would be dropped + numNullRowTimeRecordsDropped.inc(); + return; + } + timestamp = inputRow.getTimestamp(rowtimeIndex, 3).getMillisecond(); + } else { + timestamp = getProcessingTimeService().getCurrentProcessingTime(); + } + + // no matter if order exceeds the Long.MAX_VALUE + Long order = counterState.value(); + if (null == order) { + order = 0L; + } + counterState.update(order + 1); + + timestamp = TimeWindowUtil.toUtcTimestampMills(timestamp, shiftTimeZone); + // the windows which the input row should be placed into + Collection<TimeWindow> affectedWindows = + windowFunction.assignStateNamespace(inputRow, timestamp); + boolean isElementDropped = true; + for (TimeWindow window : affectedWindows) { + isElementDropped = false; + windowState.setCurrentNamespace(window); + windowState.put(order, inputRow); + } + + // the actual windows which the input row belongs to + Collection<TimeWindow> actualWindows = + windowFunction.assignActualWindows(inputRow, timestamp); + Preconditions.checkArgument( + (affectedWindows.isEmpty() && actualWindows.isEmpty()) + || (!affectedWindows.isEmpty() && !actualWindows.isEmpty())); + for (TimeWindow window : actualWindows) { + triggerContext.setWindow(window); + boolean triggerResult = triggerContext.onElement(inputRow, timestamp); + if (triggerResult) { + emitWindowResult(window); + } + // register a timer to clean up the window state + registerCleanupTimer(window); + } + if (isElementDropped) { + // markEvent will increase numLateRecordsDropped + lateRecordsDroppedRate.markEvent(); + } + } + + private void registerCleanupTimer(TimeWindow window) { + long cleanupTime = getCleanupTime(window); + if (cleanupTime == Long.MAX_VALUE) { + // no need to clean up because we didn't set one + return; + } + if (windowAssigner.isEventTime()) { + triggerContext.registerEventTimeTimer(cleanupTime); + } else { + 
triggerContext.registerProcessingTimeTimer(cleanupTime); + } + } + + private void emitWindowResult(TimeWindow 
window) throws Exception { + TimeWindow stateWindow = windowFunction.getStateWindow(window); + windowState.setCurrentNamespace(stateWindow); + Iterator<Map.Entry<Long, RowData>> iterator = windowState.iterator(); + // build a sorted map + TreeMap<Long, RowData> sortedMap = new TreeMap<>(); + while (iterator.hasNext()) { + Map.Entry<Long, RowData> entry = iterator.next(); + sortedMap.put(entry.getKey(), entry.getValue()); + } + // emit the sorted map + for (Map.Entry<Long, RowData> entry : sortedMap.entrySet()) { + collect(entry.getValue(), Collections.singletonList(window)); + } + } + + @Override + public void onEventTime(InternalTimer<RowData, TimeWindow> timer) throws Exception { + triggerContext.setWindow(timer.getNamespace()); + if (triggerContext.onEventTime(timer.getTimestamp())) { + // fire + emitWindowResult(triggerContext.window); + } + + if (windowAssigner.isEventTime()) { + windowFunction.cleanWindowIfNeeded(triggerContext.window, timer.getTimestamp()); + } + } + + @Override + public void onProcessingTime(InternalTimer<RowData, TimeWindow> timer) throws Exception { + triggerContext.setWindow(timer.getNamespace()); + if (triggerContext.onProcessingTime(timer.getTimestamp())) { + // fire + emitWindowResult(triggerContext.window); + } + + if (!windowAssigner.isEventTime()) { + windowFunction.cleanWindowIfNeeded(triggerContext.window, timer.getTimestamp()); + } + } + + /** + * In case this leads to a value greater than {@link Long#MAX_VALUE}, then a cleanup time of + * {@link Long#MAX_VALUE} is returned. + */ + private long getCleanupTime(TimeWindow window) { + // In case this leads to a value greater than Long.MAX_VALUE, then a cleanup + // time of Long.MAX_VALUE is returned. + long cleanupTime = Math.max(0, window.maxTimestamp()); + cleanupTime = cleanupTime >= window.maxTimestamp() ? 
cleanupTime : Long.MAX_VALUE; + return toEpochMillsForTimer(cleanupTime, shiftTimeZone); + } + + private static Trigger<TimeWindow> createTrigger( + GroupWindowAssigner<TimeWindow> windowAssigner) { + if (windowAssigner.isEventTime()) { + return EventTimeTriggers.afterEndOfWindow(); + } else { + return ProcessingTimeTriggers.afterEndOfWindow(); + } + } + + private class WindowContextImpl + implements MergingWindowProcessFunction.MergingContext<RowData, TimeWindow> { + + @Override + public void deleteCleanupTimer(TimeWindow window) throws Exception { + long cleanupTime = UnalignedWindowTableFunctionOperator.this.getCleanupTime(window); + if (cleanupTime == Long.MAX_VALUE) { + // no need to clean up because we didn't set one + return; + } + if (windowAssigner.isEventTime()) { + triggerContext.deleteEventTimeTimer(cleanupTime); + } else { + triggerContext.deleteProcessingTimeTimer(cleanupTime); + } + } + + @Override + public <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor) + throws Exception { + requireNonNull(stateDescriptor, "The state properties must not be null"); + return UnalignedWindowTableFunctionOperator.this.getPartitionedState(stateDescriptor); + } + + @Override + public RowData currentKey() { + return (RowData) UnalignedWindowTableFunctionOperator.this.getCurrentKey(); + } + + @Override + public long currentProcessingTime() { + return internalTimerService.currentProcessingTime(); + } + + @Override + public long currentWatermark() { + return internalTimerService.currentWatermark(); + } + + @Override + public ZoneId getShiftTimeZone() { + return shiftTimeZone; + } + + @Override + public void clearWindowState(TimeWindow window) throws Exception { + windowState.setCurrentNamespace(window); + windowState.clear(); + } + + @Override + public void clearTrigger(TimeWindow window) throws Exception { + triggerContext.setWindow(window); + triggerContext.clear(); + } + + @Override + public void onMerge(TimeWindow newWindow, 
Collection<TimeWindow> mergedWindows) + throws Exception { + triggerContext.setWindow(newWindow); + triggerContext.setMergedWindows(mergedWindows); + triggerContext.onMerge(); + } + + @Override + public void clearPreviousState(TimeWindow window) throws Exception {} + + @Override + public RowData getWindowAccumulators(TimeWindow window) throws Exception { + return null; + } + + @Override + public void setWindowAccumulators(TimeWindow window, RowData acc) throws Exception {} + + @Override + public BiConsumerWithException<TimeWindow, Collection<TimeWindow>, Throwable> + getWindowStateMergingConsumer() { + return new MergingConsumer(windowState); + } + } + + private class TriggerContextImpl implements Trigger.OnMergeContext { + + private TimeWindow window; + private Collection<TimeWindow> mergedWindows; + + public void open() throws Exception { + trigger.open(this); + } + + public boolean onElement(RowData row, long timestamp) throws Exception { + return trigger.onElement(row, timestamp, window); + } + + public boolean onProcessingTime(long time) throws Exception { + return trigger.onProcessingTime(time, window); + } + + public boolean onEventTime(long time) throws Exception { + return trigger.onEventTime(time, window); + } + + public void onMerge() throws Exception { + trigger.onMerge(window, this); + } + + public void setWindow(TimeWindow window) { + this.window = window; + } + + public void setMergedWindows(Collection<TimeWindow> mergedWindows) { + this.mergedWindows = mergedWindows; + } + + @Override + public long getCurrentProcessingTime() { + return internalTimerService.currentProcessingTime(); + } + + @Override + public long getCurrentWatermark() { + return internalTimerService.currentWatermark(); + } + + @Override + public void registerProcessingTimeTimer(long time) { + internalTimerService.registerProcessingTimeTimer(window, time); + } + + @Override + public void registerEventTimeTimer(long time) { + internalTimerService.registerEventTimeTimer(window, time); + 
} + + @Override + public void deleteProcessingTimeTimer(long time) { + internalTimerService.deleteProcessingTimeTimer(window, time); + } + + @Override + public void deleteEventTimeTimer(long time) { + internalTimerService.deleteEventTimeTimer(window, time); + } + + @Override + public ZoneId getShiftTimeZone() { + return shiftTimeZone; + } + + public void clear() throws Exception { + trigger.clear(window); + } + + @Override + public <S extends MergingState<?, ?>> void mergePartitionedState( + StateDescriptor<S, ?> stateDescriptor) { + if (mergedWindows != null && !mergedWindows.isEmpty()) { + try { + State state = + UnalignedWindowTableFunctionOperator.this.getOrCreateKeyedState( + windowSerializer, stateDescriptor); + if (state instanceof InternalMergingState) { + ((InternalMergingState<RowData, TimeWindow, ?, ?, ?>) state) + .mergeNamespaces(window, mergedWindows); + } else { + throw new IllegalArgumentException( + "The given state descriptor does not refer to a mergeable state (MergingState)"); + } + } catch (Exception e) { + throw new RuntimeException("Error while merging state.", e); + } + } + } + + @Override + public MetricGroup getMetricGroup() { + return UnalignedWindowTableFunctionOperator.this.getMetricGroup(); + } + + @Override + public <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor) { + try { + return UnalignedWindowTableFunctionOperator.this.getPartitionedState( + window, windowSerializer, stateDescriptor); + } catch (Exception e) { + throw new RuntimeException("Could not retrieve state", e); + } + } + } + + private static class MergingConsumer + implements BiConsumerWithException<TimeWindow, Collection<TimeWindow>, Throwable> { + + private final InternalMapState<RowData, TimeWindow, Long, RowData> windowState; + + public MergingConsumer(InternalMapState<RowData, TimeWindow, Long, RowData> windowState) { + this.windowState = windowState; + } + + @Override + public void accept( + TimeWindow stateWindowResult, 
Collection<TimeWindow> stateWindowsToBeMerged) + throws Throwable { + for (TimeWindow mergedWindow : stateWindowsToBeMerged) { + windowState.setCurrentNamespace(mergedWindow); + Iterator<Map.Entry<Long, RowData>> iterator = windowState.iterator(); + windowState.setCurrentNamespace(stateWindowResult); + while (iterator.hasNext()) { + Map.Entry<Long, RowData> entry = iterator.next(); + windowState.put(entry.getKey(), entry.getValue()); + } + } + } + } + + /** + * A dummy window aggregator is used to reuse the same logic in legacy group session window + * operator. + * + * <p>Instead of using window aggregator, we use a custom {@link MergingConsumer} to merge the + * accumulators in state. + */ + private static class DummyWindowAggregator + implements NamespaceAggsHandleFunctionBase<TimeWindow> { + + private final IllegalStateException thrown = + new IllegalStateException( + "The function should not be called in DummyWindowAggregator"); + + @Override + public void open(StateDataViewStore store) throws Exception {} + + @Override + public void setAccumulators(TimeWindow namespace, RowData accumulators) throws Exception { + throw thrown; + } + + @Override + public void accumulate(RowData inputRow) throws Exception { + throw thrown; + } + + @Override + public void retract(RowData inputRow) throws Exception { + throw thrown; + } + + @Override + public void merge(TimeWindow namespace, RowData otherAcc) throws Exception { + throw thrown; + } + + @Override + public RowData createAccumulators() throws Exception { + throw thrown; + } + + @Override + public RowData getAccumulators() throws Exception { + throw thrown; + } + + @Override + public void cleanup(TimeWindow namespace) throws Exception { + throw thrown; + } + + @Override + public void close() throws Exception {} + } + + @VisibleForTesting + public Counter getNumLateRecordsDropped() { + return numLateRecordsDropped; + } + + @VisibleForTesting + public Gauge<Long> getWatermarkLatency() { + return watermarkLatency; + } +} 
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/WindowTableFunctionOperatorBase.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/WindowTableFunctionOperatorBase.java index d2494267516..9c60ac9af58 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/WindowTableFunctionOperatorBase.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/WindowTableFunctionOperatorBase.java @@ -18,6 +18,8 @@ package org.apache.flink.table.runtime.operators.window.tvf.operator; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.metrics.Counter; import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.TimestampedCollector; @@ -43,6 +45,9 @@ import static org.apache.flink.util.Preconditions.checkArgument; public abstract class WindowTableFunctionOperatorBase extends TableStreamOperator<RowData> implements OneInputStreamOperator<RowData, RowData> { + private static final String NULL_ROW_TIME_ELEMENTS_DROPPED_METRIC_NAME = + "numNullRowTimeRecordsDropped"; + /** * The shift timezone of the window, if the proctime or rowtime type is TIMESTAMP_LTZ, the shift * timezone is the timezone user configured in TableConfig, other cases the timezone is UTC @@ -60,6 +65,12 @@ public abstract class WindowTableFunctionOperatorBase extends TableStreamOperato private transient JoinedRowData outRow; private transient GenericRowData windowProperties; + // ------------------------------------------------------------------------ + // Metrics + // ------------------------------------------------------------------------ + + protected transient Counter numNullRowTimeRecordsDropped; + public 
WindowTableFunctionOperatorBase( GroupWindowAssigner<TimeWindow> windowAssigner, int rowtimeIndex, @@ -80,6 +91,10 @@ public abstract class WindowTableFunctionOperatorBase extends TableStreamOperato outRow = new JoinedRowData(); windowProperties = new GenericRowData(3); + + // metrics + this.numNullRowTimeRecordsDropped = + metrics.counter(NULL_ROW_TIME_ELEMENTS_DROPPED_METRIC_NAME); } @Override @@ -101,4 +116,9 @@ public abstract class WindowTableFunctionOperatorBase extends TableStreamOperato collector.collect(outRow.replace(inputRow, windowProperties)); } } + + @VisibleForTesting + public Counter getNumNullRowTimeRecordsDropped() { + return numNullRowTimeRecordsDropped; + } } diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/AlignedWindowTableFunctionOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/AlignedWindowTableFunctionOperatorTest.java index bcf511c9408..b22c9a3b1b7 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/AlignedWindowTableFunctionOperatorTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/AlignedWindowTableFunctionOperatorTest.java @@ -18,25 +18,14 @@ package org.apache.flink.table.runtime.operators.window.tvf.operator; -import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.table.data.RowData; -import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.runtime.operators.window.TimeWindow; import org.apache.flink.table.runtime.operators.window.groupwindow.assigners.CumulativeWindowAssigner; import 
org.apache.flink.table.runtime.operators.window.groupwindow.assigners.GroupWindowAssigner; import org.apache.flink.table.runtime.operators.window.groupwindow.assigners.SlidingWindowAssigner; import org.apache.flink.table.runtime.operators.window.groupwindow.assigners.TumblingWindowAssigner; -import org.apache.flink.table.runtime.typeutils.RowDataSerializer; -import org.apache.flink.table.runtime.util.GenericRowRecordSortComparator; -import org.apache.flink.table.runtime.util.RowDataHarnessAssertor; -import org.apache.flink.table.types.logical.IntType; -import org.apache.flink.table.types.logical.LogicalType; -import org.apache.flink.table.types.logical.RowType; -import org.apache.flink.table.types.logical.TimestampType; -import org.apache.flink.table.types.logical.VarCharType; import org.junit.Test; import org.junit.runner.RunWith; @@ -48,19 +37,15 @@ import java.util.Arrays; import java.util.Collection; import java.util.concurrent.ConcurrentLinkedQueue; -import static org.apache.flink.table.runtime.util.StreamRecordUtils.row; -import static org.apache.flink.table.runtime.util.TimeWindowUtil.toUtcTimestampMills; +import static org.apache.flink.table.runtime.util.StreamRecordUtils.binaryRecord; +import static org.assertj.core.api.Assertions.assertThat; /** Tests for {@link AlignedWindowTableFunctionOperator}. 
*/ @RunWith(Parameterized.class) -public class AlignedWindowTableFunctionOperatorTest { - - private static final ZoneId UTC_ZONE_ID = ZoneId.of("UTC"); - private static final ZoneId SHANGHAI_ZONE_ID = ZoneId.of("Asia/Shanghai"); - private final ZoneId shiftTimeZone; +public class AlignedWindowTableFunctionOperatorTest extends WindowTableFunctionOperatorTestBase { public AlignedWindowTableFunctionOperatorTest(ZoneId shiftTimeZone) { - this.shiftTimeZone = shiftTimeZone; + super(shiftTimeZone); } @Parameterized.Parameters(name = "TimeZone = {0}") @@ -68,33 +53,6 @@ public class AlignedWindowTableFunctionOperatorTest { return Arrays.asList(new Object[] {UTC_ZONE_ID}, new Object[] {SHANGHAI_ZONE_ID}); } - private static final RowType INPUT_ROW_TYPE = - new RowType( - Arrays.asList( - new RowType.RowField("f0", new VarCharType(Integer.MAX_VALUE)), - new RowType.RowField("f1", new IntType()), - new RowType.RowField("f2", new TimestampType(3)))); - - private static final RowDataSerializer INPUT_ROW_SER = new RowDataSerializer(INPUT_ROW_TYPE); - private static final int ROW_TIME_INDEX = 2; - - private static final LogicalType[] OUTPUT_TYPES = - new LogicalType[] { - new VarCharType(Integer.MAX_VALUE), - new IntType(), - new TimestampType(3), - new TimestampType(3), - new TimestampType(3), - new TimestampType(3) - }; - - private static final TypeSerializer<RowData> OUT_SERIALIZER = - new RowDataSerializer(OUTPUT_TYPES); - - private static final RowDataHarnessAssertor ASSERTER = - new RowDataHarnessAssertor( - OUTPUT_TYPES, new GenericRowRecordSortComparator(4, new TimestampType())); - @Test public void testTumblingWindows() throws Exception { final TumblingWindowAssigner assigner = TumblingWindowAssigner.of(Duration.ofSeconds(3)); @@ -119,9 +77,18 @@ public class AlignedWindowTableFunctionOperatorTest { // late element would not be dropped testHarness.processElement(insertRecord("key2", 1, 80L)); + // rowtime is null, should be dropped + 
testHarness.processElement(insertRecord("key2", 1, ((Long) null))); expectedOutput.add(insertRecord("key2", 1, 80L, localMills(0L), localMills(3000L), 2999L)); ASSERTER.assertOutputEqualsSorted( "Output was not correct.", expectedOutput, testHarness.getOutput()); + + assertThat( + ((AlignedWindowTableFunctionOperator) testHarness.getOperator()) + .getNumNullRowTimeRecordsDropped() + .getCount()) + .isEqualTo(1); + testHarness.close(); } @Test @@ -148,6 +115,7 @@ public class AlignedWindowTableFunctionOperatorTest { "key2", 1, Long.MAX_VALUE, localMills(3000L), localMills(6000L), 5999L)); ASSERTER.assertOutputEqualsSorted( "Output was not correct.", expectedOutput, testHarness.getOutput()); + testHarness.close(); } @Test @@ -189,6 +157,7 @@ public class AlignedWindowTableFunctionOperatorTest { expectedOutput.add(insertRecord("key2", 1, 80L, localMills(0L), localMills(3000L), 2999L)); ASSERTER.assertOutputEqualsSorted( "Output was not correct.", expectedOutput, testHarness.getOutput()); + testHarness.close(); } @Test @@ -228,6 +197,7 @@ public class AlignedWindowTableFunctionOperatorTest { "key2", 1, Long.MAX_VALUE, localMills(3000L), localMills(6000L), 5999L)); ASSERTER.assertOutputEqualsSorted( "Output was not correct.", expectedOutput, testHarness.getOutput()); + testHarness.close(); } @Test @@ -265,6 +235,7 @@ public class AlignedWindowTableFunctionOperatorTest { expectedOutput.add(insertRecord("key2", 1, 80L, localMills(0L), localMills(3000L), 2999L)); ASSERTER.assertOutputEqualsSorted( "Output was not correct.", expectedOutput, testHarness.getOutput()); + testHarness.close(); } @Test @@ -302,6 +273,7 @@ public class AlignedWindowTableFunctionOperatorTest { "key2", 1, Long.MAX_VALUE, localMills(3000L), localMills(6000L), 5999L)); ASSERTER.assertOutputEqualsSorted( "Output was not correct.", expectedOutput, testHarness.getOutput()); + testHarness.close(); } private OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness( @@ -311,19 +283,4 @@ public 
class AlignedWindowTableFunctionOperatorTest { windowAssigner, ROW_TIME_INDEX, shiftTimeZone); return new OneInputStreamOperatorTestHarness<>(operator, INPUT_ROW_SER); } - - private StreamRecord<RowData> insertRecord(String f0, int f1, Long... f2) { - Object[] fields = new Object[2 + f2.length]; - fields[0] = f0; - fields[1] = f1; - for (int idx = 0; idx < f2.length; idx++) { - fields[2 + idx] = TimestampData.fromEpochMillis(f2[idx]); - } - return new StreamRecord<>(row(fields)); - } - - /** Get the timestamp in mills by given epoch mills and timezone. */ - private long localMills(long epochMills) { - return toUtcTimestampMills(epochMills, shiftTimeZone); - } } diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/UnalignedWindowTableFunctionOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/UnalignedWindowTableFunctionOperatorTest.java new file mode 100644 index 00000000000..3bc2df1bea2 --- /dev/null +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/UnalignedWindowTableFunctionOperatorTest.java @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.window.tvf.operator; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.keyselector.EmptyRowDataKeySelector; +import org.apache.flink.table.runtime.keyselector.RowDataKeySelector; +import org.apache.flink.table.runtime.operators.window.TimeWindow; +import org.apache.flink.table.runtime.operators.window.groupwindow.assigners.GroupWindowAssigner; +import org.apache.flink.table.runtime.operators.window.groupwindow.assigners.SessionWindowAssigner; +import org.apache.flink.table.runtime.typeutils.RowDataSerializer; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.utils.HandwrittenSelectorUtil; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.time.Duration; +import java.time.ZoneId; +import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.ConcurrentLinkedQueue; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link UnalignedWindowTableFunctionOperator}. 
*/ +@RunWith(Parameterized.class) +public class UnalignedWindowTableFunctionOperatorTest extends WindowTableFunctionOperatorTestBase { + + public UnalignedWindowTableFunctionOperatorTest(ZoneId shiftTimeZone) { + super(shiftTimeZone); + } + + @Test + public void testEventTimeSessionWindows() throws Exception { + final SessionWindowAssigner assigner = SessionWindowAssigner.withGap(Duration.ofSeconds(3)); + UnalignedWindowTableFunctionOperator operator = createOperator(assigner, ROW_TIME_INDEX); + OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = + createTestHarness(operator); + testHarness.setup(OUT_SERIALIZER); + testHarness.open(); + + // process elements + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + testHarness.processElement(insertRecord("key1", 1, 20L)); + testHarness.processElement(insertRecord("key2", 1, 3999L)); + testHarness.processWatermark(new Watermark(999)); + expectedOutput.add(new Watermark(999)); + + ASSERTER.assertOutputEqualsSorted( + "Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.processWatermark(new Watermark(1999)); + expectedOutput.add(new Watermark(1999)); + testHarness.processWatermark(new Watermark(2999)); + expectedOutput.add(new Watermark(2999)); + + ASSERTER.assertOutputEqualsSorted( + "Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.processWatermark(new Watermark(3999)); + // append 3 fields: window_start, window_end, window_time + expectedOutput.add(insertRecord("key1", 1, 20L, localMills(20L), localMills(3020L), 3019L)); + expectedOutput.add(new Watermark(3999)); + + ASSERTER.assertOutputEqualsSorted( + "Output was not correct.", expectedOutput, testHarness.getOutput()); + + // the window end is 5000L, so it's not a late record + testHarness.processElement(insertRecord("key1", 1, 2000L)); + testHarness.processWatermark(new Watermark(4999)); + expectedOutput.add( + insertRecord("key1", 1, 2000L, 
localMills(2000L), localMills(5000L), 4999L)); + expectedOutput.add(new Watermark(4999)); + + ASSERTER.assertOutputEqualsSorted( + "Output was not correct.", expectedOutput, testHarness.getOutput()); + + // test out-of-order records + testHarness.processElement(insertRecord("key2", 2, 7999L)); + testHarness.processElement(insertRecord("key2", 3, 5999L)); + testHarness.processWatermark(new Watermark(5999)); + expectedOutput.add(new Watermark(5999)); + + ASSERTER.assertOutputEqualsSorted( + "Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.processWatermark(new Watermark(6999)); + expectedOutput.add(new Watermark(6999)); + testHarness.processWatermark(new Watermark(7999)); + expectedOutput.add(new Watermark(7999)); + testHarness.processWatermark(new Watermark(8999)); + expectedOutput.add(new Watermark(8999)); + testHarness.processWatermark(new Watermark(9999)); + expectedOutput.add(new Watermark(9999)); + + ASSERTER.assertOutputEqualsSorted( + "Output was not correct.", expectedOutput, testHarness.getOutput()); + + // do a snapshot, close and restore again + testHarness.prepareSnapshotPreBarrier(0L); + OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0); + testHarness.close(); + + expectedOutput.clear(); + testHarness = createTestHarness(operator); + testHarness.setup(); + testHarness.initializeState(snapshot); + testHarness.open(); + + testHarness.processWatermark(new Watermark(10999)); + + // late record should be dropped + testHarness.processElement(insertRecord("key1", 1, 999L)); + + // rowtime is null, should be dropped + testHarness.processElement(insertRecord("key1", 1, ((Long) null))); + + expectedOutput.add( + insertRecord("key2", 1, 3999L, localMills(3999L), localMills(10999L), 10998L)); + expectedOutput.add( + insertRecord("key2", 2, 7999L, localMills(3999L), localMills(10999L), 10998L)); + expectedOutput.add( + insertRecord("key2", 3, 5999L, localMills(3999L), localMills(10999L), 10998L)); + expectedOutput.add(new 
Watermark(10999)); + + ASSERTER.assertOutputEqualsSorted( + "Output was not correct.", expectedOutput, testHarness.getOutput()); + + assertThat(operator.getNumLateRecordsDropped().getCount()).isEqualTo(1); + assertThat(operator.getNumNullRowTimeRecordsDropped().getCount()).isEqualTo(1); + + testHarness.close(); + } + + @Test + public void testProcessTimeSessionWindows() throws Exception { + final SessionWindowAssigner assigner = + SessionWindowAssigner.withGap(Duration.ofSeconds(3)).withProcessingTime(); + UnalignedWindowTableFunctionOperator operator = createOperator(assigner, -1); + OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = + createTestHarness(operator); + testHarness.setup(OUT_SERIALIZER); + testHarness.open(); + + // process elements + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + // timestamp is ignored in processing time + testHarness.setProcessingTime(20L); + testHarness.processElement(insertRecord("key1", 1, Long.MAX_VALUE)); + + ASSERTER.assertOutputEqualsSorted( + "Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.setProcessingTime(1999); + testHarness.setProcessingTime(2999); + + ASSERTER.assertOutputEqualsSorted( + "Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.setProcessingTime(3999L); + testHarness.processElement(insertRecord("key2", 1, Long.MAX_VALUE)); + + // append 3 fields: window_start, window_end, window_time + expectedOutput.add( + insertRecord("key1", 1, Long.MAX_VALUE, localMills(20L), localMills(3020L), 3019L)); + + ASSERTER.assertOutputEqualsSorted( + "Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.setProcessingTime(4999); + + ASSERTER.assertOutputEqualsSorted( + "Output was not correct.", expectedOutput, testHarness.getOutput()); + + // test records with second field out of order + testHarness.setProcessingTime(5999); + testHarness.processElement(insertRecord("key2", 3, 
Long.MAX_VALUE)); + + testHarness.setProcessingTime(7999); + testHarness.processElement(insertRecord("key2", 2, Long.MAX_VALUE)); + + ASSERTER.assertOutputEqualsSorted( + "Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.setProcessingTime(6999); + testHarness.setProcessingTime(7999); + testHarness.setProcessingTime(8999); + testHarness.setProcessingTime(9999); + + ASSERTER.assertOutputEqualsSorted( + "Output was not correct.", expectedOutput, testHarness.getOutput()); + + // do a snapshot, close and restore again + testHarness.prepareSnapshotPreBarrier(0L); + OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0); + testHarness.close(); + + expectedOutput.clear(); + testHarness = createTestHarness(operator); + testHarness.setup(); + testHarness.initializeState(snapshot); + testHarness.open(); + + testHarness.setProcessingTime(10999); + + expectedOutput.add( + insertRecord( + "key2", 1, Long.MAX_VALUE, localMills(3999L), localMills(10999L), 10998L)); + expectedOutput.add( + insertRecord( + "key2", 3, Long.MAX_VALUE, localMills(3999L), localMills(10999L), 10998L)); + expectedOutput.add( + insertRecord( + "key2", 2, Long.MAX_VALUE, localMills(3999L), localMills(10999L), 10998L)); + + ASSERTER.assertOutputEqualsSorted( + "Output was not correct.", expectedOutput, testHarness.getOutput()); + + assertThat(operator.getWatermarkLatency().getValue()).isEqualTo(Long.valueOf(0L)); + + testHarness.close(); + } + + @Test + public void testSessionWindowsWithoutPartitionKeys() throws Exception { + final SessionWindowAssigner assigner = SessionWindowAssigner.withGap(Duration.ofSeconds(3)); + UnalignedWindowTableFunctionOperator operator = createOperator(assigner, ROW_TIME_INDEX); + + final EmptyRowDataKeySelector keySelector = EmptyRowDataKeySelector.INSTANCE; + + OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>( + operator, keySelector, keySelector.getProducedType()); + 
testHarness.setup(OUT_SERIALIZER); + testHarness.open(); + + // process elements + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + testHarness.processElement(insertRecord("key1", 1, 1999L)); + testHarness.processElement(insertRecord("key2", 1, 3999L)); + testHarness.processWatermark(new Watermark(999)); + expectedOutput.add(new Watermark(999)); + + ASSERTER.assertOutputEqualsSorted( + "Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.processWatermark(new Watermark(1999)); + expectedOutput.add(new Watermark(1999)); + testHarness.processWatermark(new Watermark(3999)); + expectedOutput.add(new Watermark(3999)); + testHarness.processWatermark(new Watermark(5999)); + expectedOutput.add(new Watermark(5999)); + + ASSERTER.assertOutputEqualsSorted( + "Output was not correct.", expectedOutput, testHarness.getOutput()); + + // do a snapshot, close and restore again + testHarness.prepareSnapshotPreBarrier(0L); + OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0); + testHarness.close(); + + expectedOutput.clear(); + testHarness = + new KeyedOneInputStreamOperatorTestHarness<>( + operator, keySelector, keySelector.getProducedType()); + testHarness.setup(); + testHarness.initializeState(snapshot); + testHarness.open(); + + testHarness.processWatermark(new Watermark(6999)); + + // append 3 fields: window_start, window_end, window_time + expectedOutput.add( + insertRecord("key1", 1, 1999L, localMills(1999L), localMills(6999L), 6998L)); + expectedOutput.add( + insertRecord("key2", 1, 3999L, localMills(1999L), localMills(6999L), 6998L)); + expectedOutput.add(new Watermark(6999)); + + ASSERTER.assertOutputEqualsSorted( + "Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.close(); + } + + protected static final RowDataKeySelector KEY_SELECTOR = + HandwrittenSelectorUtil.getRowDataSelector( + new int[] {0}, INPUT_ROW_TYPE.getChildren().toArray(new LogicalType[0])); + + 
private OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness( + UnalignedWindowTableFunctionOperator operator) throws Exception { + + return new KeyedOneInputStreamOperatorTestHarness<>( + operator, KEY_SELECTOR, KEY_SELECTOR.getProducedType()); + } + + private UnalignedWindowTableFunctionOperator createOperator( + GroupWindowAssigner<TimeWindow> windowAssigner, int rowTimeIndex) { + return new UnalignedWindowTableFunctionOperator( + windowAssigner, + windowAssigner.getWindowSerializer(new ExecutionConfig()), + new RowDataSerializer(INPUT_ROW_TYPE), + rowTimeIndex, + shiftTimeZone); + } + + @Parameterized.Parameters(name = "TimeZone = {0}") + public static Collection<Object[]> runMode() { + return Arrays.asList(new Object[] {UTC_ZONE_ID}, new Object[] {SHANGHAI_ZONE_ID}); + } +} diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/WindowTableFunctionOperatorTestBase.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/WindowTableFunctionOperatorTestBase.java new file mode 100644 index 00000000000..fe25b1025dd --- /dev/null +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/WindowTableFunctionOperatorTestBase.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.window.tvf.operator; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.runtime.typeutils.RowDataSerializer; +import org.apache.flink.table.runtime.util.GenericRowRecordSortComparator; +import org.apache.flink.table.runtime.util.RowDataHarnessAssertor; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.VarCharType; + +import java.time.ZoneId; +import java.util.Arrays; + +import static org.apache.flink.table.runtime.util.StreamRecordUtils.row; +import static org.apache.flink.table.runtime.util.TimeWindowUtil.toUtcTimestampMills; + +/** A test base to test window table function operator . */ +public abstract class WindowTableFunctionOperatorTestBase { + + protected static final ZoneId UTC_ZONE_ID = ZoneId.of("UTC"); + protected static final ZoneId SHANGHAI_ZONE_ID = ZoneId.of("Asia/Shanghai"); + protected final ZoneId shiftTimeZone; + + public WindowTableFunctionOperatorTestBase(ZoneId shiftTimeZone) { + this.shiftTimeZone = shiftTimeZone; + } + + /** Get the timestamp in mills by given epoch mills and timezone. 
*/ + protected long localMills(long epochMills) { + return toUtcTimestampMills(epochMills, shiftTimeZone); + } + + // ============================== Utils ============================== + + // ============================== Util Fields ============================== + + protected static final RowType INPUT_ROW_TYPE = + new RowType( + Arrays.asList( + new RowType.RowField("f0", new VarCharType(Integer.MAX_VALUE)), + new RowType.RowField("f1", new IntType()), + new RowType.RowField("f2", new TimestampType(3)))); + + protected static final RowDataSerializer INPUT_ROW_SER = new RowDataSerializer(INPUT_ROW_TYPE); + protected static final int ROW_TIME_INDEX = 2; + + protected static final LogicalType[] OUTPUT_TYPES = + new LogicalType[] { + new VarCharType(Integer.MAX_VALUE), + new IntType(), + new TimestampType(3), + new TimestampType(3), + new TimestampType(3), + new TimestampType(3) + }; + + protected static final TypeSerializer<RowData> OUT_SERIALIZER = + new RowDataSerializer(OUTPUT_TYPES); + + protected static final RowDataHarnessAssertor ASSERTER = + new RowDataHarnessAssertor( + OUTPUT_TYPES, new GenericRowRecordSortComparator(4, new TimestampType())); + + // ============================== Util Functions ============================== + + protected static StreamRecord<RowData> insertRecord(String f0, int f1, Long... f2) { + Object[] fields = new Object[2 + f2.length]; + fields[0] = f0; + fields[1] = f1; + for (int idx = 0; idx < f2.length; idx++) { + if (f2[idx] == null) { + fields[2 + idx] = null; + } else { + fields[2 + idx] = TimestampData.fromEpochMillis(f2[idx]); + } + } + return new StreamRecord<>(row(fields)); + } +}
