lsyldliu commented on code in PR #24150:
URL: https://github.com/apache/flink/pull/24150#discussion_r1461306740
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/logical/WindowSpec.java:
##########
@@ -26,9 +27,19 @@
@JsonSubTypes({
@JsonSubTypes.Type(value = TumblingWindowSpec.class),
@JsonSubTypes.Type(value = HoppingWindowSpec.class),
- @JsonSubTypes.Type(value = CumulativeWindowSpec.class)
+ @JsonSubTypes.Type(value = CumulativeWindowSpec.class),
+ @JsonSubTypes.Type(value = SessionWindowSpec.class)
})
public interface WindowSpec {
- String toSummaryString(String windowing);
+ String toSummaryString(String windowing, String[] inputFieldNames);
+
+ /**
+ * Return if the window is a aligned window.
Review Comment:
Return true if the window is aligned.
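To make the suggested wording concrete, here is a minimal sketch of what "aligned" means. It assumes the distinction drawn in the `UnsliceAssigner` Javadoc in this PR: session windows are unaligned because their boundaries depend on the elements. `WindowSpecSketch`, `isAlignedWindow()`, and both classes are hypothetical stand-ins, not the PR's `WindowSpec` API.

```java
// Hypothetical sketch: these names are illustrative stand-ins, not the PR's WindowSpec API.
interface WindowSpecSketch {
    /** Returns true if the window is aligned. */
    boolean isAlignedWindow();
}

/** Tumbling windows: boundaries follow from the timestamp and size alone, so they are aligned. */
final class TumblingSpecSketch implements WindowSpecSketch {
    private final long sizeMillis;

    TumblingSpecSketch(long sizeMillis) {
        this.sizeMillis = sizeMillis;
    }

    /** Start of the window containing the timestamp; no other element needs to be inspected. */
    long windowStart(long timestamp) {
        return timestamp - Math.floorMod(timestamp, sizeMillis);
    }

    @Override
    public boolean isAlignedWindow() {
        return true;
    }
}

/** Session windows: boundaries depend on the gaps between elements, so they are unaligned. */
final class SessionSpecSketch implements WindowSpecSketch {
    private final long gapMillis;

    SessionSpecSketch(long gapMillis) {
        this.gapMillis = gapMillis;
    }

    long gapMillis() {
        return gapMillis;
    }

    @Override
    public boolean isAlignedWindow() {
        return false;
    }
}
```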
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/unslicing/UnslicingTriggerContextBase.java:
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.window.tvf.unslicing;
+
+import org.apache.flink.api.common.state.MergingState;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalMergingState;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.window.Window;
+import org.apache.flink.table.runtime.operators.window.groupwindow.triggers.Trigger;
+
+import java.time.ZoneId;
+import java.util.Collection;
+
+/**
+ * A base context for unslicing window trigger.
+ *
+ * <p>{@link UnslicingTriggerContextBase} is a utility for handling {@link Trigger} invocations. It
+ * can be reused by setting the {@code key} and {@code window} fields. No internal state must be
+ * kept in the {@link Trigger.TriggerContext}
+ */
+public abstract class UnslicingTriggerContextBase<K, W extends Window>
Review Comment:
You say this is a utility class, so why does it need to be extracted into an abstract class?
Will anything else extend this class?
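If no other subclass is planned, one concrete class could fill the "reuse by setting `key` and `window`" role described in the Javadoc. This is a minimal sketch that assumes a single implementation is enough. `ReusableTriggerContextSketch` and its methods are hypothetical, and a list stands in for Flink's timer service so the example runs on its own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/**
 * Hypothetical sketch: one concrete, final context reused across invocations by resetting its key
 * and window. The list below stands in for an external timer service.
 */
final class ReusableTriggerContextSketch<K, W> {
    private final List<String> timerLog = new ArrayList<>();

    private K key;
    private W window;

    void setKey(K key) {
        this.key = key;
    }

    void setWindow(W window) {
        this.window = window;
    }

    /** Registers a timer for the current key and window, recorded as "key@window:time". */
    void registerEventTimeTimer(long time) {
        Objects.requireNonNull(window, "setWindow must be called before registering a timer");
        timerLog.add(key + "@" + window + ":" + time);
    }

    List<String> timerLog() {
        return timerLog;
    }
}
```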
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/unslicing/UnslicingWindowProcessor.java:
##########
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.window.tvf.unslicing;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.runtime.operators.window.tvf.common.WindowProcessor;
+
+/** A processor that processes elements for unslicing windows. */
+@Internal
+public interface UnslicingWindowProcessor<W> extends WindowProcessor<W> {}
Review Comment:
Why does `UnslicingWindowProcessor` need to extend `WindowProcessor`? I think it's
enough to use it as a marker interface, because `AbstractWindowAggProcessor`
already implements `WindowProcessor`.
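Here is a minimal sketch of the marker-interface shape suggested above. It uses hypothetical stand-ins: `WindowProcessorSketch` for `WindowProcessor` and `AbstractWindowAggProcessorSketch` for `AbstractWindowAggProcessor`. The method signature is simplified and is not Flink's.

```java
/** Hypothetical stand-in for the common processor contract (WindowProcessor in the PR). */
interface WindowProcessorSketch<W> {
    /** Returns true if the element was dropped. */
    boolean processElement(String key, String element);
}

/** Stand-in for AbstractWindowAggProcessor: it already implements the common contract. */
abstract class AbstractWindowAggProcessorSketch<W> implements WindowProcessorSketch<W> {}

/** Marker interface: declares no methods and only tags a processor as unslicing. */
interface UnslicingMarkerSketch {}

/** Gets processElement from the base-class contract and the unslicing tag from the marker. */
final class SessionAggProcessorSketch extends AbstractWindowAggProcessorSketch<Long>
        implements UnslicingMarkerSketch {
    @Override
    public boolean processElement(String key, String element) {
        return false; // this sketch never drops elements
    }
}
```

Callers that need to distinguish unslicing processors can still test `instanceof`, while the method contract comes from the base class.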
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/UnsliceWindowAggProcessor.java:
##########
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.aggregate.window.processors;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.state.internal.InternalValueState;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.RowDataUtil;
+import org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction;
+import org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunctionBase;
+import org.apache.flink.table.runtime.operators.window.TimeWindow;
+import org.apache.flink.table.runtime.operators.window.groupwindow.assigners.GroupWindowAssigner;
+import org.apache.flink.table.runtime.operators.window.groupwindow.internal.MergingWindowProcessFunction;
+import org.apache.flink.table.runtime.operators.window.groupwindow.triggers.EventTimeTriggers;
+import org.apache.flink.table.runtime.operators.window.groupwindow.triggers.ProcessingTimeTriggers;
+import org.apache.flink.table.runtime.operators.window.groupwindow.triggers.Trigger;
+import org.apache.flink.table.runtime.operators.window.tvf.common.WindowTimerService;
+import org.apache.flink.table.runtime.operators.window.tvf.unslicing.UnsliceAssigner;
+import org.apache.flink.table.runtime.operators.window.tvf.unslicing.UnslicingTriggerContextBase;
+import org.apache.flink.table.runtime.operators.window.tvf.unslicing.UnslicingWindowContextBase;
+import org.apache.flink.table.runtime.operators.window.tvf.unslicing.UnslicingWindowProcessor;
+import org.apache.flink.table.runtime.operators.window.tvf.unslicing.UnslicingWindowTimerServiceImpl;
+import org.apache.flink.table.runtime.util.TimeWindowUtil;
+
+import javax.annotation.Nullable;
+
+import java.time.ZoneId;
+import java.util.Optional;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.flink.table.runtime.util.TimeWindowUtil.toEpochMillsForTimer;
+
+/**
+ * An window aggregate processor implementation which works for {@link UnsliceAssigner}, e.g.
+ * session windows.
+ */
+public class UnsliceWindowAggProcessor extends AbstractWindowAggProcessor<TimeWindow>
+ implements UnslicingWindowProcessor<TimeWindow> {
+
+ private final UnsliceAssigner<TimeWindow> unsliceAssigner;
+
+ private final Trigger<TimeWindow> trigger;
Review Comment:
A local variable is enough.
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/unslicing/UnsliceAssigner.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.window.tvf.unslicing;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.window.Window;
+import org.apache.flink.table.runtime.operators.window.groupwindow.assigners.MergingWindowAssigner;
+import org.apache.flink.table.runtime.operators.window.groupwindow.internal.MergingWindowProcessFunction;
+import org.apache.flink.table.runtime.operators.window.tvf.common.ClockService;
+import org.apache.flink.table.runtime.operators.window.tvf.common.WindowAssigner;
+import org.apache.flink.table.runtime.operators.window.tvf.slicing.SliceAssigner;
+
+import java.util.Optional;
+
+/**
+ * A {@link UnsliceAssigner} assigns each element into a single window and not divides the window
+ * into finite number of non-overlapping slice. Different with {@link SliceAssigner}, we use the
+ * {@link Window} to identifier a window.
+ *
+ * <p>{@link UnsliceAssigner} is designed for unaligned Windows like session window.
+ *
+ * <p>Because unaligned Windows are windows determined dynamically based on elements, and its window
+ * boundaries are determined based on the messages timestamps and their correlations, some windows
+ * may be merged into one.
+ *
+ * @see UnslicingWindowOperator for more definition of unslice window.
+ */
+@Internal
+public interface UnsliceAssigner<W extends Window> extends WindowAssigner {
+
+ /**
+ * Returns the {@link Window} that the given element should belong to be used to trigger on.
+ *
+ * <p>See more details in {@link MergingWindowProcessFunction#assignActualWindows}.
+ *
+ * @param element the element to which slice should belong to.
+ * @param clock the service to get current processing time.
+ * @return if empty, that means the element is late.
+ */
+ Optional<W> assignActualWindow(
+ RowData element, ClockService clock, MergingWindowProcessFunction<?, W> windowFunction)
+ throws Exception;
+
+ /**
+ * Returns the {@link Window} that the given element should belong to be used as a namespace to
+ * restore the state.
+ *
+ * <p>See more details in {@link MergingWindowProcessFunction#assignStateNamespace}.
+ *
+ * @param element the element to which slice should belong to.
+ * @param clock the service to get current processing time.
+ * @return if empty, that means the element is late.
+ */
+ Optional<W> assignStateNamespace(
+ RowData element, ClockService clock, MergingWindowProcessFunction<?, W> windowFunction)
+ throws Exception;
+
+ /**
+ * Currently, unslice assigner has an inner {@link MergingWindowAssigner} to reuse the logic in
+ * {@link
+ * org.apache.flink.table.runtime.operators.window.groupwindow.assigners.GroupWindowAssigner} to
+ * merge windows.
+ */
+ MergingWindowAssigner<W> getInnerMergingWindowAssigner();
+
+ boolean isEventTime();
Review Comment:
This declaration is not needed.
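The `UnsliceAssigner` Javadoc above says that unaligned windows are determined by element timestamps and that some may be merged into one. Below is a minimal, Flink-free sketch of that idea for session windows: each element opens `[ts, ts + gap)`, and windows that overlap or touch are combined. `SessionWindowSketch` and `SessionMergeSketch` are hypothetical. Per the Javadoc, the real assigner delegates merging to its inner `MergingWindowAssigner`, and that path also manages state and timers.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical time window [start, end); not Flink's TimeWindow. */
final class SessionWindowSketch {
    final long start;
    final long end;

    SessionWindowSketch(long start, long end) {
        this.start = start;
        this.end = end;
    }

    /** In this sketch, windows that overlap or touch are merged. */
    boolean intersects(SessionWindowSketch other) {
        return start <= other.end && other.start <= end;
    }

    /** Smallest window covering both windows. */
    SessionWindowSketch cover(SessionWindowSketch other) {
        return new SessionWindowSketch(Math.min(start, other.start), Math.max(end, other.end));
    }

    @Override
    public String toString() {
        return "[" + start + ", " + end + ")";
    }
}

final class SessionMergeSketch {
    /** Puts each timestamp into [ts, ts + gap) and merges windows that overlap or touch. */
    static List<SessionWindowSketch> sessions(long[] timestamps, long gap) {
        List<SessionWindowSketch> windows = new ArrayList<>();
        for (long ts : timestamps) {
            windows.add(new SessionWindowSketch(ts, ts + gap));
        }
        // Sorting by start lets each window be merged into the last open session, if any.
        windows.sort(Comparator.comparingLong(w -> w.start));
        List<SessionWindowSketch> merged = new ArrayList<>();
        for (SessionWindowSketch w : windows) {
            int last = merged.size() - 1;
            if (last >= 0 && merged.get(last).intersects(w)) {
                merged.set(last, merged.get(last).cover(w));
            } else {
                merged.add(w);
            }
        }
        return merged;
    }
}
```

With timestamps `{1, 10}` and a gap of 5 there are two sessions. Adding `6` bridges them into a single `[1, 15)` window, which is the "some windows may be merged into one" case. The operator does this incrementally as elements arrive, not over a complete batch as in this sketch.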
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/unslicing/UnslicingTriggerContextBase.java:
##########
@@ -0,0 +1,163 @@
+public abstract class UnslicingTriggerContextBase<K, W extends Window>
+ implements Trigger.TriggerContext, Trigger.OnMergeContext {
Review Comment:
`OnMergeContext` already extends `TriggerContext`, so implementing only
`OnMergeContext` should be enough here, right?
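Suppose `OnMergeContext` is declared as a sub-interface of `TriggerContext`, as the comment states. Then a class that implements `OnMergeContext` is already a `TriggerContext`, and listing both is redundant. Here is a minimal sketch with hypothetical stand-in interfaces; the method names are illustrative only.

```java
/** Hypothetical stand-ins shaped like Trigger.TriggerContext and Trigger.OnMergeContext. */
interface TriggerContextSketch {
    long getCurrentWatermark();
}

interface OnMergeContextSketch extends TriggerContextSketch {
    void mergePartitionedState(String stateName);
}

/** Listing only the sub-interface is enough: its super-interface is inherited. */
final class TriggerContextImplSketch implements OnMergeContextSketch {
    @Override
    public long getCurrentWatermark() {
        return Long.MIN_VALUE; // no watermark seen yet in this sketch
    }

    @Override
    public void mergePartitionedState(String stateName) {
        // no-op in this sketch
    }
}
```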
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/UnsliceWindowAggProcessor.java:
##########
@@ -0,0 +1,286 @@
+public class UnsliceWindowAggProcessor extends AbstractWindowAggProcessor<TimeWindow>
+ implements UnslicingWindowProcessor<TimeWindow> {
+
+ private final UnsliceAssigner<TimeWindow> unsliceAssigner;
+
+ private final Trigger<TimeWindow> trigger;
+
+ // ----------------------------------------------------------------------------------------
+
+ private transient MetricGroup metrics;
+
+ protected transient MergingWindowProcessFunction<RowData, TimeWindow> windowFunction;
+
+ private transient TriggerContextImpl triggerContext;
+
+ public UnsliceWindowAggProcessor(
+ GeneratedNamespaceAggsHandleFunction<TimeWindow> genAggsHandler,
+ UnsliceAssigner<TimeWindow> unsliceAssigner,
+ TypeSerializer<RowData> accSerializer,
+ int indexOfCountStar,
+ ZoneId shiftTimeZone) {
+ super(
+ genAggsHandler,
+ unsliceAssigner,
+ accSerializer,
+ unsliceAssigner.isEventTime(),
+ indexOfCountStar,
+ shiftTimeZone);
+ this.unsliceAssigner = unsliceAssigner;
+ if (isEventTime) {
+ trigger = EventTimeTriggers.afterEndOfWindow();
+ } else {
+ trigger = ProcessingTimeTriggers.afterEndOfWindow();
+ }
+ }
+
+ @Override
+ public void open(Context<TimeWindow> context) throws Exception {
+ super.open(context);
+ this.metrics = context.getRuntimeContext().getMetricGroup();
+ this.windowFunction =
+ new MergingWindowProcessFunction<>(
+ unsliceAssigner.getInnerMergingWindowAssigner(),
+ aggregator,
+ unsliceAssigner
+ .getInnerMergingWindowAssigner()
+ .getWindowSerializer(new ExecutionConfig()),
+ 0L);
+
+ triggerContext = new TriggerContextImpl();
+ triggerContext.open();
+
+ WindowContextImpl windowContext =
+ new WindowContextImpl(
+ shiftTimeZone,
+ ctx.getTimerService(),
+ internalWindowState,
+ null,
+ aggregator,
+ unsliceAssigner.getInnerMergingWindowAssigner(),
+ triggerContext);
+
+ this.windowFunction.open(windowContext);
+ }
+
+ @Override
+ public boolean processElement(RowData key, RowData element) throws Exception {
+ // the windows which the input row should be placed into
+ Optional<TimeWindow> affectedWindowOp =
+ unsliceAssigner.assignStateNamespace(element, clockService, windowFunction);
+ boolean isElementDropped = true;
+ if (affectedWindowOp.isPresent()) {
+ TimeWindow affectedWindow = affectedWindowOp.get();
+ isElementDropped = false;
+
+ RowData acc = windowState.value(affectedWindow);
+ if (acc == null) {
+ acc = aggregator.createAccumulators();
+ }
+ aggregator.setAccumulators(affectedWindow, acc);
+
+ if (RowDataUtil.isAccumulateMsg(element)) {
+ aggregator.accumulate(element);
+ } else {
+ aggregator.retract(element);
+ }
+ acc = aggregator.getAccumulators();
+ windowState.update(affectedWindow, acc);
+ }
+
+ // the actual window which the input row is belongs to
+ Optional<TimeWindow> actualWindowOp =
+ unsliceAssigner.assignActualWindow(element, clockService, windowFunction);
+ if (actualWindowOp.isPresent()) {
+ TimeWindow actualWindow = actualWindowOp.get();
+ isElementDropped = false;
+ triggerContext.setWindow(actualWindow);
+ // register a timer for the window to fire and clean up
+ // TODO support allowedLateness
+ long triggerTime = toEpochMillsForTimer(actualWindow.maxTimestamp(), shiftTimeZone);
+ if (isEventTime) {
+ triggerContext.registerEventTimeTimer(triggerTime);
+ } else {
+ triggerContext.registerProcessingTimeTimer(triggerTime);
+ }
+ }
+ return isElementDropped;
+ }
+
+ @Override
+ public void fireWindow(long timerTimestamp, TimeWindow window) throws Exception {
+ windowFunction.prepareAggregateAccumulatorForEmit(window);
+ RowData aggResult = aggregator.getValue(window);
+ triggerContext.setWindow(window);
+ final boolean checkNeedFire;
Review Comment:
Rename `checkNeedFire` to `isFired`?
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/unslicing/UnslicingWindowContextBase.java:
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.window.tvf.unslicing;
+
+import org.apache.flink.runtime.state.internal.InternalValueState;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunctionBase;
+import org.apache.flink.table.runtime.operators.window.Window;
+import org.apache.flink.table.runtime.operators.window.groupwindow.assigners.GroupWindowAssigner;
+import org.apache.flink.table.runtime.operators.window.groupwindow.internal.InternalWindowProcessFunction;
+
+import javax.annotation.Nullable;
+
+import java.time.ZoneId;
+import java.util.Collection;
+
+import static org.apache.flink.table.runtime.util.TimeWindowUtil.toEpochMillsForTimer;
+
+/** A base context for window. */
+public abstract class UnslicingWindowContextBase<K, W extends Window>
Review Comment:
Ditto: why does this need to be extracted into an abstract class?
##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/WindowAggOperatorTestBase.java:
##########
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.aggregate.window;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.utils.JoinedRowData;
+import org.apache.flink.table.runtime.dataview.StateDataViewStore;
+import org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction;
+import org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunction;
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import org.apache.flink.table.runtime.operators.window.tvf.common.WindowOperatorBase;
+import org.apache.flink.table.runtime.typeutils.PagedTypeSerializer;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.runtime.util.GenericRowRecordSortComparator;
+import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.utils.HandwrittenSelectorUtil;
+
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.table.runtime.util.TimeWindowUtil.toUtcTimestampMills;
+import static org.assertj.core.api.Assertions.fail;
+
+/** A test base for window aggregate operator. */
+public abstract class WindowAggOperatorTestBase {
+
+ protected static final ZoneId UTC_ZONE_ID = ZoneId.of("UTC");
+ protected static final ZoneId SHANGHAI_ZONE_ID = ZoneId.of("Asia/Shanghai");
+
+ protected final ZoneId shiftTimeZone;
+
+ public WindowAggOperatorTestBase(ZoneId shiftTimeZone) {
+ this.shiftTimeZone = shiftTimeZone;
+ }
+
+ /** Get the timestamp in mills by given epoch mills and timezone. */
+ protected long localMills(long epochMills) {
+ return toUtcTimestampMills(epochMills, shiftTimeZone);
+ }
+
+ // ============================== Utils ==============================
+
+ // ============================== Util Fields ==============================
+
+ private static final RowType INPUT_ROW_TYPE =
+ new RowType(
+ Arrays.asList(
+ new RowType.RowField("f0", new VarCharType(Integer.MAX_VALUE)),
+ new RowType.RowField("f1", new IntType()),
+ new RowType.RowField("f2", new TimestampType())));
+
+ protected static final RowDataSerializer INPUT_ROW_SER = new RowDataSerializer(INPUT_ROW_TYPE);
+
+ protected static final RowDataSerializer ACC_SER =
+ new RowDataSerializer(new BigIntType(), new BigIntType());
+
+ protected static final LogicalType[] OUTPUT_TYPES =
+ new LogicalType[] {
+ new VarCharType(Integer.MAX_VALUE),
+ new BigIntType(),
+ new BigIntType(),
+ new BigIntType(),
+ new BigIntType()
+ };
+
+ protected static final RowDataKeySelector KEY_SELECTOR =
+ HandwrittenSelectorUtil.getRowDataSelector(
+ new int[] {0}, INPUT_ROW_TYPE.getChildren().toArray(new LogicalType[0]));
+
+ protected static final PagedTypeSerializer<RowData> KEY_SER =
+ (PagedTypeSerializer<RowData>) KEY_SELECTOR.getProducedType().toSerializer();
+
+ protected static final TypeSerializer<RowData> OUT_SERIALIZER =
+ new RowDataSerializer(OUTPUT_TYPES);
+
+ protected static final RowDataHarnessAssertor ASSERTER =
+ new RowDataHarnessAssertor(
+ OUTPUT_TYPES, new GenericRowRecordSortComparator(0, VarCharType.STRING_TYPE));
+
+ // ============================== Util Functions ==============================
+
+ protected static OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness(
+ WindowOperatorBase<RowData, ?> operator) throws Exception {
+ return new KeyedOneInputStreamOperatorTestHarness<>(
+ operator, KEY_SELECTOR, KEY_SELECTOR.getProducedType());
+ }
+
+ protected static <T> GeneratedNamespaceAggsHandleFunction<T> wrapGenerated(
Review Comment:
Rename `wrapGenerated` to `createGeneratedAggsHandle`?
##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/WindowAggOperatorTestBase.java:
##########
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.aggregate.window;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.utils.JoinedRowData;
+import org.apache.flink.table.runtime.dataview.StateDataViewStore;
+import
org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction;
+import org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunction;
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import
org.apache.flink.table.runtime.operators.window.tvf.common.WindowOperatorBase;
+import org.apache.flink.table.runtime.typeutils.PagedTypeSerializer;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.runtime.util.GenericRowRecordSortComparator;
+import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.utils.HandwrittenSelectorUtil;
+
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static
org.apache.flink.table.runtime.util.TimeWindowUtil.toUtcTimestampMills;
+import static org.assertj.core.api.Assertions.fail;
+
+/** A test base for window aggregate operator. */
+public abstract class WindowAggOperatorTestBase {
+
+ protected static final ZoneId UTC_ZONE_ID = ZoneId.of("UTC");
+ protected static final ZoneId SHANGHAI_ZONE_ID =
ZoneId.of("Asia/Shanghai");
+
+ protected final ZoneId shiftTimeZone;
+
+ public WindowAggOperatorTestBase(ZoneId shiftTimeZone) {
+ this.shiftTimeZone = shiftTimeZone;
+ }
+
+ /** Get the timestamp in mills by given epoch mills and timezone. */
+ protected long localMills(long epochMills) {
+ return toUtcTimestampMills(epochMills, shiftTimeZone);
+ }
+
+ // ============================== Utils ==============================
+
+ // ============================== Util Fields ==============================
+
+ private static final RowType INPUT_ROW_TYPE =
+ new RowType(
+ Arrays.asList(
+ new RowType.RowField("f0", new VarCharType(Integer.MAX_VALUE)),
+ new RowType.RowField("f1", new IntType()),
+ new RowType.RowField("f2", new TimestampType())));
+
+ protected static final RowDataSerializer INPUT_ROW_SER = new RowDataSerializer(INPUT_ROW_TYPE);
+
+ protected static final RowDataSerializer ACC_SER =
+ new RowDataSerializer(new BigIntType(), new BigIntType());
+
+ protected static final LogicalType[] OUTPUT_TYPES =
+ new LogicalType[] {
+ new VarCharType(Integer.MAX_VALUE),
+ new BigIntType(),
+ new BigIntType(),
+ new BigIntType(),
+ new BigIntType()
+ };
+
+ protected static final RowDataKeySelector KEY_SELECTOR =
+ HandwrittenSelectorUtil.getRowDataSelector(
+ new int[] {0}, INPUT_ROW_TYPE.getChildren().toArray(new LogicalType[0]));
+
+ protected static final PagedTypeSerializer<RowData> KEY_SER =
+ (PagedTypeSerializer<RowData>) KEY_SELECTOR.getProducedType().toSerializer();
+
+ protected static final TypeSerializer<RowData> OUT_SERIALIZER =
+ new RowDataSerializer(OUTPUT_TYPES);
+
+ protected static final RowDataHarnessAssertor ASSERTER =
+ new RowDataHarnessAssertor(
+ OUTPUT_TYPES, new GenericRowRecordSortComparator(0, VarCharType.STRING_TYPE));
+
+ // ============================== Util Functions ==============================
+
+ protected static OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness(
+ WindowOperatorBase<RowData, ?> operator) throws Exception {
+ return new KeyedOneInputStreamOperatorTestHarness<>(
+ operator, KEY_SELECTOR, KEY_SELECTOR.getProducedType());
+ }
+
+ protected static <T> GeneratedNamespaceAggsHandleFunction<T> wrapGenerated(
+ NamespaceAggsHandleFunction<T> aggsFunction) {
+ return new GeneratedNamespaceAggsHandleFunction<T>("N/A", "", new Object[0]) {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public NamespaceAggsHandleFunction<T> newInstance(ClassLoader classLoader) {
+ return aggsFunction;
+ }
+ };
+ }
+
+ /** Get epoch mills from a timestamp string and the time zone the timestamp belongs to. */
+ protected static long epochMills(ZoneId shiftTimeZone, String timestampStr) {
+ LocalDateTime localDateTime = LocalDateTime.parse(timestampStr);
+ ZoneOffset zoneOffset = shiftTimeZone.getRules().getOffset(localDateTime);
+ return localDateTime.toInstant(zoneOffset).toEpochMilli();
+ }
+
+ /**
+ * This performs a {@code SUM(f1), COUNT(f1)}, where f1 is of INT type. The return value
+ * contains {@code sum, count, window_start, window_end}.
+ */
+ protected abstract static class SumAndCountAggsFunctionBase<T>
+ implements NamespaceAggsHandleFunction<T> {
+
+ private static final long serialVersionUID = 1L;
+
+ boolean openCalled;
+ final AtomicInteger closeCalled = new AtomicInteger(0);
+
+ long sum;
+ boolean sumIsNull;
+ long count;
+ boolean countIsNull;
+
+ protected transient JoinedRowData result;
+
+ public void open(StateDataViewStore store) throws Exception {
+ openCalled = true;
+ result = new JoinedRowData();
+ }
+
+ public void setAccumulators(T window, RowData acc) throws Exception {
+ if (!openCalled) {
+ fail("Open was not called");
+ }
+ sumIsNull = acc.isNullAt(0);
+ if (!sumIsNull) {
+ sum = acc.getLong(0);
+ } else {
+ sum = 0L;
+ }
+
+ countIsNull = acc.isNullAt(1);
+ if (!countIsNull) {
+ count = acc.getLong(1);
+ } else {
+ count = 0L;
+ }
+ }
+
+ public void accumulate(RowData inputRow) throws Exception {
+ if (!openCalled) {
+ fail("Open was not called");
+ }
+ boolean inputIsNull = inputRow.isNullAt(1);
+ if (!inputIsNull) {
+ sum += inputRow.getInt(1);
+ count += 1;
+ sumIsNull = false;
+ countIsNull = false;
+ }
+ }
+
+ public void retract(RowData inputRow) throws Exception {
+ if (!openCalled) {
+ fail("Open was not called");
+ }
+ boolean inputIsNull = inputRow.isNullAt(1);
+ if (!inputIsNull) {
+ sum -= inputRow.getInt(1);
+ count -= 1;
+ }
+ }
+
+ public void merge(T window, RowData otherAcc) throws Exception {
+ if (!openCalled) {
+ fail("Open was not called");
+ }
+ boolean sumIsNull2 = otherAcc.isNullAt(0);
+ if (!sumIsNull2) {
+ sum += otherAcc.getLong(0);
+ sumIsNull = false;
+ }
+ boolean countIsNull2 = otherAcc.isNullAt(1);
+ if (!countIsNull2) {
+ count += otherAcc.getLong(1);
+ countIsNull = false;
+ }
+ }
+
+ public RowData createAccumulators() {
+ if (!openCalled) {
+ fail("Open was not called");
+ }
+ GenericRowData rowData = new GenericRowData(2);
+ rowData.setField(1, 0L); // count has default 0 value
+ return rowData;
+ }
+
+ public RowData getAccumulators() throws Exception {
+ if (!openCalled) {
+ fail("Open was not called");
+ }
+ GenericRowData row = new GenericRowData(2);
+ if (!sumIsNull) {
+ row.setField(0, sum);
+ }
+ if (!countIsNull) {
+ row.setField(1, count);
+ }
+ return row;
+ }
+
+ public void cleanup(T window) {}
+
+ public void close() {
+ closeCalled.incrementAndGet();
+ }
+
+ @Override
+ public RowData getValue(T window) throws Exception {
+ if (!openCalled) {
+ fail("Open was not called");
+ }
+ GenericRowData row = new GenericRowData(4);
+ if (!sumIsNull) {
Review Comment:
ditto
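The `epochMills` helper in the test base above turns a wall-clock timestamp string into epoch milliseconds, using the UTC offset that the given zone has at that local date-time. A standalone sketch of the same computation with only `java.time` (the class name `EpochMillsSketch` is hypothetical; it is an illustrative copy, not part of the PR):

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

/** Hypothetical standalone copy of the test base's epochMills helper, for illustration only. */
final class EpochMillsSketch {
    private EpochMillsSketch() {}

    /** Interprets timestampStr as wall-clock time in zone and returns epoch milliseconds. */
    static long epochMills(ZoneId zone, String timestampStr) {
        LocalDateTime localDateTime = LocalDateTime.parse(timestampStr);
        // The zone's UTC offset in effect at that local date-time.
        ZoneOffset zoneOffset = zone.getRules().getOffset(localDateTime);
        return localDateTime.toInstant(zoneOffset).toEpochMilli();
    }

    public static void main(String[] args) {
        ZoneId utc = ZoneId.of("UTC");
        ZoneId shanghai = ZoneId.of("Asia/Shanghai");
        System.out.println(epochMills(utc, "1970-01-01T00:00:00")); // prints 0
        // 08:00 in Shanghai (UTC+8) is the same instant as 00:00 UTC on that day.
        System.out.println(
                epochMills(shanghai, "2020-10-10T08:00:00")
                        == epochMills(utc, "2020-10-10T00:00:00")); // prints true
    }
}
```

Because `Asia/Shanghai` is UTC+8 on that date, the same wall-clock string maps to an instant eight hours earlier than it does under UTC.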
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/unslicing/UnsliceAssigner.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.window.tvf.unslicing;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.window.Window;
+import org.apache.flink.table.runtime.operators.window.groupwindow.assigners.MergingWindowAssigner;
+import org.apache.flink.table.runtime.operators.window.groupwindow.internal.MergingWindowProcessFunction;
+import org.apache.flink.table.runtime.operators.window.tvf.common.ClockService;
+import org.apache.flink.table.runtime.operators.window.tvf.common.WindowAssigner;
+import org.apache.flink.table.runtime.operators.window.tvf.slicing.SliceAssigner;
+
+import java.util.Optional;
+
+/**
+ * A {@link UnsliceAssigner} assigns each element to a single window and does not divide the window
+ * into a finite number of non-overlapping slices. Unlike {@link SliceAssigner}, it uses the
+ * {@link Window} to identify a window.
+ *
+ * <p>{@link UnsliceAssigner} is designed for unaligned windows such as session windows.
+ *
+ * <p>Because unaligned windows are determined dynamically from the elements, and their window
+ * boundaries depend on the message timestamps and their correlations, some windows
+ * may be merged into one.
+ *
+ * @see UnslicingWindowOperator for more details on unslice windows.
+ */
+@Internal
+public interface UnsliceAssigner<W extends Window> extends WindowAssigner {
+
+ /**
+ * Returns the {@link Window} that the given element should belong to, used for triggering.
+ *
+ * <p>See more details in {@link MergingWindowProcessFunction#assignActualWindows}.
+ *
+ * @param element the element to assign to a window.
+ * @param clock the service to get current processing time.
+ * @return if empty, that means the element is late.
+ */
+ Optional<W> assignActualWindow(
+ RowData element, ClockService clock, MergingWindowProcessFunction<?, W> windowFunction)
+ throws Exception;
+
+ /**
+ * Returns the {@link Window} that the given element should belong to, used as the namespace to
+ * restore the state.
+ *
+ * <p>See more details in {@link MergingWindowProcessFunction#assignStateNamespace}.
+ *
+ * @param element the element to assign to a window.
+ * @param clock the service to get current processing time.
+ * @return if empty, that means the element is late.
+ */
+ Optional<W> assignStateNamespace(
+ RowData element, ClockService clock, MergingWindowProcessFunction<?, W> windowFunction)
+ throws Exception;
+
+ /**
+ * Currently, the unslice assigner has an inner {@link MergingWindowAssigner} to reuse the logic in
+ * {@link
+ * org.apache.flink.table.runtime.operators.window.groupwindow.assigners.GroupWindowAssigner} to
+ * merge windows.
+ */
+ MergingWindowAssigner<W> getInnerMergingWindowAssigner();
Review Comment:
Consider renaming this to getMergingWindowAssigner.
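The `UnsliceAssigner` Javadoc above describes unaligned windows whose boundaries come from element timestamps and which may be merged. For session windows, a minimal offline sketch of that behavior, assuming each element at time t opens a window [t, t + gap) and overlapping windows merge into their union. The names `SessionMergeSketch`, `Window`, and `mergeSessions` are hypothetical, not Flink API; Flink's `MergingWindowProcessFunction` merges incrementally as elements arrive, which this sketch does not model.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical offline sketch of session-window merging; not Flink code. */
final class SessionMergeSketch {
    private SessionMergeSketch() {}

    /** Half-open window [start, end), loosely modeled on a time window. */
    static final class Window {
        final long start;
        final long end; // exclusive

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    /** Gives each timestamp the window [t, t + gap) and merges overlapping windows. */
    static List<Window> mergeSessions(long[] timestamps, long gap) {
        long[] sorted = timestamps.clone();
        // Sorting makes the final sessions independent of arrival order.
        Arrays.sort(sorted);
        List<Window> sessions = new ArrayList<>();
        for (long t : sorted) {
            Window candidate = new Window(t, t + gap);
            int lastIndex = sessions.size() - 1;
            if (lastIndex >= 0 && candidate.start < sessions.get(lastIndex).end) {
                // Overlaps the current session: extend it to cover both windows.
                Window last = sessions.get(lastIndex);
                sessions.set(lastIndex, new Window(last.start, Math.max(last.end, candidate.end)));
            } else {
                // At least `gap` since the previous element: start a new session.
                sessions.add(candidate);
            }
        }
        return sessions;
    }

    public static void main(String[] args) {
        // key1 timestamps from testEventTimeSessionWindows, 3-second gap.
        System.out.println(mergeSessions(new long[] {20L, 0L, 999L}, 3000L)); // prints [[0, 3999)]
        // Elements farther apart than the gap stay in separate sessions.
        System.out.println(mergeSessions(new long[] {0L, 10_000L}, 3000L)); // prints [[0, 3000), [10000, 13000)]
    }
}
```

With the `key1` timestamps from `testEventTimeSessionWindows` and a 3-second gap, the sketch yields [0, 3999), which matches the window bounds in that test's expected output.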
##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/UnslicingWindowAggOperatorTest.java:
##########
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.aggregate.window;
+
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.window.TimeWindow;
+import org.apache.flink.table.runtime.operators.window.tvf.unslicing.UnsliceAssigner;
+import org.apache.flink.table.runtime.operators.window.tvf.unslicing.UnsliceAssigners;
+import org.apache.flink.table.runtime.operators.window.tvf.unslicing.UnslicingWindowOperator;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.time.Duration;
+import java.time.ZoneId;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.apache.flink.table.data.TimestampData.fromEpochMillis;
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for unslicing window aggregate operators created by {@link WindowAggOperatorBuilder}. */
+@RunWith(Parameterized.class)
+public class UnslicingWindowAggOperatorTest extends WindowAggOperatorTestBase {
+
+ public UnslicingWindowAggOperatorTest(ZoneId shiftTimeZone) {
+ super(shiftTimeZone);
+ }
+
+ @Test
+ public void testEventTimeSessionWindows() throws Exception {
+ final UnsliceAssigner<TimeWindow> assigner =
+ UnsliceAssigners.session(2, shiftTimeZone, Duration.ofSeconds(3));
+
+ final UnslicingSumAndCountAggsFunction aggsFunction =
+ new UnslicingSumAndCountAggsFunction();
+ UnslicingWindowOperator<RowData, ?> operator =
+ (UnslicingWindowOperator<RowData, ?>)
+ WindowAggOperatorBuilder.builder()
+ .inputSerializer(INPUT_ROW_SER)
+ .shiftTimeZone(shiftTimeZone)
+ .keySerializer(KEY_SER)
+ .assigner(assigner)
+ .aggregate(wrapGenerated(aggsFunction), ACC_SER)
+ .build();
+
+ OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+ createTestHarness(operator);
+
+ testHarness.setup(OUT_SERIALIZER);
+ testHarness.open();
+
+ // process elements
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+ // add elements out-of-order
+ testHarness.processElement(insertRecord("key2", 1, fromEpochMillis(3999L)));
+ testHarness.processElement(insertRecord("key2", 1, fromEpochMillis(3000L)));
+
+ testHarness.processElement(insertRecord("key1", 1, fromEpochMillis(20L)));
+ testHarness.processElement(insertRecord("key1", 1, fromEpochMillis(0L)));
+ testHarness.processElement(insertRecord("key1", 1, fromEpochMillis(999L)));
+
+ testHarness.processElement(insertRecord("key2", 1, fromEpochMillis(1998L)));
+ testHarness.processElement(insertRecord("key2", 1, fromEpochMillis(1999L)));
+ testHarness.processElement(insertRecord("key2", 1, fromEpochMillis(1000L)));
+
+ testHarness.processWatermark(new Watermark(999));
+ expectedOutput.add(new Watermark(999));
+ ASSERTER.assertOutputEqualsSorted(
+ "Output was not correct.", expectedOutput, testHarness.getOutput());
+
+ testHarness.processWatermark(new Watermark(1999));
+ expectedOutput.add(new Watermark(1999));
+ ASSERTER.assertOutputEqualsSorted(
+ "Output was not correct.", expectedOutput, testHarness.getOutput());
+
+ // do a snapshot, close and restore again
+ testHarness.prepareSnapshotPreBarrier(0L);
+ OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0);
+ testHarness.close();
+
+ assertThat(aggsFunction.closeCalled.get()).as("Close was not called.").isGreaterThan(0);
+
+ expectedOutput.clear();
+ testHarness = createTestHarness(operator);
+ testHarness.setup();
+ testHarness.initializeState(snapshot);
+ testHarness.open();
+
+ testHarness.processWatermark(new Watermark(2999));
+ expectedOutput.add(new Watermark(2999));
+ ASSERTER.assertOutputEqualsSorted(
+ "Output was not correct.", expectedOutput, testHarness.getOutput());
+
+ testHarness.processWatermark(new Watermark(3999));
+ expectedOutput.add(insertRecord("key1", 3L, 3L, localMills(0L), localMills(3999L)));
+ expectedOutput.add(new Watermark(3999));
+ ASSERTER.assertOutputEqualsSorted(
+ "Output was not correct.", expectedOutput, testHarness.getOutput());
+
+ // late relative to the watermark, but its key2 session has not fired yet, so it is still counted
+ testHarness.processElement(insertRecord("key2", 1, fromEpochMillis(3500L)));
+
+ testHarness.processWatermark(new Watermark(4999));
+ expectedOutput.add(new Watermark(4999));
+ ASSERTER.assertOutputEqualsSorted(
+ "Output was not correct.", expectedOutput, testHarness.getOutput());
+
+ // late for all assigned windows, should be dropped
+ testHarness.processElement(insertRecord("key1", 1, fromEpochMillis(999L)));
+
+ testHarness.processWatermark(new Watermark(5999));
+ expectedOutput.add(new Watermark(5999));
+ ASSERTER.assertOutputEqualsSorted(
+ "Output was not correct.", expectedOutput, testHarness.getOutput());
+
+ testHarness.processWatermark(new Watermark(6999));
+ expectedOutput.add(insertRecord("key2", 6L, 6L, localMills(1000L), localMills(6999L)));
+ expectedOutput.add(new Watermark(6999));
+ ASSERTER.assertOutputEqualsSorted(
+ "Output was not correct.", expectedOutput, testHarness.getOutput());
+
+ // those don't have any effect...
+ testHarness.processWatermark(new Watermark(7999));
+ testHarness.processWatermark(new Watermark(8999));
+ expectedOutput.add(new Watermark(7999));
+ expectedOutput.add(new Watermark(8999));
+
+ ASSERTER.assertOutputEqualsSorted(
+ "Output was not correct.", expectedOutput, testHarness.getOutput());
+
+ assertThat(operator.getNumLateRecordsDropped().getCount()).isEqualTo(1);
+
+ testHarness.close();
+ }
+
+ @Test
+ public void testProcessingTimeSessionWindows() throws Exception {
Review Comment:
1. Can you also test the CDC case?
2. Can you also test out-of-order retract messages in processing time?
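For the second point, a standalone model of the accumulate/retract/null-flag logic in `SumAndCountAggsFunctionBase` suggests why such a test is worth having. It uses a `Long[] {sum, count}` as a stand-in for the accumulator row; `RetractOrderSketch` is a hypothetical name, and the operator, state backend, and test harness are not modeled. Because the helper's `retract` does not clear `sumIsNull`, a retraction that arrives before its matching insert can lose its effect on the sum.

```java
import java.util.Arrays;

/** Hypothetical model of SumAndCountAggsFunctionBase's null-flag handling; not Flink code. */
final class RetractOrderSketch {
    private RetractOrderSketch() {}

    /** createAccumulators: sum starts as NULL, count starts as 0. */
    static Long[] initial() {
        return new Long[] {null, 0L};
    }

    /** Applies one +I (accumulate = true) or -D (accumulate = false) message to acc. */
    static Long[] apply(Long[] acc, boolean accumulate, int value) {
        // setAccumulators: a NULL field is read as 0, but its null flag is kept.
        boolean sumIsNull = acc[0] == null;
        long sum = sumIsNull ? 0L : acc[0];
        boolean countIsNull = acc[1] == null;
        long count = countIsNull ? 0L : acc[1];
        if (accumulate) {
            sum += value;
            count += 1;
            sumIsNull = false;
            countIsNull = false;
        } else {
            // retract: updates the values but, like the helper, leaves the null flags untouched.
            sum -= value;
            count -= 1;
        }
        // getAccumulators: a field whose null flag is set is written back as NULL.
        return new Long[] {sumIsNull ? null : sum, countIsNull ? null : count};
    }

    public static void main(String[] args) {
        Long[] inOrder = apply(apply(initial(), true, 5), false, 5);
        Long[] outOfOrder = apply(apply(initial(), false, 5), true, 5);
        System.out.println(Arrays.toString(inOrder)); // prints [0, 0]
        System.out.println(Arrays.toString(outOfOrder)); // prints [5, 0]
    }
}
```

In this model, `+I 5` followed by `-D 5` leaves `{0, 0}`, while the reversed order leaves `{5, 0}`, so a processing-time test with an out-of-order retraction would pin down which result the operator is expected to produce.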
##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/WindowAggOperatorTestBase.java:
##########
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.aggregate.window;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.utils.JoinedRowData;
+import org.apache.flink.table.runtime.dataview.StateDataViewStore;
+import org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction;
+import org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunction;
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import org.apache.flink.table.runtime.operators.window.tvf.common.WindowOperatorBase;
+import org.apache.flink.table.runtime.typeutils.PagedTypeSerializer;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.runtime.util.GenericRowRecordSortComparator;
+import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.utils.HandwrittenSelectorUtil;
+
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.table.runtime.util.TimeWindowUtil.toUtcTimestampMills;
+import static org.assertj.core.api.Assertions.fail;
+
+/** A test base for window aggregate operator. */
+public abstract class WindowAggOperatorTestBase {
+
+ protected static final ZoneId UTC_ZONE_ID = ZoneId.of("UTC");
+ protected static final ZoneId SHANGHAI_ZONE_ID = ZoneId.of("Asia/Shanghai");
+
+ protected final ZoneId shiftTimeZone;
+
+ public WindowAggOperatorTestBase(ZoneId shiftTimeZone) {
+ this.shiftTimeZone = shiftTimeZone;
+ }
+
+ /** Get the timestamp in mills by given epoch mills and timezone. */
+ protected long localMills(long epochMills) {
+ return toUtcTimestampMills(epochMills, shiftTimeZone);
+ }
+
+ // ============================== Utils ==============================
+
+ // ============================== Util Fields ==============================
+
+ private static final RowType INPUT_ROW_TYPE =
+ new RowType(
+ Arrays.asList(
+ new RowType.RowField("f0", new VarCharType(Integer.MAX_VALUE)),
+ new RowType.RowField("f1", new IntType()),
+ new RowType.RowField("f2", new TimestampType())));
+
+ protected static final RowDataSerializer INPUT_ROW_SER = new RowDataSerializer(INPUT_ROW_TYPE);
+
+ protected static final RowDataSerializer ACC_SER =
+ new RowDataSerializer(new BigIntType(), new BigIntType());
+
+ protected static final LogicalType[] OUTPUT_TYPES =
+ new LogicalType[] {
+ new VarCharType(Integer.MAX_VALUE),
+ new BigIntType(),
+ new BigIntType(),
+ new BigIntType(),
+ new BigIntType()
+ };
+
+ protected static final RowDataKeySelector KEY_SELECTOR =
+ HandwrittenSelectorUtil.getRowDataSelector(
+ new int[] {0}, INPUT_ROW_TYPE.getChildren().toArray(new LogicalType[0]));
+
+ protected static final PagedTypeSerializer<RowData> KEY_SER =
+ (PagedTypeSerializer<RowData>) KEY_SELECTOR.getProducedType().toSerializer();
+
+ protected static final TypeSerializer<RowData> OUT_SERIALIZER =
+ new RowDataSerializer(OUTPUT_TYPES);
+
+ protected static final RowDataHarnessAssertor ASSERTER =
+ new RowDataHarnessAssertor(
+ OUTPUT_TYPES, new GenericRowRecordSortComparator(0, VarCharType.STRING_TYPE));
+
+ // ============================== Util Functions ==============================
+
+ protected static OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness(
+ WindowOperatorBase<RowData, ?> operator) throws Exception {
+ return new KeyedOneInputStreamOperatorTestHarness<>(
+ operator, KEY_SELECTOR, KEY_SELECTOR.getProducedType());
+ }
+
+ protected static <T> GeneratedNamespaceAggsHandleFunction<T> wrapGenerated(
+ NamespaceAggsHandleFunction<T> aggsFunction) {
+ return new GeneratedNamespaceAggsHandleFunction<T>("N/A", "", new Object[0]) {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public NamespaceAggsHandleFunction<T> newInstance(ClassLoader classLoader) {
+ return aggsFunction;
+ }
+ };
+ }
+
+ /** Get epoch mills from a timestamp string and the time zone the timestamp belongs to. */
+ protected static long epochMills(ZoneId shiftTimeZone, String timestampStr) {
+ LocalDateTime localDateTime = LocalDateTime.parse(timestampStr);
+ ZoneOffset zoneOffset = shiftTimeZone.getRules().getOffset(localDateTime);
+ return localDateTime.toInstant(zoneOffset).toEpochMilli();
+ }
+
+ /**
+ * This performs a {@code SUM(f1), COUNT(f1)}, where f1 is of INT type. The return value
+ * contains {@code sum, count, window_start, window_end}.
+ */
+ protected abstract static class SumAndCountAggsFunctionBase<T>
+ implements NamespaceAggsHandleFunction<T> {
+
+ private static final long serialVersionUID = 1L;
+
+ boolean openCalled;
+ final AtomicInteger closeCalled = new AtomicInteger(0);
+
+ long sum;
+ boolean sumIsNull;
+ long count;
+ boolean countIsNull;
+
+ protected transient JoinedRowData result;
+
+ public void open(StateDataViewStore store) throws Exception {
+ openCalled = true;
+ result = new JoinedRowData();
+ }
+
+ public void setAccumulators(T window, RowData acc) throws Exception {
+ if (!openCalled) {
+ fail("Open was not called");
+ }
+ sumIsNull = acc.isNullAt(0);
+ if (!sumIsNull) {
+ sum = acc.getLong(0);
+ } else {
+ sum = 0L;
+ }
+
+ countIsNull = acc.isNullAt(1);
+ if (!countIsNull) {
+ count = acc.getLong(1);
+ } else {
+ count = 0L;
+ }
+ }
+
+ public void accumulate(RowData inputRow) throws Exception {
+ if (!openCalled) {
+ fail("Open was not called");
+ }
+ boolean inputIsNull = inputRow.isNullAt(1);
+ if (!inputIsNull) {
+ sum += inputRow.getInt(1);
+ count += 1;
+ sumIsNull = false;
+ countIsNull = false;
+ }
+ }
+
+ public void retract(RowData inputRow) throws Exception {
+ if (!openCalled) {
+ fail("Open was not called");
+ }
+ boolean inputIsNull = inputRow.isNullAt(1);
+ if (!inputIsNull) {
+ sum -= inputRow.getInt(1);
+ count -= 1;
+ }
+ }
+
+ public void merge(T window, RowData otherAcc) throws Exception {
+ if (!openCalled) {
+ fail("Open was not called");
+ }
+ boolean sumIsNull2 = otherAcc.isNullAt(0);
+ if (!sumIsNull2) {
+ sum += otherAcc.getLong(0);
+ sumIsNull = false;
+ }
+ boolean countIsNull2 = otherAcc.isNullAt(1);
+ if (!countIsNull2) {
+ count += otherAcc.getLong(1);
+ countIsNull = false;
+ }
+ }
+
+ public RowData createAccumulators() {
+ if (!openCalled) {
+ fail("Open was not called");
+ }
+ GenericRowData rowData = new GenericRowData(2);
+ rowData.setField(1, 0L); // count has default 0 value
+ return rowData;
+ }
+
+ public RowData getAccumulators() throws Exception {
+ if (!openCalled) {
+ fail("Open was not called");
+ }
+ GenericRowData row = new GenericRowData(2);
Review Comment:
If sumIsNull is true, please also set the field to null explicitly.
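A minimal sketch of the shape this comment asks for, with an `Object[]` standing in for `GenericRowData(2)` (the class and method names are hypothetical): each field is assigned explicitly in both the NULL and non-NULL case. Since a freshly created `GenericRowData` already holds null in every field, this mainly makes the intent explicit rather than changing the result.

```java
/** Hypothetical stand-in: Object[] {sum, count} plays the role of GenericRowData(2). */
final class AccumulatorRowSketch {
    private AccumulatorRowSketch() {}

    static Object[] getAccumulators(long sum, boolean sumIsNull, long count, boolean countIsNull) {
        Object[] row = new Object[2];
        // Assign both branches explicitly instead of relying on the default null.
        row[0] = sumIsNull ? null : Long.valueOf(sum);
        row[1] = countIsNull ? null : Long.valueOf(count);
        return row;
    }
}
```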
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/UnsliceWindowAggProcessor.java:
##########
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.aggregate.window.processors;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.state.internal.InternalValueState;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.RowDataUtil;
+import org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction;
+import org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunctionBase;
+import org.apache.flink.table.runtime.operators.window.TimeWindow;
+import org.apache.flink.table.runtime.operators.window.groupwindow.assigners.GroupWindowAssigner;
+import org.apache.flink.table.runtime.operators.window.groupwindow.internal.MergingWindowProcessFunction;
+import org.apache.flink.table.runtime.operators.window.groupwindow.triggers.EventTimeTriggers;
+import org.apache.flink.table.runtime.operators.window.groupwindow.triggers.ProcessingTimeTriggers;
+import org.apache.flink.table.runtime.operators.window.groupwindow.triggers.Trigger;
+import org.apache.flink.table.runtime.operators.window.tvf.common.WindowTimerService;
+import org.apache.flink.table.runtime.operators.window.tvf.unslicing.UnsliceAssigner;
+import org.apache.flink.table.runtime.operators.window.tvf.unslicing.UnslicingTriggerContextBase;
+import org.apache.flink.table.runtime.operators.window.tvf.unslicing.UnslicingWindowContextBase;
+import org.apache.flink.table.runtime.operators.window.tvf.unslicing.UnslicingWindowProcessor;
+import org.apache.flink.table.runtime.operators.window.tvf.unslicing.UnslicingWindowTimerServiceImpl;
+import org.apache.flink.table.runtime.util.TimeWindowUtil;
+
+import javax.annotation.Nullable;
+
+import java.time.ZoneId;
+import java.util.Optional;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.flink.table.runtime.util.TimeWindowUtil.toEpochMillsForTimer;
+
+/**
+ * A window aggregate processor implementation which works for {@link UnsliceAssigner}, e.g.
+ * session windows.
+ */
+public class UnsliceWindowAggProcessor extends AbstractWindowAggProcessor<TimeWindow>
+ implements UnslicingWindowProcessor<TimeWindow> {
+
+ private final UnsliceAssigner<TimeWindow> unsliceAssigner;
+
+ private final Trigger<TimeWindow> trigger;
+
+ // ----------------------------------------------------------------------------------------
+
+ private transient MetricGroup metrics;
+
+ protected transient MergingWindowProcessFunction<RowData, TimeWindow> windowFunction;
+
+ private transient TriggerContextImpl triggerContext;
+
+ public UnsliceWindowAggProcessor(
+ GeneratedNamespaceAggsHandleFunction<TimeWindow> genAggsHandler,
+ UnsliceAssigner<TimeWindow> unsliceAssigner,
+ TypeSerializer<RowData> accSerializer,
+ int indexOfCountStar,
+ ZoneId shiftTimeZone) {
+ super(
+ genAggsHandler,
+ unsliceAssigner,
+ accSerializer,
+ unsliceAssigner.isEventTime(),
+ indexOfCountStar,
+ shiftTimeZone);
+ this.unsliceAssigner = unsliceAssigner;
+ if (isEventTime) {
+ trigger = EventTimeTriggers.afterEndOfWindow();
+ } else {
+ trigger = ProcessingTimeTriggers.afterEndOfWindow();
+ }
+ }
+
+ @Override
+ public void open(Context<TimeWindow> context) throws Exception {
+ super.open(context);
+ this.metrics = context.getRuntimeContext().getMetricGroup();
+ this.windowFunction =
+ new MergingWindowProcessFunction<>(
+ unsliceAssigner.getInnerMergingWindowAssigner(),
+ aggregator,
+ unsliceAssigner
+ .getInnerMergingWindowAssigner()
+ .getWindowSerializer(new ExecutionConfig()),
+ 0L);
+
+ triggerContext = new TriggerContextImpl();
+ triggerContext.open();
+
+ WindowContextImpl windowContext =
+ new WindowContextImpl(
+ shiftTimeZone,
+ ctx.getTimerService(),
+ internalWindowState,
+ null,
+ aggregator,
+ unsliceAssigner.getInnerMergingWindowAssigner(),
+ triggerContext);
+
+ this.windowFunction.open(windowContext);
+ }
+
+ @Override
+ public boolean processElement(RowData key, RowData element) throws Exception {
+ // the windows which the input row should be placed into
+ Optional<TimeWindow> affectedWindowOp =
+ unsliceAssigner.assignStateNamespace(element, clockService, windowFunction);
+ boolean isElementDropped = true;
+ if (affectedWindowOp.isPresent()) {
+ TimeWindow affectedWindow = affectedWindowOp.get();
+ isElementDropped = false;
+
+ RowData acc = windowState.value(affectedWindow);
+ if (acc == null) {
+ acc = aggregator.createAccumulators();
+ }
+ aggregator.setAccumulators(affectedWindow, acc);
+
+ if (RowDataUtil.isAccumulateMsg(element)) {
+ aggregator.accumulate(element);
+ } else {
+ aggregator.retract(element);
+ }
+ acc = aggregator.getAccumulators();
+ windowState.update(affectedWindow, acc);
+ }
+
+ // the actual window that the input row belongs to
+ Optional<TimeWindow> actualWindowOp =
+ unsliceAssigner.assignActualWindow(element, clockService, windowFunction);
+ if (actualWindowOp.isPresent()) {
+ TimeWindow actualWindow = actualWindowOp.get();
+ isElementDropped = false;
+ triggerContext.setWindow(actualWindow);
+ // register a timer for the window to fire and clean up
+ // TODO support allowedLateness
+ long triggerTime = toEpochMillsForTimer(actualWindow.maxTimestamp(), shiftTimeZone);
+ if (isEventTime) {
+ triggerContext.registerEventTimeTimer(triggerTime);
+ } else {
+ triggerContext.registerProcessingTimeTimer(triggerTime);
+ }
+ }
+ return isElementDropped;
+ }
+
+ @Override
+ public void fireWindow(long timerTimestamp, TimeWindow window) throws Exception {
+ windowFunction.prepareAggregateAccumulatorForEmit(window);
+ RowData aggResult = aggregator.getValue(window);
+ triggerContext.setWindow(window);
+ final boolean checkNeedFire;
+ if (isEventTime) {
+ checkNeedFire = triggerContext.onEventTime(timerTimestamp);
+ } else {
+ checkNeedFire = triggerContext.onProcessingTime(timerTimestamp);
+ }
+ // we shouldn't emit an empty window
+ if (checkNeedFire && !emptySupplier.get()) {
+ collect(aggResult);
+ }
+ }
+
+ @Override
+ public void clearWindow(long timerTimestamp, TimeWindow window) throws Exception {
+ windowFunction.cleanWindowIfNeeded(window, timerTimestamp);
+ }
+
+ @Override
+ public void advanceProgress(long progress) throws Exception {}
Review Comment:
Why doesn't `advanceProgress` need to advance the watermark here?
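A possible answer to check with the author, not a confirmed one: in this processor `processElement` registers the window's timer through `triggerContext` for every element, so firing may be driven entirely by the operator's timer service as the watermark advances, leaving nothing for `advanceProgress` to do. The toy model below (hypothetical `ToyTimerService`, not Flink's `InternalTimerService`) shows that pattern: timers registered up front fire once the watermark reaches their timestamp, with no processor-side progress hook.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical toy model, NOT Flink's InternalTimerService: event-time timers are
// registered up front and fire once the watermark reaches or passes their timestamp.
final class ToyTimerService {
    private final PriorityQueue<Long> timers = new PriorityQueue<>();
    private long currentWatermark = Long.MIN_VALUE;

    /** Registers a timer; registering the same timestamp twice keeps a single timer. */
    void registerEventTimeTimer(long time) {
        if (!timers.contains(time)) {
            timers.add(time);
        }
    }

    /** Advances the watermark and returns the timers that fired, in timestamp order. */
    List<Long> advanceWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.peek() <= currentWatermark) {
            fired.add(timers.poll());
        }
        return fired;
    }
}
```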
##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/UnslicingWindowAggOperatorTest.java:
##########
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.aggregate.window;
+
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.window.TimeWindow;
+import org.apache.flink.table.runtime.operators.window.tvf.unslicing.UnsliceAssigner;
+import org.apache.flink.table.runtime.operators.window.tvf.unslicing.UnsliceAssigners;
+import org.apache.flink.table.runtime.operators.window.tvf.unslicing.UnslicingWindowOperator;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.time.Duration;
+import java.time.ZoneId;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.apache.flink.table.data.TimestampData.fromEpochMillis;
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for unslicing window aggregate operators created by {@link WindowAggOperatorBuilder}. */
+@RunWith(Parameterized.class)
+public class UnslicingWindowAggOperatorTest extends WindowAggOperatorTestBase {
+
+ public UnslicingWindowAggOperatorTest(ZoneId shiftTimeZone) {
+ super(shiftTimeZone);
+ }
+
+ @Test
+ public void testEventTimeSessionWindows() throws Exception {
+ final UnsliceAssigner<TimeWindow> assigner =
Review Comment:
Can you add the following test cases:
1. CDC messages
2. +I and -D messages that, once merged, cause one session window to split into two
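For the second case, it may help to write down the expected output first. The hypothetical `ToySessionModel` below is not Flink's merging-window logic; it simply recomputes sessions from the rows still live after applying +I/-D messages. Under that model, deleting the row that bridges two others splits one session into two. Whether the operator should produce this, or keep the window it already merged, is exactly what the requested test should pin down.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical model, NOT Flink's merging-window logic: sessions are recomputed from the
// rows still live after applying +I / -D messages. Each row at ts covers [ts, ts + gap),
// and overlapping ranges merge into one session.
final class ToySessionModel {
    private final long gap;
    // timestamp -> number of live rows at that timestamp
    private final TreeMap<Long, Integer> live = new TreeMap<>();

    ToySessionModel(long gap) {
        this.gap = gap;
    }

    /** Applies a +I message. */
    void insert(long ts) {
        live.merge(ts, 1, Integer::sum);
    }

    /** Applies a -D message, removing one row at ts if present. */
    void delete(long ts) {
        live.computeIfPresent(ts, (k, count) -> count == 1 ? null : Integer.valueOf(count - 1));
    }

    /** Returns the current sessions as {start, end} pairs in start order. */
    List<long[]> sessions() {
        List<long[]> result = new ArrayList<>();
        long start = 0;
        long end = 0;
        boolean open = false;
        for (long ts : live.keySet()) {
            if (open && ts < end) {
                // overlaps the current session: extend it
                end = Math.max(end, ts + gap);
            } else {
                // first row, or no overlap: close the current session and start a new one
                if (open) {
                    result.add(new long[] {start, end});
                }
                start = ts;
                end = ts + gap;
                open = true;
            }
        }
        if (open) {
            result.add(new long[] {start, end});
        }
        return result;
    }
}
```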
##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/UnslicingWindowAggOperatorTest.java:
##########
@@ -0,0 +1,260 @@
+ @Test
+ public void testEventTimeSessionWindows() throws Exception {
+ final UnsliceAssigner<TimeWindow> assigner =
+ UnsliceAssigners.session(2, shiftTimeZone, Duration.ofSeconds(3));
+
+ final UnslicingSumAndCountAggsFunction aggsFunction =
+ new UnslicingSumAndCountAggsFunction();
+ UnslicingWindowOperator<RowData, ?> operator =
+ (UnslicingWindowOperator<RowData, ?>)
+ WindowAggOperatorBuilder.builder()
+ .inputSerializer(INPUT_ROW_SER)
+ .shiftTimeZone(shiftTimeZone)
+ .keySerializer(KEY_SER)
Review Comment:
Can you also add a test for the case without a key?
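The no-key case is worth a dedicated test because grouping changes the result, not just the code path. In the hypothetical model below (not Flink code), the same three rows produce three sessions when grouped by key, but a single merged session when every row falls into one group, as it does without a key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model, NOT Flink code: compares session counts when rows are grouped by key
// with the no-key case, where every row belongs to a single group.
final class ToyKeyedSessions {

    /** Counts sessions over sorted timestamps; a row at ts covers [ts, ts + gap). */
    static int countSessions(List<Long> sortedTimestamps, long gap) {
        int count = 0;
        long end = Long.MIN_VALUE;
        for (long ts : sortedTimestamps) {
            if (count == 0 || ts >= end) {
                count++; // no overlap with the previous session: start a new one
            }
            end = Math.max(end, ts + gap);
        }
        return count;
    }

    /** Groups (key, timestamp) rows by key and sums the per-key session counts. */
    static int keyedSessionCount(String[] keys, long[] timestamps, long gap) {
        Map<String, List<Long>> byKey = new TreeMap<>();
        for (int i = 0; i < keys.length; i++) {
            byKey.computeIfAbsent(keys[i], k -> new ArrayList<>()).add(timestamps[i]);
        }
        int total = 0;
        for (List<Long> group : byKey.values()) {
            group.sort(null); // natural (ascending) order
            total += countSessions(group, gap);
        }
        return total;
    }

    /** Without a key, all timestamps are sessionized together as one group. */
    static int globalSessionCount(long[] timestamps, long gap) {
        List<Long> all = new ArrayList<>();
        for (long ts : timestamps) {
            all.add(ts);
        }
        all.sort(null);
        return countSessions(all, gap);
    }
}
```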
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/UnsliceWindowAggProcessor.java:
##########
@@ -0,0 +1,286 @@
+package org.apache.flink.table.runtime.operators.aggregate.window.processors;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.state.internal.InternalValueState;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.RowDataUtil;
+import org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction;
+import org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunctionBase;
+import org.apache.flink.table.runtime.operators.window.TimeWindow;
+import org.apache.flink.table.runtime.operators.window.groupwindow.assigners.GroupWindowAssigner;
+import org.apache.flink.table.runtime.operators.window.groupwindow.internal.MergingWindowProcessFunction;
+import org.apache.flink.table.runtime.operators.window.groupwindow.triggers.EventTimeTriggers;
+import org.apache.flink.table.runtime.operators.window.groupwindow.triggers.ProcessingTimeTriggers;
+import org.apache.flink.table.runtime.operators.window.groupwindow.triggers.Trigger;
+import org.apache.flink.table.runtime.operators.window.tvf.common.WindowTimerService;
+import org.apache.flink.table.runtime.operators.window.tvf.unslicing.UnsliceAssigner;
+import org.apache.flink.table.runtime.operators.window.tvf.unslicing.UnslicingTriggerContextBase;
+import org.apache.flink.table.runtime.operators.window.tvf.unslicing.UnslicingWindowContextBase;
+import org.apache.flink.table.runtime.operators.window.tvf.unslicing.UnslicingWindowProcessor;
+import org.apache.flink.table.runtime.operators.window.tvf.unslicing.UnslicingWindowTimerServiceImpl;
+import org.apache.flink.table.runtime.util.TimeWindowUtil;
+
+import javax.annotation.Nullable;
+
+import java.time.ZoneId;
+import java.util.Optional;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.flink.table.runtime.util.TimeWindowUtil.toEpochMillsForTimer;
+
+/**
+ * A window aggregate processor implementation which works for {@link UnsliceAssigner}, e.g.
+ * session windows.
+ */
+public class UnsliceWindowAggProcessor extends AbstractWindowAggProcessor<TimeWindow>
+ implements UnslicingWindowProcessor<TimeWindow> {
+
+ private final UnsliceAssigner<TimeWindow> unsliceAssigner;
+
+ private final Trigger<TimeWindow> trigger;
+
+ // ----------------------------------------------------------------------------------------
+
+ private transient MetricGroup metrics;
+
+ protected transient MergingWindowProcessFunction<RowData, TimeWindow> windowFunction;
+
+ private transient TriggerContextImpl triggerContext;
+
+ public UnsliceWindowAggProcessor(
+ GeneratedNamespaceAggsHandleFunction<TimeWindow> genAggsHandler,
+ UnsliceAssigner<TimeWindow> unsliceAssigner,
+ TypeSerializer<RowData> accSerializer,
+ int indexOfCountStar,
+ ZoneId shiftTimeZone) {
+ super(
+ genAggsHandler,
+ unsliceAssigner,
+ accSerializer,
+ unsliceAssigner.isEventTime(),
+ indexOfCountStar,
+ shiftTimeZone);
+ this.unsliceAssigner = unsliceAssigner;
+ if (isEventTime) {
+ trigger = EventTimeTriggers.afterEndOfWindow();
+ } else {
+ trigger = ProcessingTimeTriggers.afterEndOfWindow();
+ }
+ }
+
+ @Override
+ public void open(Context<TimeWindow> context) throws Exception {
+ super.open(context);
+ this.metrics = context.getRuntimeContext().getMetricGroup();
+ this.windowFunction =
+ new MergingWindowProcessFunction<>(
+ unsliceAssigner.getInnerMergingWindowAssigner(),
+ aggregator,
+ unsliceAssigner
+ .getInnerMergingWindowAssigner()
+ .getWindowSerializer(new ExecutionConfig()),
+ 0L);
+
+ triggerContext = new TriggerContextImpl();
+ triggerContext.open();
+
+ WindowContextImpl windowContext =
+ new WindowContextImpl(
+ shiftTimeZone,
+ ctx.getTimerService(),
+ internalWindowState,
+ null,
+ aggregator,
+ unsliceAssigner.getInnerMergingWindowAssigner(),
+ triggerContext);
+
+ this.windowFunction.open(windowContext);
+ }
+
+ @Override
+ public boolean processElement(RowData key, RowData element) throws Exception {
+ // the windows which the input row should be placed into
+ Optional<TimeWindow> affectedWindowOp =
+ unsliceAssigner.assignStateNamespace(element, clockService, windowFunction);
+ boolean isElementDropped = true;
+ if (affectedWindowOp.isPresent()) {
+ TimeWindow affectedWindow = affectedWindowOp.get();
+ isElementDropped = false;
+
+ RowData acc = windowState.value(affectedWindow);
+ if (acc == null) {
+ acc = aggregator.createAccumulators();
+ }
+ aggregator.setAccumulators(affectedWindow, acc);
+
+ if (RowDataUtil.isAccumulateMsg(element)) {
+ aggregator.accumulate(element);
+ } else {
+ aggregator.retract(element);
+ }
+ acc = aggregator.getAccumulators();
+ windowState.update(affectedWindow, acc);
+ }
+
+ // the actual window which the input row belongs to
+ Optional<TimeWindow> actualWindowOp =
+ unsliceAssigner.assignActualWindow(element, clockService, windowFunction);
Review Comment:
Can the following case occur: `affectedWindowOp` is present but `actualWindowOp` is empty, or vice versa?
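To make the question concrete: the two branches above are independent, and `isElementDropped` stays `true` only when both `Optional`s are empty. The hypothetical `ProcessElementBranches` below mirrors just that branch structure (windows are plain strings) to list the side effects per combination. In the first mixed case the accumulator is updated but no timer is registered for this element; in the second a timer is registered without any accumulation, and `fireWindow` then relies on `emptySupplier` to avoid emitting an empty result.

```java
import java.util.Optional;

// Hypothetical trace of the branch structure in processElement above, NOT the real
// processor: it records which side effects would run for each combination.
final class ProcessElementBranches {
    static String trace(Optional<String> affectedWindowOp, Optional<String> actualWindowOp) {
        boolean isElementDropped = true;
        StringBuilder effects = new StringBuilder();
        if (affectedWindowOp.isPresent()) {
            // state branch: accumulate/retract into the affected window's accumulator
            isElementDropped = false;
            effects.append("accumulate(").append(affectedWindowOp.get()).append(") ");
        }
        if (actualWindowOp.isPresent()) {
            // timer branch: register the fire/cleanup timer for the actual window
            isElementDropped = false;
            effects.append("registerTimer(").append(actualWindowOp.get()).append(") ");
        }
        return effects.append("dropped=").append(isElementDropped).toString();
    }
}
```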
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/UnsliceWindowAggProcessor.java:
##########
@@ -0,0 +1,286 @@
+ @Override
+ public void open(Context<TimeWindow> context) throws Exception {
+ super.open(context);
+ this.metrics = context.getRuntimeContext().getMetricGroup();
+ this.windowFunction =
+ new MergingWindowProcessFunction<>(
+ unsliceAssigner.getInnerMergingWindowAssigner(),
+ aggregator,
+ unsliceAssigner
+ .getInnerMergingWindowAssigner()
+ .getWindowSerializer(new ExecutionConfig()),
+ 0L);
Review Comment:
Why doesn't the session window allow setting `allowedLateness`?
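Some context for this question: the trailing `0L` passed to `MergingWindowProcessFunction` appears to be the allowed lateness, which matches the `// TODO support allowedLateness` in `processElement`. The standalone sketch below (hypothetical `LatenessSketch`, not Flink's code) shows what supporting it would typically involve in event time: keep window state until `maxTimestamp + allowedLateness`, guard against overflow, and treat a window as late only once the watermark passes that cleanup time.

```java
// Hypothetical sketch, NOT Flink's implementation: how an allowed lateness usually
// extends the lifetime of window state.
final class LatenessSketch {

    /** Time at which the window's state may be cleaned up. */
    static long cleanupTime(long windowMaxTimestamp, long allowedLateness, boolean isEventTime) {
        if (!isEventTime) {
            // processing time has no late data, so lateness does not apply
            return windowMaxTimestamp;
        }
        long cleanup = windowMaxTimestamp + allowedLateness;
        // on overflow the sum wraps negative; keep the state instead of cleaning it early
        return cleanup >= windowMaxTimestamp ? cleanup : Long.MAX_VALUE;
    }

    /** In event time, a window is late once the watermark has reached its cleanup time. */
    static boolean isLate(long windowMaxTimestamp, long allowedLateness, long currentWatermark) {
        return cleanupTime(windowMaxTimestamp, allowedLateness, true) <= currentWatermark;
    }
}
```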
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/unslicing/UnslicingTriggerContextBase.java:
##########
@@ -0,0 +1,163 @@
+package org.apache.flink.table.runtime.operators.window.tvf.unslicing;
+
+import org.apache.flink.api.common.state.MergingState;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalMergingState;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.window.Window;
+import org.apache.flink.table.runtime.operators.window.groupwindow.triggers.Trigger;
+
+import java.time.ZoneId;
+import java.util.Collection;
+
+/**
+ * A base context for unslicing window trigger.
+ *
+ * <p>{@link UnslicingTriggerContextBase} is a utility for handling {@link Trigger} invocations. It
+ * can be reused by setting the {@code key} and {@code window} fields. No internal state must be
+ * kept in the {@link Trigger.TriggerContext}.
+public abstract class UnslicingTriggerContextBase<K, W extends Window>
Review Comment:
I think we can refactor the existing `TriggerContext` class and then reuse it here:
1. https://github.com/apache/flink/blob/27e6ac836171c5c5539ceeb234a806be661cc30a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L809
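The reuse pattern the Javadoc describes (one context instance, with `key` and `window` set before each trigger call and no other internal state) is what should make sharing one implementation practical. A stripped-down, hypothetical illustration, with a recording stand-in (`RecordingTimers`) instead of a real timer service:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, NOT Flink code: a single reusable trigger context whose only
// per-call state is the current key and window, set by the operator before each call.
final class ReusableTriggerContext<K, W> {

    /** Hypothetical stand-in for a timer service; records "key/window@time" registrations. */
    static final class RecordingTimers {
        final List<String> registered = new ArrayList<>();
    }

    private final RecordingTimers timers;
    private K key;
    private W window;

    ReusableTriggerContext(RecordingTimers timers) {
        this.timers = timers;
    }

    void setKey(K key) {
        this.key = key;
    }

    void setWindow(W window) {
        this.window = window;
    }

    /** Registers a timer scoped to whatever key and window were set most recently. */
    void registerEventTimeTimer(long time) {
        timers.registered.add(key + "/" + window + "@" + time);
    }
}
```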
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.