This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new 6a0d44d  [FLINK-22304][table] Refactor some interfaces for TVF based 
window to improve the extendability
6a0d44d is described below

commit 6a0d44d1ffcfa220def7de3ca2130f8cfcf64fe4
Author: shuo.cs <[email protected]>
AuthorDate: Thu Apr 15 14:31:43 2021 +0800

    [FLINK-22304][table] Refactor some interfaces for TVF based window to 
improve the extendability
    
    This closes #15745
---
 .../stream/StreamExecLocalWindowAggregate.java     | 20 ++++--
 .../codegen/agg/batch/HashAggCodeGenHelper.scala   |  2 +-
 .../agg/batch/HashWindowCodeGenerator.scala        |  2 +-
 .../window/LocalSlicingWindowAggOperator.java      | 33 ++-------
 .../window/SlicingWindowAggOperatorBuilder.java    | 37 +++++-----
 .../window/buffers/RecordsWindowBuffer.java        | 84 +++++++++++++++++++---
 .../aggregate/window/buffers/WindowBuffer.java     | 46 ++++++++++--
 .../{AggRecordsCombiner.java => AggCombiner.java}  | 60 +++-------------
 ...lAggAccCombiner.java => GlobalAggCombiner.java} | 63 ++++++----------
 ...gRecordsCombiner.java => LocalAggCombiner.java} | 35 ++++-----
 .../processors/AbstractWindowAggProcessor.java     | 17 ++---
 .../processors/SliceSharedWindowAggProcessor.java  | 10 +--
 .../SliceUnsharedWindowAggProcessor.java           | 10 +--
 .../rank/window/WindowRankOperatorBuilder.java     | 15 ++--
 .../rank/window/combines/TopNRecordsCombiner.java  | 38 ++--------
 .../window/processors/WindowRankProcessor.java     | 17 ++---
 ...owCombineFunction.java => RecordsCombiner.java} | 25 +++----
 .../collections/binary/AbstractBytesHashMap.java   | 16 +++--
 .../collections/binary/AbstractBytesMultiMap.java  | 25 ++++---
 .../aggregate/SumHashAggTestOperator.java          |  3 +-
 .../collections/binary/BytesHashMapTestBase.java   |  2 +-
 .../collections/binary/BytesMultiMapTestBase.java  |  2 +-
 22 files changed, 260 insertions(+), 302 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalWindowAggregate.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalWindowAggregate.java
index 18f8a8d..e58e7f1 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalWindowAggregate.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalWindowAggregate.java
@@ -38,7 +38,11 @@ import 
org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import 
org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction;
 import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
 import 
org.apache.flink.table.runtime.operators.aggregate.window.LocalSlicingWindowAggOperator;
+import 
org.apache.flink.table.runtime.operators.aggregate.window.buffers.RecordsWindowBuffer;
+import 
org.apache.flink.table.runtime.operators.aggregate.window.buffers.WindowBuffer;
+import 
org.apache.flink.table.runtime.operators.aggregate.window.combines.LocalAggCombiner;
 import org.apache.flink.table.runtime.operators.window.slicing.SliceAssigner;
+import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.runtime.typeutils.PagedTypeSerializer;
 import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
@@ -65,7 +69,6 @@ public class StreamExecLocalWindowAggregate extends 
StreamExecWindowAggregateBas
     private static final long WINDOW_AGG_MEMORY_RATIO = 100;
 
     public static final String FIELD_NAME_WINDOWING = "windowing";
-    public static final String FIELD_NAME_NAMED_WINDOW_PROPERTIES = 
"namedWindowProperties";
 
     @JsonProperty(FIELD_NAME_GROUPING)
     private final int[] grouping;
@@ -139,14 +142,17 @@ public class StreamExecLocalWindowAggregate extends 
StreamExecWindowAggregateBas
         final RowDataKeySelector selector =
                 KeySelectorUtil.getRowDataSelector(grouping, 
InternalTypeInfo.of(inputRowType));
 
+        PagedTypeSerializer<RowData> keySer =
+                (PagedTypeSerializer<RowData>) 
selector.getProducedType().toSerializer();
+        AbstractRowDataSerializer<RowData> valueSer = new 
RowDataSerializer(inputRowType);
+
+        WindowBuffer.LocalFactory bufferFactory =
+                new RecordsWindowBuffer.LocalFactory(
+                        keySer, valueSer, new 
LocalAggCombiner.Factory(generatedAggsHandler));
+
         final OneInputStreamOperator<RowData, RowData> localAggOperator =
                 new LocalSlicingWindowAggOperator(
-                        selector,
-                        sliceAssigner,
-                        (PagedTypeSerializer<RowData>) 
selector.getProducedType().toSerializer(),
-                        new RowDataSerializer(inputRowType),
-                        generatedAggsHandler,
-                        shiftTimeZone);
+                        selector, sliceAssigner, bufferFactory, shiftTimeZone);
 
         return ExecNodeUtil.createOneInputTransformation(
                 inputTransform,
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala
index 3c01f01..dc76352 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala
@@ -532,7 +532,7 @@ object HashAggCodeGenHelper {
     val rowDataType = classOf[RowData].getCanonicalName
     s"""
        |$iteratorType<$rowDataType, $rowDataType> $iteratorTerm =
-       |  $aggregateMapTerm.getEntryIterator();
+       |  $aggregateMapTerm.getEntryIterator(false); // reuse key/value during 
iterating
        |while ($iteratorTerm.advanceNext()) {
        |   // set result and output
        |   $reuseGroupKeyTerm = ($rowDataType)$iteratorTerm.getKey();
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashWindowCodeGenerator.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashWindowCodeGenerator.scala
index 8384861..864e71b 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashWindowCodeGenerator.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashWindowCodeGenerator.scala
@@ -716,7 +716,7 @@ class HashWindowCodeGenerator(
     val iteratorTerm = CodeGenUtils.newName("iterator")
     s"""
        |$iteratorType<$rowDataType, $rowDataType> $iteratorTerm =
-       |  $aggregateMapTerm.getEntryIterator();
+       |  $aggregateMapTerm.getEntryIterator(false); // reuse key/value during 
iterating
        |while ($iteratorTerm.advanceNext()) {
        |   $reuseAggMapKeyTerm = ($rowDataType) $iteratorTerm.getKey();
        |   $reuseAggBufferTerm = ($rowDataType) $iteratorTerm.getValue();
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/LocalSlicingWindowAggOperator.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/LocalSlicingWindowAggOperator.java
index 6634f77..b044db7 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/LocalSlicingWindowAggOperator.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/LocalSlicingWindowAggOperator.java
@@ -26,16 +26,10 @@ import 
org.apache.flink.streaming.api.operators.TimestampedCollector;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.table.data.RowData;
-import 
org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction;
 import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
-import 
org.apache.flink.table.runtime.operators.aggregate.window.buffers.RecordsWindowBuffer;
 import 
org.apache.flink.table.runtime.operators.aggregate.window.buffers.WindowBuffer;
-import 
org.apache.flink.table.runtime.operators.aggregate.window.combines.LocalAggRecordsCombiner;
-import 
org.apache.flink.table.runtime.operators.window.combines.WindowCombineFunction;
 import org.apache.flink.table.runtime.operators.window.slicing.ClockService;
 import org.apache.flink.table.runtime.operators.window.slicing.SliceAssigner;
-import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer;
-import org.apache.flink.table.runtime.typeutils.PagedTypeSerializer;
 
 import java.time.ZoneId;
 import java.util.TimeZone;
@@ -55,8 +49,7 @@ public class LocalSlicingWindowAggOperator extends 
AbstractStreamOperator<RowDat
     private final RowDataKeySelector keySelector;
     private final SliceAssigner sliceAssigner;
     private final long windowInterval;
-    private final WindowBuffer.Factory windowBufferFactory;
-    private final WindowCombineFunction.LocalFactory combinerFactory;
+    private final WindowBuffer.LocalFactory windowBufferFactory;
 
     /**
      * The shift timezone of the window, if the proctime or rowtime type is 
TIMESTAMP_LTZ, the shift
@@ -88,29 +81,12 @@ public class LocalSlicingWindowAggOperator extends 
AbstractStreamOperator<RowDat
     public LocalSlicingWindowAggOperator(
             RowDataKeySelector keySelector,
             SliceAssigner sliceAssigner,
-            PagedTypeSerializer<RowData> keySer,
-            AbstractRowDataSerializer<RowData> inputSer,
-            GeneratedNamespaceAggsHandleFunction<Long> genAggsHandler,
-            ZoneId shiftTimezone) {
-        this(
-                keySelector,
-                sliceAssigner,
-                new RecordsWindowBuffer.Factory(keySer, inputSer),
-                new LocalAggRecordsCombiner.Factory(genAggsHandler, keySer),
-                shiftTimezone);
-    }
-
-    public LocalSlicingWindowAggOperator(
-            RowDataKeySelector keySelector,
-            SliceAssigner sliceAssigner,
-            WindowBuffer.Factory windowBufferFactory,
-            WindowCombineFunction.LocalFactory combinerFactory,
+            WindowBuffer.LocalFactory windowBufferFactory,
             ZoneId shiftTimezone) {
         this.keySelector = keySelector;
         this.sliceAssigner = sliceAssigner;
         this.windowInterval = sliceAssigner.getSliceEndInterval();
         this.windowBufferFactory = windowBufferFactory;
-        this.combinerFactory = combinerFactory;
         this.shiftTimezone = shiftTimezone;
         this.useDayLightSaving = 
TimeZone.getTimeZone(shiftTimezone).useDaylightTime();
     }
@@ -123,14 +99,13 @@ public class LocalSlicingWindowAggOperator extends 
AbstractStreamOperator<RowDat
         collector = new TimestampedCollector<>(output);
         collector.eraseTimestamp();
 
-        final WindowCombineFunction localCombiner =
-                combinerFactory.create(getRuntimeContext(), collector);
         this.windowBuffer =
                 windowBufferFactory.create(
                         getContainingTask(),
                         
getContainingTask().getEnvironment().getMemoryManager(),
                         computeMemorySize(),
-                        localCombiner,
+                        getRuntimeContext(),
+                        collector,
                         shiftTimezone);
     }
 
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorBuilder.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorBuilder.java
index 41b0c0c..3aba162 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorBuilder.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorBuilder.java
@@ -18,16 +18,15 @@
 
 package org.apache.flink.table.runtime.operators.aggregate.window;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.table.data.RowData;
 import 
org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction;
 import 
org.apache.flink.table.runtime.operators.aggregate.window.buffers.RecordsWindowBuffer;
 import 
org.apache.flink.table.runtime.operators.aggregate.window.buffers.WindowBuffer;
-import 
org.apache.flink.table.runtime.operators.aggregate.window.combines.AggRecordsCombiner;
-import 
org.apache.flink.table.runtime.operators.aggregate.window.combines.GlobalAggAccCombiner;
+import 
org.apache.flink.table.runtime.operators.aggregate.window.combines.AggCombiner;
+import 
org.apache.flink.table.runtime.operators.aggregate.window.combines.GlobalAggCombiner;
 import 
org.apache.flink.table.runtime.operators.aggregate.window.processors.SliceSharedWindowAggProcessor;
 import 
org.apache.flink.table.runtime.operators.aggregate.window.processors.SliceUnsharedWindowAggProcessor;
-import 
org.apache.flink.table.runtime.operators.window.combines.WindowCombineFunction;
+import 
org.apache.flink.table.runtime.operators.window.combines.RecordsCombiner;
 import org.apache.flink.table.runtime.operators.window.slicing.SliceAssigner;
 import 
org.apache.flink.table.runtime.operators.window.slicing.SliceAssigners.HoppingSliceAssigner;
 import 
org.apache.flink.table.runtime.operators.window.slicing.SliceSharedAssigner;
@@ -64,7 +63,7 @@ public class SlicingWindowAggOperatorBuilder {
     private SliceAssigner assigner;
     private AbstractRowDataSerializer<RowData> inputSerializer;
     private PagedTypeSerializer<RowData> keySerializer;
-    private TypeSerializer<RowData> accSerializer;
+    private AbstractRowDataSerializer<RowData> accSerializer;
     private GeneratedNamespaceAggsHandleFunction<Long> 
generatedAggregateFunction;
     private GeneratedNamespaceAggsHandleFunction<Long> 
localGeneratedAggregateFunction;
     private GeneratedNamespaceAggsHandleFunction<Long> 
globalGeneratedAggregateFunction;
@@ -95,7 +94,7 @@ public class SlicingWindowAggOperatorBuilder {
 
     public SlicingWindowAggOperatorBuilder aggregate(
             GeneratedNamespaceAggsHandleFunction<Long> 
generatedAggregateFunction,
-            TypeSerializer<RowData> accSerializer) {
+            AbstractRowDataSerializer<RowData> accSerializer) {
         this.generatedAggregateFunction = generatedAggregateFunction;
         this.accSerializer = accSerializer;
         return this;
@@ -105,7 +104,7 @@ public class SlicingWindowAggOperatorBuilder {
             GeneratedNamespaceAggsHandleFunction<Long> 
localGeneratedAggregateFunction,
             GeneratedNamespaceAggsHandleFunction<Long> 
globalGeneratedAggregateFunction,
             GeneratedNamespaceAggsHandleFunction<Long> 
stateGeneratedAggregateFunction,
-            TypeSerializer<RowData> accSerializer) {
+            AbstractRowDataSerializer<RowData> accSerializer) {
         this.localGeneratedAggregateFunction = localGeneratedAggregateFunction;
         this.globalGeneratedAggregateFunction = 
globalGeneratedAggregateFunction;
         this.generatedAggregateFunction = stateGeneratedAggregateFunction;
@@ -131,20 +130,20 @@ public class SlicingWindowAggOperatorBuilder {
         checkNotNull(keySerializer);
         checkNotNull(accSerializer);
         checkNotNull(generatedAggregateFunction);
-        final WindowBuffer.Factory bufferFactory =
-                new RecordsWindowBuffer.Factory(keySerializer, 
inputSerializer);
-        final WindowCombineFunction.Factory combinerFactory;
-        if (localGeneratedAggregateFunction != null && 
globalGeneratedAggregateFunction != null) {
+
+        boolean isGlobalAgg =
+                localGeneratedAggregateFunction != null && 
globalGeneratedAggregateFunction != null;
+
+        RecordsCombiner.Factory combinerFactory;
+        if (isGlobalAgg) {
             combinerFactory =
-                    new GlobalAggAccCombiner.Factory(
-                            localGeneratedAggregateFunction,
-                            globalGeneratedAggregateFunction,
-                            keySerializer);
+                    new GlobalAggCombiner.Factory(
+                            localGeneratedAggregateFunction, 
globalGeneratedAggregateFunction);
         } else {
-            combinerFactory =
-                    new AggRecordsCombiner.Factory(
-                            generatedAggregateFunction, keySerializer, 
inputSerializer);
+            combinerFactory = new 
AggCombiner.Factory(generatedAggregateFunction);
         }
+        final WindowBuffer.Factory bufferFactory =
+                new RecordsWindowBuffer.Factory(keySerializer, 
inputSerializer, combinerFactory);
 
         final SlicingWindowProcessor<Long> windowProcessor;
         if (assigner instanceof SliceSharedAssigner) {
@@ -152,7 +151,6 @@ public class SlicingWindowAggOperatorBuilder {
                     new SliceSharedWindowAggProcessor(
                             generatedAggregateFunction,
                             bufferFactory,
-                            combinerFactory,
                             (SliceSharedAssigner) assigner,
                             accSerializer,
                             indexOfCountStart,
@@ -162,7 +160,6 @@ public class SlicingWindowAggOperatorBuilder {
                     new SliceUnsharedWindowAggProcessor(
                             generatedAggregateFunction,
                             bufferFactory,
-                            combinerFactory,
                             (SliceUnsharedAssigner) assigner,
                             accSerializer,
                             shiftTimeZone);
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/RecordsWindowBuffer.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/RecordsWindowBuffer.java
index 7f6e53d..9ca57f4 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/RecordsWindowBuffer.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/RecordsWindowBuffer.java
@@ -18,9 +18,13 @@
 
 package org.apache.flink.table.runtime.operators.aggregate.window.buffers;
 
+import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.table.data.RowData;
-import 
org.apache.flink.table.runtime.operators.window.combines.WindowCombineFunction;
+import 
org.apache.flink.table.runtime.operators.window.combines.RecordsCombiner;
+import 
org.apache.flink.table.runtime.operators.window.slicing.WindowTimerService;
+import org.apache.flink.table.runtime.operators.window.state.WindowState;
 import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer;
 import org.apache.flink.table.runtime.typeutils.PagedTypeSerializer;
 import org.apache.flink.table.runtime.typeutils.WindowKeySerializer;
@@ -28,11 +32,13 @@ import org.apache.flink.table.runtime.util.KeyValueIterator;
 import org.apache.flink.table.runtime.util.WindowKey;
 import 
org.apache.flink.table.runtime.util.collections.binary.BytesMap.LookupInfo;
 import 
org.apache.flink.table.runtime.util.collections.binary.WindowBytesMultiMap;
+import org.apache.flink.util.Collector;
 
 import java.io.EOFException;
 import java.time.ZoneId;
 import java.util.Iterator;
 
+import static 
org.apache.flink.table.runtime.util.StateConfigUtil.isStateImmutableInStateBackend;
 import static org.apache.flink.table.runtime.util.TimeWindowUtil.isWindowFired;
 
 /**
@@ -41,11 +47,14 @@ import static 
org.apache.flink.table.runtime.util.TimeWindowUtil.isWindowFired;
  */
 public final class RecordsWindowBuffer implements WindowBuffer {
 
-    private final WindowCombineFunction combineFunction;
+    private final RecordsCombiner combineFunction;
     private final WindowBytesMultiMap recordsBuffer;
     private final WindowKey reuseWindowKey;
     private final AbstractRowDataSerializer<RowData> recordSerializer;
     private final ZoneId shiftTimeZone;
+    // copy key and input record if necessary(e.g., heap state backend),
+    // because key and record are reused.
+    private final boolean requiresCopy;
 
     private long minSliceEnd = Long.MAX_VALUE;
 
@@ -53,9 +62,10 @@ public final class RecordsWindowBuffer implements 
WindowBuffer {
             Object operatorOwner,
             MemoryManager memoryManager,
             long memorySize,
-            WindowCombineFunction combineFunction,
+            RecordsCombiner combineFunction,
             PagedTypeSerializer<RowData> keySer,
             AbstractRowDataSerializer<RowData> inputSer,
+            boolean requiresCopy,
             ZoneId shiftTimeZone) {
         this.combineFunction = combineFunction;
         this.recordsBuffer =
@@ -63,6 +73,7 @@ public final class RecordsWindowBuffer implements 
WindowBuffer {
                         operatorOwner, memoryManager, memorySize, keySer, 
inputSer.getArity());
         this.recordSerializer = inputSer;
         this.reuseWindowKey = new WindowKeySerializer(keySer).createInstance();
+        this.requiresCopy = requiresCopy;
         this.shiftTimeZone = shiftTimeZone;
     }
 
@@ -97,7 +108,7 @@ public final class RecordsWindowBuffer implements 
WindowBuffer {
     public void flush() throws Exception {
         if (recordsBuffer.getNumKeys() > 0) {
             KeyValueIterator<WindowKey, Iterator<RowData>> entryIterator =
-                    recordsBuffer.getEntryIterator();
+                    recordsBuffer.getEntryIterator(requiresCopy);
             while (entryIterator.advanceNext()) {
                 combineFunction.combine(entryIterator.getKey(), 
entryIterator.getValue());
             }
@@ -117,18 +128,22 @@ public final class RecordsWindowBuffer implements 
WindowBuffer {
     // Factory
     // 
------------------------------------------------------------------------------------------
 
-    /** Factory to create {@link RecordsWindowBuffer}. */
+    /** Factory to create {@link RecordsWindowBuffer} with {@link 
RecordsCombiner.Factory}. */
     public static final class Factory implements WindowBuffer.Factory {
 
         private static final long serialVersionUID = 1L;
 
         private final PagedTypeSerializer<RowData> keySer;
         private final AbstractRowDataSerializer<RowData> inputSer;
+        private final RecordsCombiner.Factory factory;
 
         public Factory(
-                PagedTypeSerializer<RowData> keySer, 
AbstractRowDataSerializer<RowData> inputSer) {
+                PagedTypeSerializer<RowData> keySer,
+                AbstractRowDataSerializer<RowData> inputSer,
+                RecordsCombiner.Factory combinerFactory) {
             this.keySer = keySer;
             this.inputSer = inputSer;
+            this.factory = combinerFactory;
         }
 
         @Override
@@ -136,15 +151,66 @@ public final class RecordsWindowBuffer implements 
WindowBuffer {
                 Object operatorOwner,
                 MemoryManager memoryManager,
                 long memorySize,
-                WindowCombineFunction combineFunction,
-                ZoneId shiftTimeZone) {
+                RuntimeContext runtimeContext,
+                WindowTimerService<Long> timerService,
+                KeyedStateBackend<RowData> stateBackend,
+                WindowState<Long> windowState,
+                boolean isEventTime,
+                ZoneId shiftTimeZone)
+                throws Exception {
+            RecordsCombiner combiner =
+                    factory.createRecordsCombiner(
+                            runtimeContext, timerService, stateBackend, 
windowState, isEventTime);
+            boolean requiresCopy = 
!isStateImmutableInStateBackend(stateBackend);
             return new RecordsWindowBuffer(
                     operatorOwner,
                     memoryManager,
                     memorySize,
-                    combineFunction,
+                    combiner,
                     keySer,
                     inputSer,
+                    requiresCopy,
+                    shiftTimeZone);
+        }
+    }
+
+    /** Factory to create {@link RecordsWindowBuffer} with {@link 
RecordsCombiner.LocalFactory}. */
+    public static final class LocalFactory implements 
WindowBuffer.LocalFactory {
+
+        private static final long serialVersionUID = 1L;
+
+        private final PagedTypeSerializer<RowData> keySer;
+        private final AbstractRowDataSerializer<RowData> inputSer;
+        private final RecordsCombiner.LocalFactory localFactory;
+
+        public LocalFactory(
+                PagedTypeSerializer<RowData> keySer,
+                AbstractRowDataSerializer<RowData> inputSer,
+                RecordsCombiner.LocalFactory localFactory) {
+            this.keySer = keySer;
+            this.inputSer = inputSer;
+            this.localFactory = localFactory;
+        }
+
+        @Override
+        public WindowBuffer create(
+                Object operatorOwner,
+                MemoryManager memoryManager,
+                long memorySize,
+                RuntimeContext runtimeContext,
+                Collector<RowData> collector,
+                ZoneId shiftTimeZone)
+                throws Exception {
+            RecordsCombiner combiner =
+                    localFactory.createRecordsCombiner(runtimeContext, 
collector);
+            return new RecordsWindowBuffer(
+                    operatorOwner,
+                    memoryManager,
+                    memorySize,
+                    combiner,
+                    keySer,
+                    inputSer,
+                    false,
                     shiftTimeZone);
         }
     }
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/WindowBuffer.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/WindowBuffer.java
index 7b954f8..4f86659 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/WindowBuffer.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/WindowBuffer.java
@@ -19,9 +19,13 @@
 package org.apache.flink.table.runtime.operators.aggregate.window.buffers;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.table.data.RowData;
-import 
org.apache.flink.table.runtime.operators.window.combines.WindowCombineFunction;
+import 
org.apache.flink.table.runtime.operators.window.slicing.WindowTimerService;
+import org.apache.flink.table.runtime.operators.window.state.WindowState;
+import org.apache.flink.util.Collector;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -83,7 +87,12 @@ public interface WindowBuffer {
          * @param operatorOwner the owner of the operator
          * @param memoryManager the manager that governs memory by Flink 
framework
          * @param memorySize the managed memory size can be used by this 
operator
-         * @param combineFunction the combine function used to combine 
buffered data into state
+         * @param runtimeContext the current {@link RuntimeContext}
+         * @param timerService the service to register event-time and 
processing-time timers
+         * @param stateBackend the state backend to accessing states
+         * @param windowState the window state to flush buffered data into.
+         * @param isEventTime indicates whether the operator works in 
event-time or processing-time
+         *     mode, used for register corresponding timers.
          * @param shiftTimeZone the shit timezone of the window
          * @throws IOException thrown if the buffer can't be opened
          */
@@ -91,8 +100,37 @@ public interface WindowBuffer {
                 Object operatorOwner,
                 MemoryManager memoryManager,
                 long memorySize,
-                WindowCombineFunction combineFunction,
+                RuntimeContext runtimeContext,
+                WindowTimerService<Long> timerService,
+                KeyedStateBackend<RowData> stateBackend,
+                WindowState<Long> windowState,
+                boolean isEventTime,
                 ZoneId shiftTimeZone)
-                throws IOException;
+                throws Exception;
+    }
+
+    /** A factory that creates a {@link WindowBuffer}. */
+    @FunctionalInterface
+    interface LocalFactory extends Serializable {
+
+        /**
+         * Creates a {@link WindowBuffer} for local window that buffers 
elements in memory before
+         * flushing.
+         *
+         * @param operatorOwner the owner of the operator
+         * @param memoryManager the manager that governs memory by Flink 
framework
+         * @param memorySize the managed memory size can be used by this 
operator
+         * @param collector collector to emit records
+         * @param shiftTimeZone the shit timezone of the window
+         * @throws IOException thrown if the buffer can't be opened
+         */
+        WindowBuffer create(
+                Object operatorOwner,
+                MemoryManager memoryManager,
+                long memorySize,
+                RuntimeContext runtimeContext,
+                Collector<RowData> collector,
+                ZoneId shiftTimeZone)
+                throws Exception;
     }
 }
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/AggRecordsCombiner.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/AggCombiner.java
similarity index 71%
rename from 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/AggRecordsCombiner.java
rename to 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/AggCombiner.java
index 53b9372..1ec2bc1 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/AggRecordsCombiner.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/AggCombiner.java
@@ -19,14 +19,13 @@
 package org.apache.flink.table.runtime.operators.aggregate.window.combines;
 
 import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.dataview.PerWindowStateDataViewStore;
 import 
org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction;
 import org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunction;
-import 
org.apache.flink.table.runtime.operators.window.combines.WindowCombineFunction;
+import 
org.apache.flink.table.runtime.operators.window.combines.RecordsCombiner;
 import 
org.apache.flink.table.runtime.operators.window.slicing.WindowTimerService;
 import org.apache.flink.table.runtime.operators.window.state.StateKeyContext;
 import org.apache.flink.table.runtime.operators.window.state.WindowState;
@@ -37,14 +36,13 @@ import java.time.ZoneId;
 import java.util.Iterator;
 
 import static org.apache.flink.table.data.util.RowDataUtil.isAccumulateMsg;
-import static 
org.apache.flink.table.runtime.util.StateConfigUtil.isStateImmutableInStateBackend;
 import static org.apache.flink.table.runtime.util.TimeWindowUtil.isWindowFired;
 
 /**
- * An implementation of {@link WindowCombineFunction} that accumulates input 
records into the window
+ * An implementation of {@link RecordsCombiner} that accumulates input records 
into the window
  * accumulator state.
  */
-public final class AggRecordsCombiner implements WindowCombineFunction {
+public class AggCombiner implements RecordsCombiner {
 
     /** The service to register event-time or processing-time timers. */
     private final WindowTimerService<Long> timerService;
@@ -58,48 +56,26 @@ public final class AggRecordsCombiner implements 
WindowCombineFunction {
     /** Function used to handle all aggregates. */
     private final NamespaceAggsHandleFunction<Long> aggregator;
 
-    /** Whether to copy key and input record, because key and record are 
reused. */
-    private final boolean requiresCopy;
-
-    /** Serializer to copy key if required. */
-    private final TypeSerializer<RowData> keySerializer;
-
-    /** Serializer to copy record if required. */
-    private final TypeSerializer<RowData> recordSerializer;
-
     /** Whether the operator works in event-time mode, used to indicate 
registering which timer. */
     private final boolean isEventTime;
 
-    public AggRecordsCombiner(
+    public AggCombiner(
             WindowTimerService<Long> timerService,
             StateKeyContext keyContext,
             WindowValueState<Long> accState,
             NamespaceAggsHandleFunction<Long> aggregator,
-            boolean requiresCopy,
-            TypeSerializer<RowData> keySerializer,
-            TypeSerializer<RowData> recordSerializer,
             boolean isEventTime) {
         this.timerService = timerService;
         this.keyContext = keyContext;
         this.accState = accState;
         this.aggregator = aggregator;
-        this.requiresCopy = requiresCopy;
-        this.keySerializer = keySerializer;
-        this.recordSerializer = recordSerializer;
         this.isEventTime = isEventTime;
     }
 
     @Override
     public void combine(WindowKey windowKey, Iterator<RowData> records) throws 
Exception {
         // step 0: set current key for states and timers
-        final RowData key;
-        if (requiresCopy) {
-            // the incoming key is reused, we should copy it if state backend 
doesn't copy it
-            key = keySerializer.copy(windowKey.getKey());
-        } else {
-            key = windowKey.getKey();
-        }
-        keyContext.setCurrentKey(key);
+        keyContext.setCurrentKey(windowKey.getKey());
 
         // step 1: get the accumulator for the current key and window
         Long window = windowKey.getWindow();
@@ -114,10 +90,6 @@ public final class AggRecordsCombiner implements 
WindowCombineFunction {
         // step 3: do accumulate
         while (records.hasNext()) {
             RowData record = records.next();
-            if (requiresCopy) {
-                // the incoming record is reused, we should copy it if state 
backend doesn't copy it
-                record = recordSerializer.copy(record);
-            }
             if (isAccumulateMsg(record)) {
                 aggregator.accumulate(record);
             } else {
@@ -151,26 +123,18 @@ public final class AggRecordsCombiner implements 
WindowCombineFunction {
     // Factory
     // 
----------------------------------------------------------------------------------------
 
-    /** Factory to create {@link AggRecordsCombiner}. */
-    public static final class Factory implements WindowCombineFunction.Factory 
{
-
+    /** Factory to create {@link AggCombiner}. */
+    public static final class Factory implements RecordsCombiner.Factory {
         private static final long serialVersionUID = 1L;
 
         private final GeneratedNamespaceAggsHandleFunction<Long> 
genAggsHandler;
-        private final TypeSerializer<RowData> keySerializer;
-        private final TypeSerializer<RowData> recordSerializer;
 
-        public Factory(
-                GeneratedNamespaceAggsHandleFunction<Long> genAggsHandler,
-                TypeSerializer<RowData> keySerializer,
-                TypeSerializer<RowData> recordSerializer) {
+        public Factory(GeneratedNamespaceAggsHandleFunction<Long> 
genAggsHandler) {
             this.genAggsHandler = genAggsHandler;
-            this.keySerializer = keySerializer;
-            this.recordSerializer = recordSerializer;
         }
 
         @Override
-        public WindowCombineFunction create(
+        public RecordsCombiner createRecordsCombiner(
                 RuntimeContext runtimeContext,
                 WindowTimerService<Long> timerService,
                 KeyedStateBackend<RowData> stateBackend,
@@ -182,16 +146,12 @@ public final class AggRecordsCombiner implements 
WindowCombineFunction {
             aggregator.open(
                     new PerWindowStateDataViewStore(
                             stateBackend, LongSerializer.INSTANCE, 
runtimeContext));
-            boolean requiresCopy = 
!isStateImmutableInStateBackend(stateBackend);
             WindowValueState<Long> windowValueState = (WindowValueState<Long>) 
windowState;
-            return new AggRecordsCombiner(
+            return new AggCombiner(
                     timerService,
                     stateBackend::setCurrentKey,
                     windowValueState,
                     aggregator,
-                    requiresCopy,
-                    keySerializer,
-                    recordSerializer,
                     isEventTime);
         }
     }
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/GlobalAggAccCombiner.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/GlobalAggCombiner.java
similarity index 75%
rename from 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/GlobalAggAccCombiner.java
rename to 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/GlobalAggCombiner.java
index e8fc66a..bf6e300 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/GlobalAggAccCombiner.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/GlobalAggCombiner.java
@@ -19,14 +19,13 @@
 package org.apache.flink.table.runtime.operators.aggregate.window.combines;
 
 import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.dataview.PerWindowStateDataViewStore;
 import 
org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction;
 import org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunction;
-import 
org.apache.flink.table.runtime.operators.window.combines.WindowCombineFunction;
+import 
org.apache.flink.table.runtime.operators.window.combines.RecordsCombiner;
 import 
org.apache.flink.table.runtime.operators.window.slicing.WindowTimerService;
 import org.apache.flink.table.runtime.operators.window.state.StateKeyContext;
 import org.apache.flink.table.runtime.operators.window.state.WindowState;
@@ -36,16 +35,15 @@ import org.apache.flink.table.runtime.util.WindowKey;
 import java.time.ZoneId;
 import java.util.Iterator;
 
-import static 
org.apache.flink.table.runtime.util.StateConfigUtil.isStateImmutableInStateBackend;
 import static org.apache.flink.table.runtime.util.TimeWindowUtil.isWindowFired;
 
 /**
- * An implementation of {@link WindowCombineFunction} that accumulates local 
accumulators records
- * into the window accumulator state.
+ * An implementation of {@link RecordsCombiner} that accumulates local 
accumulators records into the
+ * window accumulator state.
  *
  * <p>Note: this only supports event-time window.
  */
-public final class GlobalAggAccCombiner implements WindowCombineFunction {
+public class GlobalAggCombiner implements RecordsCombiner {
 
     /** The service to register event-time or processing-time timers. */
     private final WindowTimerService<Long> timerService;
@@ -62,50 +60,35 @@ public final class GlobalAggAccCombiner implements 
WindowCombineFunction {
     /** Global aggregate function to handle global accumulator rows. */
     private final NamespaceAggsHandleFunction<Long> globalAggregator;
 
-    /** Whether to copy key and input record, because key and record are 
reused. */
-    private final boolean requiresCopy;
-
-    /** Serializer to copy key if required. */
-    private final TypeSerializer<RowData> keySerializer;
-
-    public GlobalAggAccCombiner(
+    public GlobalAggCombiner(
             WindowTimerService<Long> timerService,
             StateKeyContext keyContext,
             WindowValueState<Long> accState,
             NamespaceAggsHandleFunction<Long> localAggregator,
-            NamespaceAggsHandleFunction<Long> globalAggregator,
-            boolean requiresCopy,
-            TypeSerializer<RowData> keySerializer) {
+            NamespaceAggsHandleFunction<Long> globalAggregator) {
         this.timerService = timerService;
         this.keyContext = keyContext;
         this.accState = accState;
         this.localAggregator = localAggregator;
         this.globalAggregator = globalAggregator;
-        this.requiresCopy = requiresCopy;
-        this.keySerializer = keySerializer;
     }
 
     @Override
     public void combine(WindowKey windowKey, Iterator<RowData> localAccs) 
throws Exception {
-        // step 0: set current key for states and timers
-        final RowData key;
-        if (requiresCopy) {
-            // the incoming key is reused, we should copy it if state backend 
doesn't copy it
-            key = keySerializer.copy(windowKey.getKey());
-        } else {
-            key = windowKey.getKey();
-        }
-        keyContext.setCurrentKey(key);
         Long window = windowKey.getWindow();
-
-        // step 1: merge localAccs into one acc
         RowData acc = localAggregator.createAccumulators();
         localAggregator.setAccumulators(window, acc);
         while (localAccs.hasNext()) {
             RowData localAcc = localAccs.next();
             localAggregator.merge(window, localAcc);
         }
-        RowData mergedLocalAcc = localAggregator.getAccumulators();
+        combineAccumulator(windowKey, localAggregator.getAccumulators());
+    }
+
+    private void combineAccumulator(WindowKey windowKey, RowData acc) throws 
Exception {
+        // step 1: set current key for states and timers
+        keyContext.setCurrentKey(windowKey.getKey());
+        Long window = windowKey.getWindow();
 
         // step2: merge acc into state
         RowData stateAcc = accState.value(window);
@@ -113,7 +96,7 @@ public final class GlobalAggAccCombiner implements 
WindowCombineFunction {
             stateAcc = globalAggregator.createAccumulators();
         }
         globalAggregator.setAccumulators(window, stateAcc);
-        globalAggregator.merge(window, mergedLocalAcc);
+        globalAggregator.merge(window, acc);
         stateAcc = globalAggregator.getAccumulators();
         accState.update(window, stateAcc);
 
@@ -136,26 +119,23 @@ public final class GlobalAggAccCombiner implements 
WindowCombineFunction {
     // Factory
     // 
----------------------------------------------------------------------------------------
 
-    /** Factory to create {@link GlobalAggAccCombiner}. */
-    public static final class Factory implements WindowCombineFunction.Factory 
{
+    /** Factory to create {@link GlobalAggCombiner}. */
+    public static final class Factory implements RecordsCombiner.Factory {
 
         private static final long serialVersionUID = 1L;
 
         private final GeneratedNamespaceAggsHandleFunction<Long> 
genLocalAggsHandler;
         private final GeneratedNamespaceAggsHandleFunction<Long> 
genGlobalAggsHandler;
-        private final TypeSerializer<RowData> keySerializer;
 
         public Factory(
                 GeneratedNamespaceAggsHandleFunction<Long> genLocalAggsHandler,
-                GeneratedNamespaceAggsHandleFunction<Long> 
genGlobalAggsHandler,
-                TypeSerializer<RowData> keySerializer) {
+                GeneratedNamespaceAggsHandleFunction<Long> 
genGlobalAggsHandler) {
             this.genLocalAggsHandler = genLocalAggsHandler;
             this.genGlobalAggsHandler = genGlobalAggsHandler;
-            this.keySerializer = keySerializer;
         }
 
         @Override
-        public WindowCombineFunction create(
+        public RecordsCombiner createRecordsCombiner(
                 RuntimeContext runtimeContext,
                 WindowTimerService<Long> timerService,
                 KeyedStateBackend<RowData> stateBackend,
@@ -172,16 +152,13 @@ public final class GlobalAggAccCombiner implements 
WindowCombineFunction {
             globalAggregator.open(
                     new PerWindowStateDataViewStore(
                             stateBackend, LongSerializer.INSTANCE, 
runtimeContext));
-            boolean requiresCopy = 
!isStateImmutableInStateBackend(stateBackend);
             WindowValueState<Long> windowValueState = (WindowValueState<Long>) 
windowState;
-            return new GlobalAggAccCombiner(
+            return new GlobalAggCombiner(
                     timerService,
                     stateBackend::setCurrentKey,
                     windowValueState,
                     localAggregator,
-                    globalAggregator,
-                    requiresCopy,
-                    keySerializer);
+                    globalAggregator);
         }
     }
 }
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/LocalAggRecordsCombiner.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/LocalAggCombiner.java
similarity index 77%
rename from 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/LocalAggRecordsCombiner.java
rename to 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/LocalAggCombiner.java
index 71e92d8..c2c8807 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/LocalAggRecordsCombiner.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/LocalAggCombiner.java
@@ -19,14 +19,13 @@
 package org.apache.flink.table.runtime.operators.aggregate.window.combines;
 
 import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.utils.JoinedRowData;
 import org.apache.flink.table.runtime.dataview.UnsupportedStateDataViewStore;
 import 
org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction;
 import org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunction;
-import 
org.apache.flink.table.runtime.operators.window.combines.WindowCombineFunction;
+import 
org.apache.flink.table.runtime.operators.window.combines.RecordsCombiner;
 import org.apache.flink.table.runtime.util.WindowKey;
 import org.apache.flink.util.Collector;
 
@@ -35,19 +34,16 @@ import java.util.Iterator;
 import static org.apache.flink.table.data.util.RowDataUtil.isAccumulateMsg;
 
 /**
- * An implementation of {@link WindowCombineFunction} that accumulates input 
records into local
+ * An implementation of {@link RecordsCombiner} that accumulates input records 
into local
  * accumulators.
  *
  * <p>Note: this only supports event-time window.
  */
-public final class LocalAggRecordsCombiner implements WindowCombineFunction {
+public class LocalAggCombiner implements RecordsCombiner {
 
     /** Function used to handle all aggregates. */
     private final NamespaceAggsHandleFunction<Long> aggregator;
 
-    /** Serializer to copy key if required. */
-    private final TypeSerializer<RowData> keySerializer;
-
     /** The output to emit combined accumulator result. */
     private final Collector<RowData> collector;
 
@@ -63,19 +59,16 @@ public final class LocalAggRecordsCombiner implements 
WindowCombineFunction {
      */
     private final GenericRowData windowRow = new GenericRowData(1);
 
-    public LocalAggRecordsCombiner(
-            NamespaceAggsHandleFunction<Long> aggregator,
-            TypeSerializer<RowData> keySerializer,
-            Collector<RowData> collector) {
+    public LocalAggCombiner(
+            NamespaceAggsHandleFunction<Long> aggregator, Collector<RowData> 
collector) {
         this.aggregator = aggregator;
-        this.keySerializer = keySerializer;
         this.collector = collector;
     }
 
     @Override
     public void combine(WindowKey windowKey, Iterator<RowData> records) throws 
Exception {
-        // always copy key because we will merge record into memory acc
-        final RowData key = keySerializer.copy(windowKey.getKey());
+        // always not copy key/value because they are not cached.
+        final RowData key = windowKey.getKey();
         final Long window = windowKey.getWindow();
 
         // step 1: create an empty accumulator
@@ -116,28 +109,24 @@ public final class LocalAggRecordsCombiner implements 
WindowCombineFunction {
     // Factory
     // 
----------------------------------------------------------------------------------------
 
-    /** Factory to create {@link LocalAggRecordsCombiner}. */
-    public static final class Factory implements 
WindowCombineFunction.LocalFactory {
+    /** Factory to create {@link LocalAggCombiner}. */
+    public static final class Factory implements RecordsCombiner.LocalFactory {
 
         private static final long serialVersionUID = 1L;
 
         private final GeneratedNamespaceAggsHandleFunction<Long> 
genAggsHandler;
-        private final TypeSerializer<RowData> keySerializer;
 
-        public Factory(
-                GeneratedNamespaceAggsHandleFunction<Long> genAggsHandler,
-                TypeSerializer<RowData> keySerializer) {
+        public Factory(GeneratedNamespaceAggsHandleFunction<Long> 
genAggsHandler) {
             this.genAggsHandler = genAggsHandler;
-            this.keySerializer = keySerializer;
         }
 
         @Override
-        public WindowCombineFunction create(
+        public RecordsCombiner createRecordsCombiner(
                 RuntimeContext runtimeContext, Collector<RowData> collector) 
throws Exception {
             final NamespaceAggsHandleFunction<Long> aggregator =
                     
genAggsHandler.newInstance(runtimeContext.getUserCodeClassLoader());
             aggregator.open(new UnsupportedStateDataViewStore(runtimeContext));
-            return new LocalAggRecordsCombiner(aggregator, keySerializer, 
collector);
+            return new LocalAggCombiner(aggregator, collector);
         }
     }
 }
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/AbstractWindowAggProcessor.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/AbstractWindowAggProcessor.java
index 85f0ebb..0851949 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/AbstractWindowAggProcessor.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/AbstractWindowAggProcessor.java
@@ -29,7 +29,6 @@ import 
org.apache.flink.table.runtime.dataview.PerWindowStateDataViewStore;
 import 
org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction;
 import org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunction;
 import 
org.apache.flink.table.runtime.operators.aggregate.window.buffers.WindowBuffer;
-import 
org.apache.flink.table.runtime.operators.window.combines.WindowCombineFunction;
 import org.apache.flink.table.runtime.operators.window.slicing.ClockService;
 import org.apache.flink.table.runtime.operators.window.slicing.SliceAssigner;
 import 
org.apache.flink.table.runtime.operators.window.slicing.SliceSharedAssigner;
@@ -50,7 +49,6 @@ public abstract class AbstractWindowAggProcessor implements 
SlicingWindowProcess
 
     protected final GeneratedNamespaceAggsHandleFunction<Long> genAggsHandler;
     protected final WindowBuffer.Factory windowBufferFactory;
-    protected final WindowCombineFunction.Factory combineFactory;
     protected final SliceAssigner sliceAssigner;
     protected final TypeSerializer<RowData> accSerializer;
     protected final boolean isEventTime;
@@ -85,13 +83,11 @@ public abstract class AbstractWindowAggProcessor implements 
SlicingWindowProcess
     public AbstractWindowAggProcessor(
             GeneratedNamespaceAggsHandleFunction<Long> genAggsHandler,
             WindowBuffer.Factory bufferFactory,
-            WindowCombineFunction.Factory combinerFactory,
             SliceAssigner sliceAssigner,
             TypeSerializer<RowData> accSerializer,
             ZoneId shiftTimeZone) {
         this.genAggsHandler = genAggsHandler;
         this.windowBufferFactory = bufferFactory;
-        this.combineFactory = combinerFactory;
         this.sliceAssigner = sliceAssigner;
         this.accSerializer = accSerializer;
         this.isEventTime = sliceAssigner.isEventTime();
@@ -118,19 +114,16 @@ public abstract class AbstractWindowAggProcessor 
implements SlicingWindowProcess
         this.aggregator.open(
                 new PerWindowStateDataViewStore(
                         ctx.getKeyedStateBackend(), namespaceSerializer, 
ctx.getRuntimeContext()));
-        final WindowCombineFunction combineFunction =
-                combineFactory.create(
-                        ctx.getRuntimeContext(),
-                        windowTimerService,
-                        ctx.getKeyedStateBackend(),
-                        windowState,
-                        isEventTime);
         this.windowBuffer =
                 windowBufferFactory.create(
                         ctx.getOperatorOwner(),
                         ctx.getMemoryManager(),
                         ctx.getMemorySize(),
-                        combineFunction,
+                        ctx.getRuntimeContext(),
+                        windowTimerService,
+                        ctx.getKeyedStateBackend(),
+                        windowState,
+                        isEventTime,
                         shiftTimeZone);
 
         this.reuseOutput = new JoinedRowData();
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/SliceSharedWindowAggProcessor.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/SliceSharedWindowAggProcessor.java
index 63fa925..db0b39c 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/SliceSharedWindowAggProcessor.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/SliceSharedWindowAggProcessor.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.table.data.RowData;
 import 
org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction;
 import 
org.apache.flink.table.runtime.operators.aggregate.window.buffers.WindowBuffer;
-import 
org.apache.flink.table.runtime.operators.window.combines.WindowCombineFunction;
 import org.apache.flink.table.runtime.operators.window.slicing.SliceAssigner;
 import org.apache.flink.table.runtime.operators.window.slicing.SliceAssigners;
 import 
org.apache.flink.table.runtime.operators.window.slicing.SliceSharedAssigner;
@@ -51,18 +50,11 @@ public final class SliceSharedWindowAggProcessor extends 
AbstractWindowAggProces
     public SliceSharedWindowAggProcessor(
             GeneratedNamespaceAggsHandleFunction<Long> genAggsHandler,
             WindowBuffer.Factory bufferFactory,
-            WindowCombineFunction.Factory combinerFactory,
             SliceSharedAssigner sliceAssigner,
             TypeSerializer<RowData> accSerializer,
             int indexOfCountStar,
             ZoneId shiftTimeZone) {
-        super(
-                genAggsHandler,
-                bufferFactory,
-                combinerFactory,
-                sliceAssigner,
-                accSerializer,
-                shiftTimeZone);
+        super(genAggsHandler, bufferFactory, sliceAssigner, accSerializer, 
shiftTimeZone);
         this.sliceSharedAssigner = sliceAssigner;
         this.mergeTargetHelper = new SliceMergeTargetHelper();
         this.emptySupplier = new WindowIsEmptySupplier(indexOfCountStar, 
sliceAssigner);
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/SliceUnsharedWindowAggProcessor.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/SliceUnsharedWindowAggProcessor.java
index 7c5d45a..53c395f 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/SliceUnsharedWindowAggProcessor.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/SliceUnsharedWindowAggProcessor.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.table.data.RowData;
 import 
org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction;
 import 
org.apache.flink.table.runtime.operators.aggregate.window.buffers.WindowBuffer;
-import 
org.apache.flink.table.runtime.operators.window.combines.WindowCombineFunction;
 import 
org.apache.flink.table.runtime.operators.window.slicing.SliceUnsharedAssigner;
 
 import java.time.ZoneId;
@@ -37,17 +36,10 @@ public final class SliceUnsharedWindowAggProcessor extends 
AbstractWindowAggProc
     public SliceUnsharedWindowAggProcessor(
             GeneratedNamespaceAggsHandleFunction<Long> genAggsHandler,
             WindowBuffer.Factory windowBufferFactory,
-            WindowCombineFunction.Factory combineFactory,
             SliceUnsharedAssigner sliceAssigner,
             TypeSerializer<RowData> accSerializer,
             ZoneId shiftTimeZone) {
-        super(
-                genAggsHandler,
-                windowBufferFactory,
-                combineFactory,
-                sliceAssigner,
-                accSerializer,
-                shiftTimeZone);
+        super(genAggsHandler, windowBufferFactory, sliceAssigner, 
accSerializer, shiftTimeZone);
     }
 
     @Override
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorBuilder.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorBuilder.java
index da89489..658988d 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorBuilder.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorBuilder.java
@@ -25,7 +25,7 @@ import 
org.apache.flink.table.runtime.operators.aggregate.window.buffers.Records
 import 
org.apache.flink.table.runtime.operators.aggregate.window.buffers.WindowBuffer;
 import 
org.apache.flink.table.runtime.operators.rank.window.combines.TopNRecordsCombiner;
 import 
org.apache.flink.table.runtime.operators.rank.window.processors.WindowRankProcessor;
-import 
org.apache.flink.table.runtime.operators.window.combines.WindowCombineFunction;
+import 
org.apache.flink.table.runtime.operators.window.combines.RecordsCombiner;
 import 
org.apache.flink.table.runtime.operators.window.slicing.SlicingWindowOperator;
 import 
org.apache.flink.table.runtime.operators.window.slicing.SlicingWindowProcessor;
 import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer;
@@ -136,22 +136,17 @@ public class WindowRankOperatorBuilder {
                 windowEndIndex >= 0,
                 String.format(
                         "Illegal window end index %s, it should not be 
negative!", windowEndIndex));
-        final WindowBuffer.Factory bufferFactory =
-                new RecordsWindowBuffer.Factory(keySerializer, 
inputSerializer);
-        final WindowCombineFunction.Factory combinerFactory =
+        final RecordsCombiner.Factory combinerFactory =
                 new TopNRecordsCombiner.Factory(
-                        generatedSortKeyComparator,
-                        sortKeySelector,
-                        keySerializer,
-                        inputSerializer,
-                        rankEnd);
+                        generatedSortKeyComparator, sortKeySelector, 
inputSerializer, rankEnd);
+        final WindowBuffer.Factory bufferFactory =
+                new RecordsWindowBuffer.Factory(keySerializer, 
inputSerializer, combinerFactory);
         final SlicingWindowProcessor<Long> windowProcessor =
                 new WindowRankProcessor(
                         inputSerializer,
                         generatedSortKeyComparator,
                         sortKeySelector.getProducedType().toSerializer(),
                         bufferFactory,
-                        combinerFactory,
                         rankStart,
                         rankEnd,
                         outputRankNumber,
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/combines/TopNRecordsCombiner.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/combines/TopNRecordsCombiner.java
index f33757e..7d8929e 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/combines/TopNRecordsCombiner.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/combines/TopNRecordsCombiner.java
@@ -26,7 +26,7 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
 import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
 import org.apache.flink.table.runtime.operators.rank.TopNBuffer;
-import 
org.apache.flink.table.runtime.operators.window.combines.WindowCombineFunction;
+import 
org.apache.flink.table.runtime.operators.window.combines.RecordsCombiner;
 import 
org.apache.flink.table.runtime.operators.window.slicing.WindowTimerService;
 import org.apache.flink.table.runtime.operators.window.state.StateKeyContext;
 import org.apache.flink.table.runtime.operators.window.state.WindowMapState;
@@ -41,13 +41,12 @@ import java.util.List;
 import java.util.Map;
 
 import static org.apache.flink.table.data.util.RowDataUtil.isAccumulateMsg;
-import static 
org.apache.flink.table.runtime.util.StateConfigUtil.isStateImmutableInStateBackend;
 
 /**
- * An implementation of {@link WindowCombineFunction} that save topN records 
of incremental input
- * records into the window state.
+ * An implementation of {@link RecordsCombiner} that save topN records of 
incremental input records
+ * into the window state.
  */
-public final class TopNRecordsCombiner implements WindowCombineFunction {
+public final class TopNRecordsCombiner implements RecordsCombiner {
 
     /** The service to register event-time or processing-time timers. */
     private final WindowTimerService<Long> timerService;
@@ -67,12 +66,6 @@ public final class TopNRecordsCombiner implements 
WindowCombineFunction {
     /** TopN size. */
     private final long topN;
 
-    /** Whether to copy input key, because key is reused. */
-    private final boolean requiresCopyKey;
-
-    /** Serializer to copy key if required. */
-    private final TypeSerializer<RowData> keySerializer;
-
     /** Serializer to copy record if required. */
     private final TypeSerializer<RowData> recordSerializer;
 
@@ -86,8 +79,6 @@ public final class TopNRecordsCombiner implements 
WindowCombineFunction {
             Comparator<RowData> sortKeyComparator,
             KeySelector<RowData, RowData> sortKeySelector,
             long topN,
-            boolean requiresCopyKey,
-            TypeSerializer<RowData> keySerializer,
             TypeSerializer<RowData> recordSerializer,
             boolean isEventTime) {
         this.timerService = timerService;
@@ -96,8 +87,6 @@ public final class TopNRecordsCombiner implements 
WindowCombineFunction {
         this.sortKeyComparator = sortKeyComparator;
         this.sortKeySelector = sortKeySelector;
         this.topN = topN;
-        this.requiresCopyKey = requiresCopyKey;
-        this.keySerializer = keySerializer;
         this.recordSerializer = recordSerializer;
         this.isEventTime = isEventTime;
     }
@@ -123,14 +112,7 @@ public final class TopNRecordsCombiner implements 
WindowCombineFunction {
 
         // step 2: flush data in TopNBuffer into state
         Iterator<Map.Entry<RowData, Collection<RowData>>> bufferItr = 
buffer.entrySet().iterator();
-        final RowData key;
-        if (requiresCopyKey) {
-            // the incoming key is reused, we should copy it if state backend 
doesn't copy it
-            key = keySerializer.copy(windowKey.getKey());
-        } else {
-            key = windowKey.getKey();
-        }
-        keyContext.setCurrentKey(key);
+        keyContext.setCurrentKey(windowKey.getKey());
         Long window = windowKey.getWindow();
         while (bufferItr.hasNext()) {
             Map.Entry<RowData, Collection<RowData>> entry = bufferItr.next();
@@ -158,32 +140,29 @@ public final class TopNRecordsCombiner implements 
WindowCombineFunction {
     // 
----------------------------------------------------------------------------------------
 
     /** Factory to create {@link TopNRecordsCombiner}. */
-    public static final class Factory implements WindowCombineFunction.Factory 
{
+    public static final class Factory implements RecordsCombiner.Factory {
 
         private static final long serialVersionUID = 1L;
 
         // The util to compare two sortKey equals to each other.
         private final GeneratedRecordComparator generatedSortKeyComparator;
         private final KeySelector<RowData, RowData> sortKeySelector;
-        private final TypeSerializer<RowData> keySerializer;
         private final TypeSerializer<RowData> recordSerializer;
         private final long topN;
 
         public Factory(
                 GeneratedRecordComparator genSortKeyComparator,
                 RowDataKeySelector sortKeySelector,
-                TypeSerializer<RowData> keySerializer,
                 TypeSerializer<RowData> recordSerializer,
                 long topN) {
             this.generatedSortKeyComparator = genSortKeyComparator;
             this.sortKeySelector = sortKeySelector;
-            this.keySerializer = keySerializer;
             this.recordSerializer = recordSerializer;
             this.topN = topN;
         }
 
         @Override
-        public WindowCombineFunction create(
+        public RecordsCombiner createRecordsCombiner(
                 RuntimeContext runtimeContext,
                 WindowTimerService<Long> timerService,
                 KeyedStateBackend<RowData> stateBackend,
@@ -192,7 +171,6 @@ public final class TopNRecordsCombiner implements 
WindowCombineFunction {
                 throws Exception {
             final Comparator<RowData> sortKeyComparator =
                     
generatedSortKeyComparator.newInstance(runtimeContext.getUserCodeClassLoader());
-            boolean requiresCopyKey = 
!isStateImmutableInStateBackend(stateBackend);
             WindowMapState<Long, List<RowData>> windowMapState =
                     (WindowMapState<Long, List<RowData>>) windowState;
             return new TopNRecordsCombiner(
@@ -202,8 +180,6 @@ public final class TopNRecordsCombiner implements 
WindowCombineFunction {
                     sortKeyComparator,
                     sortKeySelector,
                     topN,
-                    requiresCopyKey,
-                    keySerializer,
                     recordSerializer,
                     isEventTime);
         }
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/processors/WindowRankProcessor.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/processors/WindowRankProcessor.java
index c45b401..4df71ab 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/processors/WindowRankProcessor.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/processors/WindowRankProcessor.java
@@ -30,7 +30,6 @@ import org.apache.flink.table.data.utils.JoinedRowData;
 import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
 import 
org.apache.flink.table.runtime.operators.aggregate.window.buffers.WindowBuffer;
 import org.apache.flink.table.runtime.operators.rank.TopNBuffer;
-import 
org.apache.flink.table.runtime.operators.window.combines.WindowCombineFunction;
 import 
org.apache.flink.table.runtime.operators.window.slicing.SlicingWindowProcessor;
 import 
org.apache.flink.table.runtime.operators.window.slicing.WindowTimerService;
 import 
org.apache.flink.table.runtime.operators.window.slicing.WindowTimerServiceImpl;
@@ -59,7 +58,6 @@ public final class WindowRankProcessor implements 
SlicingWindowProcessor<Long> {
     private final TypeSerializer<RowData> sortKeySerializer;
 
     private final WindowBuffer.Factory bufferFactory;
-    private final WindowCombineFunction.Factory combineFactory;
     private final TypeSerializer<RowData> inputSerializer;
     private final long rankStart;
     private final long rankEnd;
@@ -88,7 +86,6 @@ public final class WindowRankProcessor implements 
SlicingWindowProcessor<Long> {
             GeneratedRecordComparator genSortKeyComparator,
             TypeSerializer<RowData> sortKeySerializer,
             WindowBuffer.Factory bufferFactory,
-            WindowCombineFunction.Factory combineFactory,
             long rankStart,
             long rankEnd,
             boolean outputRankNumber,
@@ -98,7 +95,6 @@ public final class WindowRankProcessor implements 
SlicingWindowProcessor<Long> {
         this.generatedSortKeyComparator = genSortKeyComparator;
         this.sortKeySerializer = sortKeySerializer;
         this.bufferFactory = bufferFactory;
-        this.combineFactory = combineFactory;
         this.rankStart = rankStart;
         this.rankEnd = rankEnd;
         this.outputRankNumber = outputRankNumber;
@@ -127,19 +123,16 @@ public final class WindowRankProcessor implements 
SlicingWindowProcessor<Long> {
         this.windowState =
                 new WindowMapState<>(
                         (InternalMapState<RowData, Long, RowData, 
List<RowData>>) state);
-        final WindowCombineFunction combineFunction =
-                combineFactory.create(
-                        ctx.getRuntimeContext(),
-                        windowTimerService,
-                        ctx.getKeyedStateBackend(),
-                        windowState,
-                        true);
         this.windowBuffer =
                 bufferFactory.create(
                         ctx.getOperatorOwner(),
                         ctx.getMemoryManager(),
                         ctx.getMemorySize(),
-                        combineFunction,
+                        ctx.getRuntimeContext(),
+                        windowTimerService,
+                        ctx.getKeyedStateBackend(),
+                        windowState,
+                        true,
                         shiftTimeZone);
 
         this.reuseOutput = new JoinedRowData();
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/combines/WindowCombineFunction.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/combines/RecordsCombiner.java
similarity index 76%
rename from 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/combines/WindowCombineFunction.java
rename to 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/combines/RecordsCombiner.java
index c12a247..d235ed5 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/combines/WindowCombineFunction.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/combines/RecordsCombiner.java
@@ -30,33 +30,29 @@ import org.apache.flink.util.Collector;
 import java.io.Serializable;
 import java.util.Iterator;
 
-/** The {@link WindowCombineFunction} is used to combine buffered data into 
state. */
+/** The {@link RecordsCombiner} is used to combine buffered records into 
state. */
 @Internal
-public interface WindowCombineFunction {
-
+public interface RecordsCombiner {
     /**
      * Combines the buffered data into state based on the given window-key 
pair.
      *
      * @param windowKey the window-key pair that the buffered data belong to, 
the window-key object
      *     is reused.
-     * @param value the buffered data, the iterator and {@link RowData} 
objects are reused
+     * @param records the buffered data, the iterator and {@link RowData} 
objects are reused.
      */
-    void combine(WindowKey windowKey, Iterator<RowData> value) throws 
Exception;
+    void combine(WindowKey windowKey, Iterator<RowData> records) throws 
Exception;
 
     /** Release resources allocated by this combine function. */
     void close() throws Exception;
 
     // ------------------------------------------------------------------------
 
-    /**
-     * A factory that creates a {@link WindowCombineFunction} for combining at 
global stage, i.e.
-     * keyed operator where combine into keyed state.
-     */
+    /** A factory that creates a {@link RecordsCombiner}. */
     @FunctionalInterface
     interface Factory extends Serializable {
 
         /**
-         * Creates a {@link WindowCombineFunction} that can combine buffered 
data into states.
+         * Creates a {@link RecordsCombiner} that can combine buffered data 
into states.
          *
          * @param runtimeContext the current {@link RuntimeContext}
          * @param timerService the service to register event-time and 
processing-time timers
@@ -65,7 +61,7 @@ public interface WindowCombineFunction {
          * @param isEventTime indicates whether the operator works in 
event-time or processing-time
          *     mode, used for register corresponding timers.
          */
-        WindowCombineFunction create(
+        RecordsCombiner createRecordsCombiner(
                 RuntimeContext runtimeContext,
                 WindowTimerService<Long> timerService,
                 KeyedStateBackend<RowData> stateBackend,
@@ -74,9 +70,10 @@ public interface WindowCombineFunction {
                 throws Exception;
     }
 
-    /** A factory that creates a {@link WindowCombineFunction} used for 
combining at local stage. */
+    /** A factory that creates a {@link RecordsCombiner} used for combining at 
local stage. */
+    @FunctionalInterface
     interface LocalFactory extends Serializable {
-        WindowCombineFunction create(RuntimeContext runtimeContext, 
Collector<RowData> collector)
-                throws Exception;
+        RecordsCombiner createRecordsCombiner(
+                RuntimeContext runtimeContext, Collector<RowData> collector) 
throws Exception;
     }
 }
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/collections/binary/AbstractBytesHashMap.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/collections/binary/AbstractBytesHashMap.java
index 7929a26..e3cb25f 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/collections/binary/AbstractBytesHashMap.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/collections/binary/AbstractBytesHashMap.java
@@ -194,12 +194,12 @@ public abstract class AbstractBytesHashMap<K> extends 
BytesMap<K, BinaryRowData>
 
     /** Returns an iterator for iterating over the entries of this map. */
     @SuppressWarnings("WeakerAccess")
-    public KeyValueIterator<K, BinaryRowData> getEntryIterator() {
+    public KeyValueIterator<K, BinaryRowData> getEntryIterator(boolean 
requiresCopy) {
         if (destructiveIterator != null) {
             throw new IllegalArgumentException(
                     "DestructiveIterator is not null, so this method can't be 
invoke!");
         }
-        return ((RecordArea) recordArea).entryIterator();
+        return ((RecordArea) recordArea).entryIterator(requiresCopy);
     }
 
     /** @return the underlying memory segments of the hash map's record area */
@@ -329,8 +329,8 @@ public abstract class AbstractBytesHashMap<K> extends 
BytesMap<K, BinaryRowData>
 
         // ----------------------- Iterator -----------------------
 
-        private KeyValueIterator<K, BinaryRowData> entryIterator() {
-            return new EntryIterator();
+        private KeyValueIterator<K, BinaryRowData> entryIterator(boolean 
requiresCopy) {
+            return new EntryIterator(requiresCopy);
         }
 
         private final class EntryIterator extends AbstractPagedInputView
@@ -338,10 +338,12 @@ public abstract class AbstractBytesHashMap<K> extends 
BytesMap<K, BinaryRowData>
 
             private int count = 0;
             private int currentSegmentIndex = 0;
+            private final boolean requiresCopy;
 
-            private EntryIterator() {
+            private EntryIterator(boolean requiresCopy) {
                 super(segments.get(0), segmentSize, 0);
                 destructiveIterator = this;
+                this.requiresCopy = requiresCopy;
             }
 
             @Override
@@ -358,12 +360,12 @@ public abstract class AbstractBytesHashMap<K> extends 
BytesMap<K, BinaryRowData>
 
             @Override
             public K getKey() {
-                return reusedKey;
+                return requiresCopy ? keySerializer.copy(reusedKey) : 
reusedKey;
             }
 
             @Override
             public BinaryRowData getValue() {
-                return reusedValue;
+                return requiresCopy ? reusedValue.copy() : reusedValue;
             }
 
             public boolean hasNext() {
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/collections/binary/AbstractBytesMultiMap.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/collections/binary/AbstractBytesMultiMap.java
index cc64ef8..7ec8132 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/collections/binary/AbstractBytesMultiMap.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/collections/binary/AbstractBytesMultiMap.java
@@ -183,8 +183,8 @@ public abstract class AbstractBytesMultiMap<K> extends 
BytesMap<K, Iterator<RowD
         }
     }
 
-    public KeyValueIterator<K, Iterator<RowData>> getEntryIterator() {
-        return ((RecordArea) recordArea).entryIterator();
+    public KeyValueIterator<K, Iterator<RowData>> getEntryIterator(boolean 
requiresCopy) {
+        return ((RecordArea) recordArea).entryIterator(requiresCopy);
     }
 
     /** release the map's record and bucket area's memory segments. */
@@ -329,14 +329,17 @@ public abstract class AbstractBytesMultiMap<K> extends 
BytesMap<K, Iterator<RowD
             view.getCurrentSegment().putInt(currPosInSeg, newPointer);
         }
 
-        KeyValueIterator<K, Iterator<RowData>> entryIterator() {
-            return new EntryIterator();
+        KeyValueIterator<K, Iterator<RowData>> entryIterator(boolean 
requiresCopy) {
+            return new EntryIterator(requiresCopy);
         }
 
         final class EntryIterator implements KeyValueIterator<K, 
Iterator<RowData>> {
             private int count;
+            private final boolean requiresCopy;
 
-            public EntryIterator() {
+            public EntryIterator(boolean requiresCopy) {
+                this.requiresCopy = requiresCopy;
+                reusedValueIterator.setRequiresCopy(requiresCopy);
                 count = 0;
                 if (numKeys > 0) {
                     recordArea.setReadPosition(0);
@@ -361,7 +364,7 @@ public abstract class AbstractBytesMultiMap<K> extends 
BytesMap<K, Iterator<RowD
 
             @Override
             public K getKey() {
-                return reusedKey;
+                return requiresCopy ? keySerializer.copy(reusedKey) : 
reusedKey;
             }
 
             @Override
@@ -382,10 +385,12 @@ public abstract class AbstractBytesMultiMap<K> extends 
BytesMap<K, Iterator<RowD
         final class ValueIterator implements Iterator<RowData> {
             private int offset;
             private boolean isFirstRead;
+            private boolean requiresCopy;
 
             public ValueIterator(int offset) {
                 this.offset = offset;
                 this.isFirstRead = true;
+                this.requiresCopy = false;
             }
 
             public void setOffset(int offset) {
@@ -393,6 +398,10 @@ public abstract class AbstractBytesMultiMap<K> extends 
BytesMap<K, Iterator<RowD
                 this.isFirstRead = true;
             }
 
+            public void setRequiresCopy(boolean requiresCopy) {
+                this.requiresCopy = requiresCopy;
+            }
+
             @Override
             public boolean hasNext() {
                 return isFirstRead || offset != -1;
@@ -402,7 +411,7 @@ public abstract class AbstractBytesMultiMap<K> extends 
BytesMap<K, Iterator<RowD
             public RowData next() {
                 if (isFirstRead) {
                     isFirstRead = false;
-                    return reusedRecord;
+                    return requiresCopy ? reusedRecord.copy() : reusedRecord;
                 }
                 if (hasNext()) {
                     valInView.setReadPosition(offset);
@@ -415,7 +424,7 @@ public abstract class AbstractBytesMultiMap<K> extends 
BytesMap<K, Iterator<RowD
                                 "Exception happened while iterating"
                                         + " value list of a key in 
BytesMultiMap");
                     }
-                    return reusedRecord;
+                    return requiresCopy ? reusedRecord.copy() : reusedRecord;
                 }
                 return null;
             }
diff --git 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/aggregate/SumHashAggTestOperator.java
 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/aggregate/SumHashAggTestOperator.java
index 24e9a2d..88be5ef 100644
--- 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/aggregate/SumHashAggTestOperator.java
+++ 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/aggregate/SumHashAggTestOperator.java
@@ -158,7 +158,8 @@ public class SumHashAggTestOperator extends 
AbstractStreamOperator<RowData>
 
         if (sorter == null) {
             // no spilling, output by iterating aggregate map.
-            KeyValueIterator<BinaryRowData, BinaryRowData> iter = 
aggregateMap.getEntryIterator();
+            KeyValueIterator<BinaryRowData, BinaryRowData> iter =
+                    aggregateMap.getEntryIterator(false);
 
             while (iter.advanceNext()) {
                 // set result and output
diff --git 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/util/collections/binary/BytesHashMapTestBase.java
 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/util/collections/binary/BytesHashMapTestBase.java
index 44a3233..a599f29 100644
--- 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/util/collections/binary/BytesHashMapTestBase.java
+++ 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/util/collections/binary/BytesHashMapTestBase.java
@@ -253,7 +253,7 @@ public abstract class BytesHashMapTestBase<K> extends 
BytesMapTestBase {
                 expected.add(entry.copy());
             }
         }
-        KeyValueIterator<K, BinaryRowData> iter = table.getEntryIterator();
+        KeyValueIterator<K, BinaryRowData> iter = 
table.getEntryIterator(false);
         while (iter.advanceNext()) {
             actualKeys.add(keySerializer.copy(iter.getKey()));
             actualValues.add(iter.getValue().copy());
diff --git 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/util/collections/binary/BytesMultiMapTestBase.java
 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/util/collections/binary/BytesMultiMapTestBase.java
index 9a6620c..22dca2d 100644
--- 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/util/collections/binary/BytesMultiMapTestBase.java
+++ 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/util/collections/binary/BytesMultiMapTestBase.java
@@ -116,7 +116,7 @@ public abstract class BytesMultiMapTestBase<K> extends 
BytesMapTestBase {
             }
         }
 
-        KeyValueIterator<K, Iterator<RowData>> iter = table.getEntryIterator();
+        KeyValueIterator<K, Iterator<RowData>> iter = 
table.getEntryIterator(false);
         while (iter.advanceNext()) {
             int i = 0;
             Iterator<RowData> valueIter = iter.getValue();

Reply via email to