lsyldliu commented on code in PR #24162:
URL: https://github.com/apache/flink/pull/24162#discussion_r1465992611
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecWindowTableFunction.java:
##########
@@ -103,4 +111,67 @@ protected Transformation<RowData> translateToPlanInternal(
inputTransform.getParallelism(),
false);
}
+
+ private Transformation<RowData> translateWithUnalignedWindow(
+ PlannerBase planner,
+ ExecNodeConfig config,
+ RowType inputRowType,
+ Transformation<RowData> inputTransform) {
+ final WindowTableFunctionOperatorBase windowTableFunctionOperator =
+ createUnalignedWindowTableFunctionOperator(config,
inputRowType);
+ final OneInputTransformation<RowData, RowData> transform =
+ ExecNodeUtil.createOneInputTransformation(
+ inputTransform,
+ createTransformationMeta(WINDOW_TRANSFORMATION,
config),
+ windowTableFunctionOperator,
+ InternalTypeInfo.of(getOutputType()),
+ inputTransform.getParallelism(),
+ false);
+
+ final int[] partitionKeys =
extractPartitionKeys(windowingStrategy.getWindow());
Review Comment:
If the user doesn't specify a partition key in the session TVF, do we need to
set the state KeySelector? The session window TVF operator is stateful. What I
mean is that if the user doesn't specify a key, the session operator is a
singleton, so what is the state key? Moreover, batch mode doesn't need to set
a KeySelector.
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/UnalignedWindowTableFunctionOperator.java:
##########
@@ -0,0 +1,617 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.window.tvf.operator;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.MergingState;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MeterView;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.internal.InternalMapState;
+import org.apache.flink.runtime.state.internal.InternalMergingState;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.dataview.PerWindowStateDataViewStore;
+import org.apache.flink.table.runtime.dataview.StateDataViewStore;
+import
org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunctionBase;
+import org.apache.flink.table.runtime.operators.window.TimeWindow;
+import
org.apache.flink.table.runtime.operators.window.groupwindow.assigners.GroupWindowAssigner;
+import
org.apache.flink.table.runtime.operators.window.groupwindow.assigners.MergingWindowAssigner;
+import
org.apache.flink.table.runtime.operators.window.groupwindow.internal.InternalWindowProcessFunction;
+import
org.apache.flink.table.runtime.operators.window.groupwindow.internal.MergingWindowProcessFunction;
+import
org.apache.flink.table.runtime.operators.window.groupwindow.triggers.EventTimeTriggers;
+import
org.apache.flink.table.runtime.operators.window.groupwindow.triggers.ProcessingTimeTriggers;
+import
org.apache.flink.table.runtime.operators.window.groupwindow.triggers.Trigger;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.runtime.util.TimeWindowUtil;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.util.Preconditions;
+
+import java.time.ZoneId;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static java.util.Objects.requireNonNull;
+import static
org.apache.flink.table.runtime.util.TimeWindowUtil.toEpochMillsForTimer;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The operator for unaligned window table function.
+ *
+ * <p>See more details about aligned window and unaligned window in {@link
+ *
org.apache.flink.table.runtime.operators.window.tvf.common.WindowOperatorBase}.
+ *
+ * <p>Note: The operator only applies for Window TVF with set semantics (e.g
SESSION) instead of row
+ * semantics (e.g TUMBLE/HOP/CUMULATE).
+ *
+ * <p>This operator emits result at the end of window instead of per record.
+ *
+ * <p>This operator will not compact changelog records.
+ *
+ * <p>This operator will keep the original order of input records when
outputting.
+ */
+public class UnalignedWindowTableFunctionOperator extends
WindowTableFunctionOperatorBase
+ implements Triggerable<RowData, TimeWindow> {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final String NULL_ROW_TIME_ELEMENTS_DROPPED_METRIC_NAME =
+ "numNullRowTimeRecordsDropped";
+ private static final String LATE_ELEMENTS_DROPPED_METRIC_NAME =
"numLateRecordsDropped";
+ private static final String LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME =
"lateRecordsDroppedRate";
+ private static final String WATERMARK_LATENCY_METRIC_NAME =
"watermarkLatency";
+
+ private final Trigger<TimeWindow> trigger;
+
+ private final LogicalType[] inputFieldTypes;
+
+ private final TypeSerializer<TimeWindow> windowSerializer;
+
+ private transient InternalTimerService<TimeWindow> internalTimerService;
+
+ // a counter to tag the order of all input streams when entering the
operator
+ private transient ValueState<Long> counterState;
+
+ private transient InternalMapState<RowData, TimeWindow, Long, RowData>
windowState;
+
+ private transient TriggerContextImpl triggerContext;
+
+ private transient MergingWindowProcessFunction<RowData, TimeWindow>
windowFunction;
+
+ private transient NamespaceAggsHandleFunctionBase<TimeWindow>
windowAggregator;
+
+ // ------------------------------------------------------------------------
+ // Metrics
+ // ------------------------------------------------------------------------
+
+ private transient Counter numNullRowTimeRecordsDropped;
Review Comment:
I think `AlignedWindowTableFunctionOperator` also needs this metric.
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecWindowTableFunction.java:
##########
@@ -103,4 +111,67 @@ protected Transformation<RowData> translateToPlanInternal(
inputTransform.getParallelism(),
false);
}
+
+ private Transformation<RowData> translateWithUnalignedWindow(
+ PlannerBase planner,
+ ExecNodeConfig config,
+ RowType inputRowType,
+ Transformation<RowData> inputTransform) {
+ final WindowTableFunctionOperatorBase windowTableFunctionOperator =
+ createUnalignedWindowTableFunctionOperator(config,
inputRowType);
+ final OneInputTransformation<RowData, RowData> transform =
+ ExecNodeUtil.createOneInputTransformation(
+ inputTransform,
+ createTransformationMeta(WINDOW_TRANSFORMATION,
config),
+ windowTableFunctionOperator,
+ InternalTypeInfo.of(getOutputType()),
+ inputTransform.getParallelism(),
+ false);
+
+ final int[] partitionKeys =
extractPartitionKeys(windowingStrategy.getWindow());
+ // set KeyType and Selector for state
+ final RowDataKeySelector selector =
+ KeySelectorUtil.getRowDataSelector(
+ planner.getFlinkContext().getClassLoader(),
+ partitionKeys,
+ InternalTypeInfo.of(inputRowType));
+ transform.setStateKeySelector(selector);
+ transform.setStateKeyType(selector.getProducedType());
+ return transform;
+ }
+
+ private int[] extractPartitionKeys(WindowSpec window) {
+ checkState(
+ window instanceof SessionWindowSpec,
+ "Only support unaligned window with session window now.");
+
+ return ((SessionWindowSpec) window).getPartitionKeyIndices();
+ }
+
+ private WindowTableFunctionOperatorBase
createAlignedWindowTableFunctionOperator(
+ ExecNodeConfig config) {
+ GroupWindowAssigner<TimeWindow> windowAssigner =
createWindowAssigner(windowingStrategy);
+ final ZoneId shiftTimeZone =
+ TimeWindowUtil.getShiftTimeZone(
+ windowingStrategy.getTimeAttributeType(),
+ TableConfigUtils.getLocalTimeZone(config));
+ return new AlignedWindowTableFunctionOperator(
+ windowAssigner, windowingStrategy.getTimeAttributeIndex(),
shiftTimeZone);
+ }
+
+ private WindowTableFunctionOperatorBase
createUnalignedWindowTableFunctionOperator(
+ ExecNodeConfig config, RowType inputRowType) {
+ GroupWindowAssigner<TimeWindow> windowAssigner =
createWindowAssigner(windowingStrategy);
+ final ZoneId shiftTimeZone =
+ TimeWindowUtil.getShiftTimeZone(
+ windowingStrategy.getTimeAttributeType(),
+ TableConfigUtils.getLocalTimeZone(config));
+
+ return new UnalignedWindowTableFunctionOperator(
+ windowAssigner,
+ windowAssigner.getWindowSerializer(new ExecutionConfig()),
+ inputRowType.getChildren().toArray(new LogicalType[0]),
Review Comment:
```suggestion
new RowDataSerializer(inputRowType),
```
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/UnalignedWindowTableFunctionOperator.java:
##########
@@ -0,0 +1,617 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.window.tvf.operator;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.MergingState;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MeterView;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.internal.InternalMapState;
+import org.apache.flink.runtime.state.internal.InternalMergingState;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.dataview.PerWindowStateDataViewStore;
+import org.apache.flink.table.runtime.dataview.StateDataViewStore;
+import
org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunctionBase;
+import org.apache.flink.table.runtime.operators.window.TimeWindow;
+import
org.apache.flink.table.runtime.operators.window.groupwindow.assigners.GroupWindowAssigner;
+import
org.apache.flink.table.runtime.operators.window.groupwindow.assigners.MergingWindowAssigner;
+import
org.apache.flink.table.runtime.operators.window.groupwindow.internal.InternalWindowProcessFunction;
+import
org.apache.flink.table.runtime.operators.window.groupwindow.internal.MergingWindowProcessFunction;
+import
org.apache.flink.table.runtime.operators.window.groupwindow.triggers.EventTimeTriggers;
+import
org.apache.flink.table.runtime.operators.window.groupwindow.triggers.ProcessingTimeTriggers;
+import
org.apache.flink.table.runtime.operators.window.groupwindow.triggers.Trigger;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.runtime.util.TimeWindowUtil;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.util.Preconditions;
+
+import java.time.ZoneId;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static java.util.Objects.requireNonNull;
+import static
org.apache.flink.table.runtime.util.TimeWindowUtil.toEpochMillsForTimer;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The operator for unaligned window table function.
+ *
+ * <p>See more details about aligned window and unaligned window in {@link
+ *
org.apache.flink.table.runtime.operators.window.tvf.common.WindowOperatorBase}.
+ *
+ * <p>Note: The operator only applies for Window TVF with set semantics (e.g
SESSION) instead of row
+ * semantics (e.g TUMBLE/HOP/CUMULATE).
+ *
+ * <p>This operator emits result at the end of window instead of per record.
+ *
+ * <p>This operator will not compact changelog records.
+ *
+ * <p>This operator will keep the original order of input records when
outputting.
+ */
+public class UnalignedWindowTableFunctionOperator extends
WindowTableFunctionOperatorBase
+ implements Triggerable<RowData, TimeWindow> {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final String NULL_ROW_TIME_ELEMENTS_DROPPED_METRIC_NAME =
+ "numNullRowTimeRecordsDropped";
+ private static final String LATE_ELEMENTS_DROPPED_METRIC_NAME =
"numLateRecordsDropped";
+ private static final String LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME =
"lateRecordsDroppedRate";
+ private static final String WATERMARK_LATENCY_METRIC_NAME =
"watermarkLatency";
+
+ private final Trigger<TimeWindow> trigger;
+
+ private final LogicalType[] inputFieldTypes;
+
+ private final TypeSerializer<TimeWindow> windowSerializer;
+
+ private transient InternalTimerService<TimeWindow> internalTimerService;
+
+ // a counter to tag the order of all input streams when entering the
operator
+ private transient ValueState<Long> counterState;
+
+ private transient InternalMapState<RowData, TimeWindow, Long, RowData>
windowState;
+
+ private transient TriggerContextImpl triggerContext;
+
+ private transient MergingWindowProcessFunction<RowData, TimeWindow>
windowFunction;
+
+ private transient NamespaceAggsHandleFunctionBase<TimeWindow>
windowAggregator;
+
+ // ------------------------------------------------------------------------
+ // Metrics
+ // ------------------------------------------------------------------------
+
+ private transient Counter numNullRowTimeRecordsDropped;
+ private transient Counter numLateRecordsDropped;
+ private transient Meter lateRecordsDroppedRate;
+ private transient Gauge<Long> watermarkLatency;
+
+ public UnalignedWindowTableFunctionOperator(
+ GroupWindowAssigner<TimeWindow> windowAssigner,
+ TypeSerializer<TimeWindow> windowSerializer,
+ LogicalType[] inputFieldTypes,
Review Comment:
```suggestion
TypeSerializer<RowData> inputSerializer,
```
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/WindowTableFunctionOperatorBase.java:
##########
@@ -91,28 +83,24 @@ public void open() throws Exception {
}
@Override
- public void processElement(StreamRecord<RowData> element) throws Exception
{
- RowData inputRow = element.getValue();
- long timestamp;
- if (windowAssigner.isEventTime()) {
- if (inputRow.isNullAt(rowtimeIndex)) {
- // null timestamp would be dropped
- return;
- }
- timestamp = inputRow.getTimestamp(rowtimeIndex,
3).getMillisecond();
- } else {
- timestamp = getProcessingTimeService().getCurrentProcessingTime();
+ public void close() throws Exception {
+ super.close();
+ if (collector != null) {
+ collector.close();
}
- timestamp = toUtcTimestampMills(timestamp, shiftTimeZone);
- Collection<TimeWindow> elementWindows =
windowAssigner.assignWindows(inputRow, timestamp);
- for (TimeWindow window : elementWindows) {
+ }
+
+ protected void collect(RowData inputRow, Collection<TimeWindow>
allWindows) {
+ for (TimeWindow window : allWindows) {
windowProperties.setField(0,
TimestampData.fromEpochMillis(window.getStart()));
windowProperties.setField(1,
TimestampData.fromEpochMillis(window.getEnd()));
windowProperties.setField(
2,
TimestampData.fromEpochMillis(
toEpochMills(window.maxTimestamp(),
shiftTimeZone)));
- collector.collect(outRow.replace(inputRow, windowProperties));
+ outRow.replace(inputRow, windowProperties);
+ outRow.setRowKind(inputRow.getRowKind());
Review Comment:
Why do we need to set the RowKind manually?
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/WindowAggOperatorBuilder.java:
##########
@@ -170,4 +202,17 @@ public SlicingWindowAggOperatorBuilder countStarIndex(int
indexOfCountStart) {
}
return new SlicingWindowOperator<>(windowProcessor);
}
+
+ @SuppressWarnings("unchecked")
+ private WindowOperatorBase<RowData, ?> buildUnslicingWindowOperator() {
+ final UnsliceWindowAggProcessor windowProcessor =
+ new UnsliceWindowAggProcessor(
+ (GeneratedNamespaceAggsHandleFunction<TimeWindow>)
+ generatedAggregateFunction,
+ (UnsliceAssigner<TimeWindow>) assigner,
+ accSerializer,
+ indexOfCountStart,
+ shiftTimeZone);
+ return new UnslicingWindowOperator<>(windowProcessor);
Review Comment:
Why remove the UnsupportedOperationException?
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/UnalignedWindowTableFunctionOperator.java:
##########
@@ -0,0 +1,617 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.window.tvf.operator;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.MergingState;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MeterView;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.internal.InternalMapState;
+import org.apache.flink.runtime.state.internal.InternalMergingState;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.dataview.PerWindowStateDataViewStore;
+import org.apache.flink.table.runtime.dataview.StateDataViewStore;
+import
org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunctionBase;
+import org.apache.flink.table.runtime.operators.window.TimeWindow;
+import
org.apache.flink.table.runtime.operators.window.groupwindow.assigners.GroupWindowAssigner;
+import
org.apache.flink.table.runtime.operators.window.groupwindow.assigners.MergingWindowAssigner;
+import
org.apache.flink.table.runtime.operators.window.groupwindow.internal.InternalWindowProcessFunction;
+import
org.apache.flink.table.runtime.operators.window.groupwindow.internal.MergingWindowProcessFunction;
+import
org.apache.flink.table.runtime.operators.window.groupwindow.triggers.EventTimeTriggers;
+import
org.apache.flink.table.runtime.operators.window.groupwindow.triggers.ProcessingTimeTriggers;
+import
org.apache.flink.table.runtime.operators.window.groupwindow.triggers.Trigger;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.runtime.util.TimeWindowUtil;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.util.Preconditions;
+
+import java.time.ZoneId;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static java.util.Objects.requireNonNull;
+import static
org.apache.flink.table.runtime.util.TimeWindowUtil.toEpochMillsForTimer;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The operator for unaligned window table function.
+ *
+ * <p>See more details about aligned window and unaligned window in {@link
+ *
org.apache.flink.table.runtime.operators.window.tvf.common.WindowOperatorBase}.
+ *
+ * <p>Note: The operator only applies for Window TVF with set semantics (e.g
SESSION) instead of row
+ * semantics (e.g TUMBLE/HOP/CUMULATE).
+ *
+ * <p>This operator emits result at the end of window instead of per record.
+ *
+ * <p>This operator will not compact changelog records.
+ *
+ * <p>This operator will keep the original order of input records when
outputting.
+ */
+public class UnalignedWindowTableFunctionOperator extends
WindowTableFunctionOperatorBase
+ implements Triggerable<RowData, TimeWindow> {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final String NULL_ROW_TIME_ELEMENTS_DROPPED_METRIC_NAME =
+ "numNullRowTimeRecordsDropped";
+ private static final String LATE_ELEMENTS_DROPPED_METRIC_NAME =
"numLateRecordsDropped";
+ private static final String LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME =
"lateRecordsDroppedRate";
+ private static final String WATERMARK_LATENCY_METRIC_NAME =
"watermarkLatency";
+
+ private final Trigger<TimeWindow> trigger;
+
+ private final LogicalType[] inputFieldTypes;
Review Comment:
```suggestion
private final TypeSerializer<RowData> inputSerializer;
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]