beyond1920 commented on a change in pull request #15439:
URL: https://github.com/apache/flink/pull/15439#discussion_r604666200



##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java
##########
@@ -108,8 +140,115 @@ public StreamExecGlobalWindowAggregate(
         final TableConfig config = planner.getTableConfig();
         final SliceAssigner sliceAssigner = createSliceAssigner(windowing);
 
-        // TODO: implement translate in follow-up issues, we have to call 
createSliceAssigner and
-        //  return a non-null transformation to make plan tests passed.
-        return inputTransform;
+        final AggregateInfoList localAggInfoList =
+                AggregateUtil.deriveWindowAggregateInfoList(
+                        localAggInputRowType, // should use original input here
+                        
JavaScalaConversionUtil.toScala(Arrays.asList(aggCalls)),
+                        windowing.getWindow(),
+                        false); // isStateBackendDataViews
+
+        final AggregateInfoList globalAggInfoList =
+                AggregateUtil.deriveWindowAggregateInfoList(
+                        localAggInputRowType, // should use original input here
+                        
JavaScalaConversionUtil.toScala(Arrays.asList(aggCalls)),
+                        windowing.getWindow(),
+                        true); // isStateBackendDataViews
+
+        final GeneratedNamespaceAggsHandleFunction<Long> localAggsHandler =
+                createAggsHandler(
+                        "LocalWindowAggsHandler",
+                        sliceAssigner,
+                        localAggInfoList,
+                        grouping.length,
+                        true,
+                        localAggInfoList.getAccTypes(),
+                        config,
+                        planner.getRelBuilder());
+
+        final GeneratedNamespaceAggsHandleFunction<Long> globalAggsHandler =
+                createAggsHandler(
+                        "GlobalWindowAggsHandler",
+                        sliceAssigner,
+                        globalAggInfoList,
+                        0,
+                        true,
+                        localAggInfoList.getAccTypes(),
+                        config,
+                        planner.getRelBuilder());
+
+        final GeneratedNamespaceAggsHandleFunction<Long> stateAggsHandler =
+                createAggsHandler(
+                        "StateWindowAggsHandler",
+                        sliceAssigner,
+                        globalAggInfoList,
+                        0,
+                        false,
+                        globalAggInfoList.getAccTypes(),
+                        config,
+                        planner.getRelBuilder());
+
+        final RowDataKeySelector selector =

Review comment:
       Could we add some comments to explain `localAggsHandler`, 
`globalAggsHandler`, and `stateAggsHandler`?

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/LocalSlicingWindowAggOperator.java
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.aggregate.window;
+
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import 
org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction;
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import 
org.apache.flink.table.runtime.operators.aggregate.window.buffers.RecordsWindowBuffer;
+import 
org.apache.flink.table.runtime.operators.aggregate.window.buffers.WindowBuffer;
+import 
org.apache.flink.table.runtime.operators.aggregate.window.combines.LocalAggRecordsCombiner;
+import org.apache.flink.table.runtime.operators.window.TimeWindow;
+import 
org.apache.flink.table.runtime.operators.window.combines.WindowCombineFunction;
+import org.apache.flink.table.runtime.operators.window.slicing.ClockService;
+import org.apache.flink.table.runtime.operators.window.slicing.SliceAssigner;
+import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer;
+import org.apache.flink.table.runtime.typeutils.PagedTypeSerializer;
+
+/**
+ * The operator used for local window aggregation.
+ *
+ * <p>Note: this only supports event-time window.
+ */
+public class LocalSlicingWindowAggOperator extends 
AbstractStreamOperator<RowData>
+        implements OneInputStreamOperator<RowData, RowData> {
+    private static final long serialVersionUID = 1L;
+    private static final ClockService CLOCK_SERVICE = ClockService.ofSystem();
+
+    private final RowDataKeySelector keySelector;
+    private final SliceAssigner sliceAssigner;
+    private final long windowInterval;
+    private final WindowBuffer.Factory windowBufferFactory;
+    private final WindowCombineFunction.LocalFactory combinerFactory;
+
+    // ------------------------------------------------------------------------
+
+    /** This is used for emitting elements with a given timestamp. */
+    protected transient TimestampedCollector<RowData> collector;
+
+    /** Flag to prevent duplicate function.close() calls in close() and 
dispose(). */
+    private transient boolean functionsClosed = false;
+
+    /** current watermark of this operator. */
+    private transient long currentWatermark;
+
+    /** The next watermark to trigger windows. */
+    private transient long nextTriggerWatermark;
+
+    /** A buffer to buffers window data in memory and may flush to output. */
+    private transient WindowBuffer windowBuffer;
+
+    public LocalSlicingWindowAggOperator(
+            RowDataKeySelector keySelector,
+            SliceAssigner sliceAssigner,
+            PagedTypeSerializer<RowData> keySer,
+            AbstractRowDataSerializer<RowData> inputSer,
+            GeneratedNamespaceAggsHandleFunction<Long> genAggsHandler) {
+        this(
+                keySelector,
+                sliceAssigner,
+                new RecordsWindowBuffer.Factory(keySer, inputSer),
+                new LocalAggRecordsCombiner.Factory(genAggsHandler, keySer));
+    }
+
+    public LocalSlicingWindowAggOperator(
+            RowDataKeySelector keySelector,
+            SliceAssigner sliceAssigner,
+            WindowBuffer.Factory windowBufferFactory,
+            WindowCombineFunction.LocalFactory combinerFactory) {
+        this.keySelector = keySelector;
+        this.sliceAssigner = sliceAssigner;
+        this.windowInterval = sliceAssigner.getSliceEndInterval();
+        this.windowBufferFactory = windowBufferFactory;
+        this.combinerFactory = combinerFactory;
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        functionsClosed = false;
+
+        collector = new TimestampedCollector<>(output);
+        collector.eraseTimestamp();
+
+        final WindowCombineFunction localCombiner =
+                combinerFactory.create(getRuntimeContext(), collector);
+        this.windowBuffer =
+                windowBufferFactory.create(
+                        getContainingTask(),
+                        
getContainingTask().getEnvironment().getMemoryManager(),
+                        computeMemorySize(),
+                        localCombiner);
+    }
+
+    @Override
+    public void processElement(StreamRecord<RowData> element) throws Exception 
{
+        RowData inputRow = element.getValue();
+        RowData key = keySelector.getKey(inputRow);
+        long sliceEnd = sliceAssigner.assignSliceEnd(inputRow, CLOCK_SERVICE);
+        // may flush to output if buffer is full
+        windowBuffer.addElement(key, sliceEnd, inputRow);
+    }
+
+    @Override
+    public void processWatermark(Watermark mark) throws Exception {
+        if (mark.getTimestamp() > currentWatermark) {
+            currentWatermark = mark.getTimestamp();
+            if (currentWatermark >= nextTriggerWatermark) {
+                // we only need to call advanceProgress() when 
currentWatermark may trigger window
+                windowBuffer.advanceProgress(currentWatermark);
+                nextTriggerWatermark = 
getNextTriggerWatermark(currentWatermark, windowInterval);
+            }
+        }
+        super.processWatermark(mark);
+    }
+
+    @Override
+    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
+        windowBuffer.flush();
+    }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+        collector = null;
+        functionsClosed = true;
+        windowBuffer.close();

Review comment:
       Did we forget to close the aggregator in `localCombiner`?

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/LocalSlicingWindowAggOperator.java
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.aggregate.window;
+
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import 
org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction;
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import 
org.apache.flink.table.runtime.operators.aggregate.window.buffers.RecordsWindowBuffer;
+import 
org.apache.flink.table.runtime.operators.aggregate.window.buffers.WindowBuffer;
+import 
org.apache.flink.table.runtime.operators.aggregate.window.combines.LocalAggRecordsCombiner;
+import org.apache.flink.table.runtime.operators.window.TimeWindow;
+import 
org.apache.flink.table.runtime.operators.window.combines.WindowCombineFunction;
+import org.apache.flink.table.runtime.operators.window.slicing.ClockService;
+import org.apache.flink.table.runtime.operators.window.slicing.SliceAssigner;
+import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer;
+import org.apache.flink.table.runtime.typeutils.PagedTypeSerializer;
+
+/**
+ * The operator used for local window aggregation.
+ *
+ * <p>Note: this only supports event-time window.
+ */
+public class LocalSlicingWindowAggOperator extends 
AbstractStreamOperator<RowData>
+        implements OneInputStreamOperator<RowData, RowData> {
+    private static final long serialVersionUID = 1L;
+    private static final ClockService CLOCK_SERVICE = ClockService.ofSystem();
+
+    private final RowDataKeySelector keySelector;
+    private final SliceAssigner sliceAssigner;
+    private final long windowInterval;
+    private final WindowBuffer.Factory windowBufferFactory;
+    private final WindowCombineFunction.LocalFactory combinerFactory;
+
+    // ------------------------------------------------------------------------
+
+    /** This is used for emitting elements with a given timestamp. */
+    protected transient TimestampedCollector<RowData> collector;
+
+    /** Flag to prevent duplicate function.close() calls in close() and 
dispose(). */
+    private transient boolean functionsClosed = false;
+
+    /** current watermark of this operator. */
+    private transient long currentWatermark;
+
+    /** The next watermark to trigger windows. */
+    private transient long nextTriggerWatermark;
+
+    /** A buffer to buffers window data in memory and may flush to output. */
+    private transient WindowBuffer windowBuffer;
+
+    public LocalSlicingWindowAggOperator(
+            RowDataKeySelector keySelector,
+            SliceAssigner sliceAssigner,
+            PagedTypeSerializer<RowData> keySer,
+            AbstractRowDataSerializer<RowData> inputSer,
+            GeneratedNamespaceAggsHandleFunction<Long> genAggsHandler) {
+        this(
+                keySelector,
+                sliceAssigner,
+                new RecordsWindowBuffer.Factory(keySer, inputSer),
+                new LocalAggRecordsCombiner.Factory(genAggsHandler, keySer));
+    }
+
+    public LocalSlicingWindowAggOperator(
+            RowDataKeySelector keySelector,
+            SliceAssigner sliceAssigner,
+            WindowBuffer.Factory windowBufferFactory,
+            WindowCombineFunction.LocalFactory combinerFactory) {
+        this.keySelector = keySelector;
+        this.sliceAssigner = sliceAssigner;
+        this.windowInterval = sliceAssigner.getSliceEndInterval();
+        this.windowBufferFactory = windowBufferFactory;
+        this.combinerFactory = combinerFactory;
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        functionsClosed = false;
+
+        collector = new TimestampedCollector<>(output);
+        collector.eraseTimestamp();
+
+        final WindowCombineFunction localCombiner =
+                combinerFactory.create(getRuntimeContext(), collector);
+        this.windowBuffer =
+                windowBufferFactory.create(
+                        getContainingTask(),
+                        
getContainingTask().getEnvironment().getMemoryManager(),
+                        computeMemorySize(),
+                        localCombiner);
+    }
+
+    @Override
+    public void processElement(StreamRecord<RowData> element) throws Exception 
{
+        RowData inputRow = element.getValue();
+        RowData key = keySelector.getKey(inputRow);
+        long sliceEnd = sliceAssigner.assignSliceEnd(inputRow, CLOCK_SERVICE);
+        // may flush to output if buffer is full
+        windowBuffer.addElement(key, sliceEnd, inputRow);

Review comment:
       Currently, we buffer input records directly. We could also buffer 
accumulators instead, which may cost less memory. What do you think?

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/LocalSlicingWindowAggOperator.java
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.aggregate.window;
+
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import 
org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction;
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import 
org.apache.flink.table.runtime.operators.aggregate.window.buffers.RecordsWindowBuffer;
+import 
org.apache.flink.table.runtime.operators.aggregate.window.buffers.WindowBuffer;
+import 
org.apache.flink.table.runtime.operators.aggregate.window.combines.LocalAggRecordsCombiner;
+import org.apache.flink.table.runtime.operators.window.TimeWindow;
+import 
org.apache.flink.table.runtime.operators.window.combines.WindowCombineFunction;
+import org.apache.flink.table.runtime.operators.window.slicing.ClockService;
+import org.apache.flink.table.runtime.operators.window.slicing.SliceAssigner;
+import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer;
+import org.apache.flink.table.runtime.typeutils.PagedTypeSerializer;
+
+/**
+ * The operator used for local window aggregation.
+ *
+ * <p>Note: this only supports event-time window.
+ */
+public class LocalSlicingWindowAggOperator extends 
AbstractStreamOperator<RowData>
+        implements OneInputStreamOperator<RowData, RowData> {
+    private static final long serialVersionUID = 1L;
+    private static final ClockService CLOCK_SERVICE = ClockService.ofSystem();
+
+    private final RowDataKeySelector keySelector;
+    private final SliceAssigner sliceAssigner;
+    private final long windowInterval;
+    private final WindowBuffer.Factory windowBufferFactory;
+    private final WindowCombineFunction.LocalFactory combinerFactory;
+
+    // ------------------------------------------------------------------------
+
+    /** This is used for emitting elements with a given timestamp. */
+    protected transient TimestampedCollector<RowData> collector;
+
+    /** Flag to prevent duplicate function.close() calls in close() and 
dispose(). */
+    private transient boolean functionsClosed = false;
+
+    /** current watermark of this operator. */
+    private transient long currentWatermark;
+
+    /** The next watermark to trigger windows. */
+    private transient long nextTriggerWatermark;
+
+    /** A buffer to buffers window data in memory and may flush to output. */
+    private transient WindowBuffer windowBuffer;
+
+    public LocalSlicingWindowAggOperator(
+            RowDataKeySelector keySelector,
+            SliceAssigner sliceAssigner,
+            PagedTypeSerializer<RowData> keySer,
+            AbstractRowDataSerializer<RowData> inputSer,
+            GeneratedNamespaceAggsHandleFunction<Long> genAggsHandler) {
+        this(
+                keySelector,
+                sliceAssigner,
+                new RecordsWindowBuffer.Factory(keySer, inputSer),
+                new LocalAggRecordsCombiner.Factory(genAggsHandler, keySer));
+    }
+
+    public LocalSlicingWindowAggOperator(
+            RowDataKeySelector keySelector,
+            SliceAssigner sliceAssigner,
+            WindowBuffer.Factory windowBufferFactory,
+            WindowCombineFunction.LocalFactory combinerFactory) {
+        this.keySelector = keySelector;
+        this.sliceAssigner = sliceAssigner;
+        this.windowInterval = sliceAssigner.getSliceEndInterval();
+        this.windowBufferFactory = windowBufferFactory;
+        this.combinerFactory = combinerFactory;
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        functionsClosed = false;
+
+        collector = new TimestampedCollector<>(output);
+        collector.eraseTimestamp();
+
+        final WindowCombineFunction localCombiner =
+                combinerFactory.create(getRuntimeContext(), collector);
+        this.windowBuffer =
+                windowBufferFactory.create(
+                        getContainingTask(),
+                        
getContainingTask().getEnvironment().getMemoryManager(),
+                        computeMemorySize(),
+                        localCombiner);
+    }
+
+    @Override
+    public void processElement(StreamRecord<RowData> element) throws Exception 
{
+        RowData inputRow = element.getValue();
+        RowData key = keySelector.getKey(inputRow);
+        long sliceEnd = sliceAssigner.assignSliceEnd(inputRow, CLOCK_SERVICE);
+        // may flush to output if buffer is full
+        windowBuffer.addElement(key, sliceEnd, inputRow);
+    }
+
+    @Override
+    public void processWatermark(Watermark mark) throws Exception {
+        if (mark.getTimestamp() > currentWatermark) {
+            currentWatermark = mark.getTimestamp();
+            if (currentWatermark >= nextTriggerWatermark) {
+                // we only need to call advanceProgress() when 
currentWatermark may trigger window
+                windowBuffer.advanceProgress(currentWatermark);
+                nextTriggerWatermark = 
getNextTriggerWatermark(currentWatermark, windowInterval);
+            }
+        }
+        super.processWatermark(mark);
+    }
+
+    @Override
+    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
+        windowBuffer.flush();
+    }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+        collector = null;
+        functionsClosed = true;
+        windowBuffer.close();
+    }
+
+    @Override
+    public void dispose() throws Exception {
+        super.dispose();
+        collector = null;
+        if (!functionsClosed) {
+            functionsClosed = true;
+            windowBuffer.close();

Review comment:
       Did we forget to close the aggregator in `localCombiner`?

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/GlobalAggAccCombiner.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.aggregate.window.combines;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.dataview.PerWindowStateDataViewStore;
+import 
org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction;
+import org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunction;
+import 
org.apache.flink.table.runtime.operators.window.combines.WindowCombineFunction;
+import org.apache.flink.table.runtime.operators.window.state.StateKeyContext;
+import org.apache.flink.table.runtime.operators.window.state.WindowState;
+import org.apache.flink.table.runtime.operators.window.state.WindowValueState;
+import org.apache.flink.table.runtime.util.WindowKey;
+
+import java.util.Iterator;
+
+import static 
org.apache.flink.table.runtime.util.StateConfigUtil.isStateImmutableInStateBackend;
+
+/**
+ * An implementation of {@link WindowCombineFunction} that accumulates local 
accumulators records
+ * into the window accumulator state.
+ *
+ * <p>Note: this only supports event-time window.
+ */
+public final class GlobalAggAccCombiner implements WindowCombineFunction {
+
+    /** The service to register event-time or processing-time timers. */
+    private final InternalTimerService<Long> timerService;
+
+    /** Context to switch current key for states. */
+    private final StateKeyContext keyContext;
+
+    /** The state stores window accumulators. */
+    private final WindowValueState<Long> accState;
+
+    /** Local aggregate function to handle local combined accumulator rows. */
+    private final NamespaceAggsHandleFunction<Long> localAggregator;
+
+    /** Global aggregate function to handle global accumulator rows. */
+    private final NamespaceAggsHandleFunction<Long> globalAggregator;
+
+    /** Whether to copy key and input record, because key and record are 
reused. */
+    private final boolean requiresCopy;
+
+    /** Serializer to copy key if required. */
+    private final TypeSerializer<RowData> keySerializer;
+
+    public GlobalAggAccCombiner(
+            InternalTimerService<Long> timerService,
+            StateKeyContext keyContext,
+            WindowValueState<Long> accState,
+            NamespaceAggsHandleFunction<Long> localAggregator,
+            NamespaceAggsHandleFunction<Long> globalAggregator,
+            boolean requiresCopy,
+            TypeSerializer<RowData> keySerializer) {
+        this.timerService = timerService;
+        this.keyContext = keyContext;
+        this.accState = accState;
+        this.localAggregator = localAggregator;
+        this.globalAggregator = globalAggregator;
+        this.requiresCopy = requiresCopy;
+        this.keySerializer = keySerializer;
+    }
+
+    @Override
+    public void combine(WindowKey windowKey, Iterator<RowData> localAccs) 
throws Exception {
+        // step 0: set current key for states and timers
+        final RowData key;
+        if (requiresCopy) {
+            // the incoming key is reused, we should copy it if state backend 
doesn't copy it
+            key = keySerializer.copy(windowKey.getKey());
+        } else {
+            key = windowKey.getKey();
+        }
+        keyContext.setCurrentKey(key);
+        Long window = windowKey.getWindow();
+
+        // step 1: merge localAccs into one acc
+        RowData acc = localAggregator.createAccumulators();
+        localAggregator.setAccumulators(window, acc);
+        while (localAccs.hasNext()) {
+            RowData localAcc = localAccs.next();
+            localAggregator.merge(window, localAcc);
+        }
+        RowData mergedLocalAcc = localAggregator.getAccumulators();
+
+        // step2: merge acc into state
+        RowData stateAcc = accState.value(window);
+        if (stateAcc == null) {
+            stateAcc = globalAggregator.createAccumulators();
+        }
+        globalAggregator.setAccumulators(window, stateAcc);
+        globalAggregator.merge(window, mergedLocalAcc);
+        stateAcc = globalAggregator.getAccumulators();
+        accState.update(window, stateAcc);
+
+        // step 3: register timer for current window
+        timerService.registerEventTimeTimer(window, window - 1);
+    }
+
+    @Override
+    public void close() throws Exception {
+        localAggregator.close();
+        globalAggregator.close();
+    }
+
+    // 
----------------------------------------------------------------------------------------
+    // Factory
+    // 
----------------------------------------------------------------------------------------
+
+    /** Factory to create {@link GlobalAggAccCombiner}. */
+    public static final class Factory implements WindowCombineFunction.Factory 
{
+
+        private static final long serialVersionUID = 1L;
+
+        private final GeneratedNamespaceAggsHandleFunction<Long> 
genLocalAggsHandler;
+        private final GeneratedNamespaceAggsHandleFunction<Long> 
genGlobalAggsHandler;
+        private final TypeSerializer<RowData> keySerializer;
+        private final TypeSerializer<RowData> recordSerializer;

Review comment:
       Remove the unused `recordSerializer` field?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to