JingsongLi commented on a change in pull request #8058: [FLINK-11959][table-runtime-blink] Introduce window operator for blink streaming runtime. URL: https://github.com/apache/flink/pull/8058#discussion_r270275933
########## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/window/WindowOperator.java ########## @@ -0,0 +1,711 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.window; + +import org.apache.flink.api.common.state.MergingState; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Meter; +import org.apache.flink.metrics.MeterView; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.state.internal.InternalMergingState; +import org.apache.flink.runtime.state.internal.InternalValueState; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.operators.InternalTimer; +import 
org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.operators.Triggerable; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.api.window.Window; +import org.apache.flink.table.dataformat.BaseRow; +import org.apache.flink.table.dataformat.GenericRow; +import org.apache.flink.table.dataformat.JoinedRow; +import org.apache.flink.table.dataformat.util.BaseRowUtil; +import org.apache.flink.table.generated.GeneratedNamespaceAggsHandleFunction; +import org.apache.flink.table.generated.GeneratedRecordEqualiser; +import org.apache.flink.table.generated.NamespaceAggsHandleFunction; +import org.apache.flink.table.generated.RecordEqualiser; +import org.apache.flink.table.runtime.context.ExecutionContextImpl; +import org.apache.flink.table.runtime.window.assigners.MergingWindowAssigner; +import org.apache.flink.table.runtime.window.assigners.PanedWindowAssigner; +import org.apache.flink.table.runtime.window.assigners.WindowAssigner; +import org.apache.flink.table.runtime.window.internal.GeneralWindowProcessFunction; +import org.apache.flink.table.runtime.window.internal.InternalWindowProcessFunction; +import org.apache.flink.table.runtime.window.internal.MergingWindowProcessFunction; +import org.apache.flink.table.runtime.window.internal.PanedWindowProcessFunction; +import org.apache.flink.table.runtime.window.triggers.Trigger; +import org.apache.flink.table.type.InternalType; +import org.apache.flink.table.typeutils.BaseRowSerializer; + +import org.apache.commons.lang3.ArrayUtils; + +import java.util.Collection; + +import static java.util.Objects.requireNonNull; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * An operator that implements the 
logic for windowing based on a {@link WindowAssigner} and + * {@link Trigger}. + * + * <p>When an element arrives it gets assigned a key using a {@link KeySelector} and it gets + * assigned to zero or more windows using a {@link WindowAssigner}. Based on this, the element + * is put into panes. A pane is the bucket of elements that have the same key and same + * {@code Window}. An element can be in multiple panes if it was assigned to multiple windows by + * the + * {@code WindowAssigner}. + * + * <p>Each pane gets its own instance of the provided {@code Trigger}. This trigger determines when + * the contents of the pane should be processed to emit results. When a trigger fires, + * the given {@link org.apache.flink.table.generated.NamespaceAggsHandleFunction} + * is invoked to produce the results that are emitted for the pane to which the {@code Trigger} + * belongs. + * + * <p>The parameter types: + * {@code <IN>}: BaseRow + * {@code <OUT>}: JoinedRow(KEY, AGG_RESULT) + * {@code <KEY>}: GenericRow + * {@code <AGG_RESULT>}: GenericRow + * {@code <ACC>}: GenericRow + * + * @param <K> The type of key returned by the {@code KeySelector}. + * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns. 
+ */ +public class WindowOperator<K, W extends Window> + extends AbstractStreamOperator<BaseRow> + implements OneInputStreamOperator<BaseRow, BaseRow>, Triggerable<K, W> { + + private static final long serialVersionUID = 1L; + + private static final String LATE_ELEMENTS_DROPPED_METRIC_NAME = "numLateRecordsDropped"; + private static final String LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME = "lateRecordsDroppedRate"; + private static final String WATERMARK_LATENCY_METRIC_NAME = "watermarkLatency"; + + // ------------------------------------------------------------------------ + // Configuration values and user functions + // ------------------------------------------------------------------------ + + private final WindowAssigner<W> windowAssigner; + + private final Trigger<W> trigger; + + /** For serializing the window in checkpoints. */ + private final TypeSerializer<W> windowSerializer; + + private final InternalType[] inputFieldTypes; + + private final InternalType[] accumulatorTypes; + + private final InternalType[] aggResultTypes; + + private final InternalType[] windowPropertyTypes; + + private final boolean sendRetraction; + + private final int rowtimeIndex; + + /** + * The allowed lateness for elements. This is used for: + * <ul> + * <li>Deciding if an element should be dropped from a window due to lateness. + * <li>Clearing the state of a window if the system time passes the + * {@code window.maxTimestamp + allowedLateness} landmark. + * </ul> + */ + private final long allowedLateness; + + // -------------------------------------------------------------------------------- + + private NamespaceAggsHandleFunction<W> windowAggregator; + private GeneratedNamespaceAggsHandleFunction generatedWindowAggregator; + + /** + * The util to compare two BaseRow equals to each other. + * As different BaseRow can't be equals directly, we use a code generated util to handle this. 
+ */ + private RecordEqualiser equaliser; + private GeneratedRecordEqualiser generatedEqualiser; + + // -------------------------------------------------------------------------------- + + private transient InternalWindowProcessFunction<K, W> windowFunction; + + /** This is used for emitting elements with a given timestamp. */ + private transient TimestampedCollector<BaseRow> collector; + + /** Flag to prevent duplicate function.close() calls in close() and dispose(). */ + private transient boolean functionsClosed = false; + + private transient InternalTimerService<W> internalTimerService; + + private transient InternalValueState<K, W, BaseRow> windowState; + + private transient InternalValueState<K, W, BaseRow> previousState; + + private transient TriggerContext triggerContext; + + private transient JoinedRow reuseOutput; + + // ------------------------------------------------------------------------ + // Metrics + // ------------------------------------------------------------------------ + + private transient Counter numLateRecordsDropped; + private transient Meter lateRecordsDroppedRate; + private transient Gauge<Long> watermarkLatency; + + WindowOperator( + NamespaceAggsHandleFunction<W> windowAggregator, + RecordEqualiser equaliser, + WindowAssigner<W> windowAssigner, + Trigger<W> trigger, + TypeSerializer<W> windowSerializer, + InternalType[] inputFieldTypes, + InternalType[] accumulatorTypes, + InternalType[] aggResultTypes, + InternalType[] windowPropertyTypes, + int rowtimeIndex, + boolean sendRetraction, + long allowedLateness) { + checkArgument(allowedLateness >= 0); + this.windowAggregator = checkNotNull(windowAggregator); + this.equaliser = checkNotNull(equaliser); + this.windowAssigner = checkNotNull(windowAssigner); + this.trigger = checkNotNull(trigger); + this.windowSerializer = checkNotNull(windowSerializer); + this.inputFieldTypes = checkNotNull(inputFieldTypes); + this.accumulatorTypes = checkNotNull(accumulatorTypes); + this.aggResultTypes = 
checkNotNull(aggResultTypes); + this.windowPropertyTypes = checkNotNull(windowPropertyTypes); + this.allowedLateness = allowedLateness; + this.sendRetraction = sendRetraction; + + // rowtime index should >= 0 when in event time mode + checkArgument(!windowAssigner.isEventTime() || rowtimeIndex >= 0); + this.rowtimeIndex = rowtimeIndex; + + setChainingStrategy(ChainingStrategy.ALWAYS); + } + + WindowOperator( + GeneratedNamespaceAggsHandleFunction generatedWindowAggregator, + GeneratedRecordEqualiser generatedEqualiser, + WindowAssigner<W> windowAssigner, + Trigger<W> trigger, + TypeSerializer<W> windowSerializer, + InternalType[] inputFieldTypes, + InternalType[] accumulatorTypes, + InternalType[] aggResultTypes, + InternalType[] windowPropertyTypes, + int rowtimeIndex, + boolean sendRetraction, + long allowedLateness) { + checkArgument(allowedLateness >= 0); + this.generatedWindowAggregator = checkNotNull(generatedWindowAggregator); + this.generatedEqualiser = checkNotNull(generatedEqualiser); + this.windowAssigner = checkNotNull(windowAssigner); + this.trigger = checkNotNull(trigger); + this.windowSerializer = checkNotNull(windowSerializer); + this.inputFieldTypes = checkNotNull(inputFieldTypes); + this.accumulatorTypes = checkNotNull(accumulatorTypes); + this.aggResultTypes = checkNotNull(aggResultTypes); + this.windowPropertyTypes = checkNotNull(windowPropertyTypes); + this.allowedLateness = allowedLateness; + this.sendRetraction = sendRetraction; + + // rowtime index should >= 0 when in event time mode + checkArgument(!windowAssigner.isEventTime() || rowtimeIndex >= 0); + this.rowtimeIndex = rowtimeIndex; + + setChainingStrategy(ChainingStrategy.ALWAYS); + } + + @Override + public void open() throws Exception { + super.open(); + + collector = new TimestampedCollector<>(output); + collector.eraseTimestamp(); + + internalTimerService = getInternalTimerService("window-timers", windowSerializer, this); + + triggerContext = new TriggerContext(); + 
triggerContext.open(); + + StateDescriptor<ValueState<BaseRow>, BaseRow> windowStateDescriptor = new ValueStateDescriptor<>( + "window-aggs", + new BaseRowSerializer(getExecutionConfig(), accumulatorTypes)); + this.windowState = (InternalValueState<K, W, BaseRow>) getOrCreateKeyedState(windowSerializer, windowStateDescriptor); + + if (sendRetraction) { + InternalType[] valueTypes = ArrayUtils.addAll(aggResultTypes, windowPropertyTypes); + StateDescriptor<ValueState<BaseRow>, BaseRow> previousStateDescriptor = new ValueStateDescriptor<>( + "previous-aggs", + new BaseRowSerializer(getExecutionConfig(), valueTypes)); + this.previousState = (InternalValueState<K, W, BaseRow>) getOrCreateKeyedState(windowSerializer, previousStateDescriptor); + } + + // compile aggregator + if (generatedWindowAggregator != null) { + this.windowAggregator = generatedWindowAggregator.newInstance(getRuntimeContext().getUserCodeClassLoader()); + + } + // compile equaliser + if (generatedEqualiser != null) { + this.equaliser = generatedEqualiser.newInstance(getRuntimeContext().getUserCodeClassLoader()); + } + + WindowContext windowContext = new WindowContext(); + windowAggregator.open(new ExecutionContextImpl(this, getRuntimeContext())); + + if (windowAssigner instanceof MergingWindowAssigner) { + this.windowFunction = new MergingWindowProcessFunction<>( + (MergingWindowAssigner<W>) windowAssigner, + windowAggregator, + windowSerializer, + allowedLateness); + } else if (windowAssigner instanceof PanedWindowAssigner) { + this.windowFunction = new PanedWindowProcessFunction<>( + (PanedWindowAssigner<W>) windowAssigner, + windowAggregator, + allowedLateness); + } else { + this.windowFunction = new GeneralWindowProcessFunction<>( + windowAssigner, + windowAggregator, + allowedLateness); + } + windowFunction.open(windowContext); + + reuseOutput = new JoinedRow(); + + // metrics + this.numLateRecordsDropped = metrics.counter(LATE_ELEMENTS_DROPPED_METRIC_NAME); + this.lateRecordsDroppedRate = 
metrics.meter( + LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME, + new MeterView(numLateRecordsDropped, 60)); + this.watermarkLatency = metrics.gauge(WATERMARK_LATENCY_METRIC_NAME, () -> { + long watermark = internalTimerService.currentWatermark(); + if (watermark < 0) { + return 0L; + } else { + return internalTimerService.currentProcessingTime() - watermark; + } + }); + } + + @Override + public void close() throws Exception { + super.close(); + collector = null; + triggerContext = null; + functionsClosed = true; + windowAggregator.close(); + } + + @Override + public void dispose() throws Exception { + super.dispose(); + collector = null; + triggerContext = null; + if (!functionsClosed) { + functionsClosed = true; + windowAggregator.close(); + } + } + + @Override + public void processElement(StreamRecord<BaseRow> record) throws Exception { + // prepare inputRow and timestamp + GenericRow inputRow = BaseRowUtil.toGenericRow(record.getValue(), inputFieldTypes); Review comment: Why must be GenericRow? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
