This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
new 6a0d44d [FLINK-22304][table] Refactor some interfaces for TVF based
window to improve the extendability
6a0d44d is described below
commit 6a0d44d1ffcfa220def7de3ca2130f8cfcf64fe4
Author: shuo.cs <[email protected]>
AuthorDate: Thu Apr 15 14:31:43 2021 +0800
[FLINK-22304][table] Refactor some interfaces for TVF based window to
improve the extendability
This closes #15745
---
.../stream/StreamExecLocalWindowAggregate.java | 20 ++++--
.../codegen/agg/batch/HashAggCodeGenHelper.scala | 2 +-
.../agg/batch/HashWindowCodeGenerator.scala | 2 +-
.../window/LocalSlicingWindowAggOperator.java | 33 ++-------
.../window/SlicingWindowAggOperatorBuilder.java | 37 +++++-----
.../window/buffers/RecordsWindowBuffer.java | 84 +++++++++++++++++++---
.../aggregate/window/buffers/WindowBuffer.java | 46 ++++++++++--
.../{AggRecordsCombiner.java => AggCombiner.java} | 60 +++-------------
...lAggAccCombiner.java => GlobalAggCombiner.java} | 63 ++++++----------
...gRecordsCombiner.java => LocalAggCombiner.java} | 35 ++++-----
.../processors/AbstractWindowAggProcessor.java | 17 ++---
.../processors/SliceSharedWindowAggProcessor.java | 10 +--
.../SliceUnsharedWindowAggProcessor.java | 10 +--
.../rank/window/WindowRankOperatorBuilder.java | 15 ++--
.../rank/window/combines/TopNRecordsCombiner.java | 38 ++--------
.../window/processors/WindowRankProcessor.java | 17 ++---
...owCombineFunction.java => RecordsCombiner.java} | 25 +++----
.../collections/binary/AbstractBytesHashMap.java | 16 +++--
.../collections/binary/AbstractBytesMultiMap.java | 25 ++++---
.../aggregate/SumHashAggTestOperator.java | 3 +-
.../collections/binary/BytesHashMapTestBase.java | 2 +-
.../collections/binary/BytesMultiMapTestBase.java | 2 +-
22 files changed, 260 insertions(+), 302 deletions(-)
diff --git
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalWindowAggregate.java
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalWindowAggregate.java
index 18f8a8d..e58e7f1 100644
---
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalWindowAggregate.java
+++
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalWindowAggregate.java
@@ -38,7 +38,11 @@ import
org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import
org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import
org.apache.flink.table.runtime.operators.aggregate.window.LocalSlicingWindowAggOperator;
+import
org.apache.flink.table.runtime.operators.aggregate.window.buffers.RecordsWindowBuffer;
+import
org.apache.flink.table.runtime.operators.aggregate.window.buffers.WindowBuffer;
+import
org.apache.flink.table.runtime.operators.aggregate.window.combines.LocalAggCombiner;
import org.apache.flink.table.runtime.operators.window.slicing.SliceAssigner;
+import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.typeutils.PagedTypeSerializer;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
@@ -65,7 +69,6 @@ public class StreamExecLocalWindowAggregate extends
StreamExecWindowAggregateBas
private static final long WINDOW_AGG_MEMORY_RATIO = 100;
public static final String FIELD_NAME_WINDOWING = "windowing";
- public static final String FIELD_NAME_NAMED_WINDOW_PROPERTIES =
"namedWindowProperties";
@JsonProperty(FIELD_NAME_GROUPING)
private final int[] grouping;
@@ -139,14 +142,17 @@ public class StreamExecLocalWindowAggregate extends
StreamExecWindowAggregateBas
final RowDataKeySelector selector =
KeySelectorUtil.getRowDataSelector(grouping,
InternalTypeInfo.of(inputRowType));
+ PagedTypeSerializer<RowData> keySer =
+ (PagedTypeSerializer<RowData>)
selector.getProducedType().toSerializer();
+ AbstractRowDataSerializer<RowData> valueSer = new
RowDataSerializer(inputRowType);
+
+ WindowBuffer.LocalFactory bufferFactory =
+ new RecordsWindowBuffer.LocalFactory(
+ keySer, valueSer, new
LocalAggCombiner.Factory(generatedAggsHandler));
+
final OneInputStreamOperator<RowData, RowData> localAggOperator =
new LocalSlicingWindowAggOperator(
- selector,
- sliceAssigner,
- (PagedTypeSerializer<RowData>)
selector.getProducedType().toSerializer(),
- new RowDataSerializer(inputRowType),
- generatedAggsHandler,
- shiftTimeZone);
+ selector, sliceAssigner, bufferFactory, shiftTimeZone);
return ExecNodeUtil.createOneInputTransformation(
inputTransform,
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala
index 3c01f01..dc76352 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala
@@ -532,7 +532,7 @@ object HashAggCodeGenHelper {
val rowDataType = classOf[RowData].getCanonicalName
s"""
|$iteratorType<$rowDataType, $rowDataType> $iteratorTerm =
- | $aggregateMapTerm.getEntryIterator();
+ | $aggregateMapTerm.getEntryIterator(false); // reuse key/value during
iterating
|while ($iteratorTerm.advanceNext()) {
| // set result and output
| $reuseGroupKeyTerm = ($rowDataType)$iteratorTerm.getKey();
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashWindowCodeGenerator.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashWindowCodeGenerator.scala
index 8384861..864e71b 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashWindowCodeGenerator.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashWindowCodeGenerator.scala
@@ -716,7 +716,7 @@ class HashWindowCodeGenerator(
val iteratorTerm = CodeGenUtils.newName("iterator")
s"""
|$iteratorType<$rowDataType, $rowDataType> $iteratorTerm =
- | $aggregateMapTerm.getEntryIterator();
+ | $aggregateMapTerm.getEntryIterator(false); // reuse key/value during
iterating
|while ($iteratorTerm.advanceNext()) {
| $reuseAggMapKeyTerm = ($rowDataType) $iteratorTerm.getKey();
| $reuseAggBufferTerm = ($rowDataType) $iteratorTerm.getValue();
diff --git
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/LocalSlicingWindowAggOperator.java
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/LocalSlicingWindowAggOperator.java
index 6634f77..b044db7 100644
---
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/LocalSlicingWindowAggOperator.java
+++
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/LocalSlicingWindowAggOperator.java
@@ -26,16 +26,10 @@ import
org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
-import
org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
-import
org.apache.flink.table.runtime.operators.aggregate.window.buffers.RecordsWindowBuffer;
import
org.apache.flink.table.runtime.operators.aggregate.window.buffers.WindowBuffer;
-import
org.apache.flink.table.runtime.operators.aggregate.window.combines.LocalAggRecordsCombiner;
-import
org.apache.flink.table.runtime.operators.window.combines.WindowCombineFunction;
import org.apache.flink.table.runtime.operators.window.slicing.ClockService;
import org.apache.flink.table.runtime.operators.window.slicing.SliceAssigner;
-import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer;
-import org.apache.flink.table.runtime.typeutils.PagedTypeSerializer;
import java.time.ZoneId;
import java.util.TimeZone;
@@ -55,8 +49,7 @@ public class LocalSlicingWindowAggOperator extends
AbstractStreamOperator<RowDat
private final RowDataKeySelector keySelector;
private final SliceAssigner sliceAssigner;
private final long windowInterval;
- private final WindowBuffer.Factory windowBufferFactory;
- private final WindowCombineFunction.LocalFactory combinerFactory;
+ private final WindowBuffer.LocalFactory windowBufferFactory;
/**
* The shift timezone of the window, if the proctime or rowtime type is
TIMESTAMP_LTZ, the shift
@@ -88,29 +81,12 @@ public class LocalSlicingWindowAggOperator extends
AbstractStreamOperator<RowDat
public LocalSlicingWindowAggOperator(
RowDataKeySelector keySelector,
SliceAssigner sliceAssigner,
- PagedTypeSerializer<RowData> keySer,
- AbstractRowDataSerializer<RowData> inputSer,
- GeneratedNamespaceAggsHandleFunction<Long> genAggsHandler,
- ZoneId shiftTimezone) {
- this(
- keySelector,
- sliceAssigner,
- new RecordsWindowBuffer.Factory(keySer, inputSer),
- new LocalAggRecordsCombiner.Factory(genAggsHandler, keySer),
- shiftTimezone);
- }
-
- public LocalSlicingWindowAggOperator(
- RowDataKeySelector keySelector,
- SliceAssigner sliceAssigner,
- WindowBuffer.Factory windowBufferFactory,
- WindowCombineFunction.LocalFactory combinerFactory,
+ WindowBuffer.LocalFactory windowBufferFactory,
ZoneId shiftTimezone) {
this.keySelector = keySelector;
this.sliceAssigner = sliceAssigner;
this.windowInterval = sliceAssigner.getSliceEndInterval();
this.windowBufferFactory = windowBufferFactory;
- this.combinerFactory = combinerFactory;
this.shiftTimezone = shiftTimezone;
this.useDayLightSaving =
TimeZone.getTimeZone(shiftTimezone).useDaylightTime();
}
@@ -123,14 +99,13 @@ public class LocalSlicingWindowAggOperator extends
AbstractStreamOperator<RowDat
collector = new TimestampedCollector<>(output);
collector.eraseTimestamp();
- final WindowCombineFunction localCombiner =
- combinerFactory.create(getRuntimeContext(), collector);
this.windowBuffer =
windowBufferFactory.create(
getContainingTask(),
getContainingTask().getEnvironment().getMemoryManager(),
computeMemorySize(),
- localCombiner,
+ getRuntimeContext(),
+ collector,
shiftTimezone);
}
diff --git
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorBuilder.java
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorBuilder.java
index 41b0c0c..3aba162 100644
---
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorBuilder.java
+++
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorBuilder.java
@@ -18,16 +18,15 @@
package org.apache.flink.table.runtime.operators.aggregate.window;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.table.data.RowData;
import
org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction;
import
org.apache.flink.table.runtime.operators.aggregate.window.buffers.RecordsWindowBuffer;
import
org.apache.flink.table.runtime.operators.aggregate.window.buffers.WindowBuffer;
-import
org.apache.flink.table.runtime.operators.aggregate.window.combines.AggRecordsCombiner;
-import
org.apache.flink.table.runtime.operators.aggregate.window.combines.GlobalAggAccCombiner;
+import
org.apache.flink.table.runtime.operators.aggregate.window.combines.AggCombiner;
+import
org.apache.flink.table.runtime.operators.aggregate.window.combines.GlobalAggCombiner;
import
org.apache.flink.table.runtime.operators.aggregate.window.processors.SliceSharedWindowAggProcessor;
import
org.apache.flink.table.runtime.operators.aggregate.window.processors.SliceUnsharedWindowAggProcessor;
-import
org.apache.flink.table.runtime.operators.window.combines.WindowCombineFunction;
+import
org.apache.flink.table.runtime.operators.window.combines.RecordsCombiner;
import org.apache.flink.table.runtime.operators.window.slicing.SliceAssigner;
import
org.apache.flink.table.runtime.operators.window.slicing.SliceAssigners.HoppingSliceAssigner;
import
org.apache.flink.table.runtime.operators.window.slicing.SliceSharedAssigner;
@@ -64,7 +63,7 @@ public class SlicingWindowAggOperatorBuilder {
private SliceAssigner assigner;
private AbstractRowDataSerializer<RowData> inputSerializer;
private PagedTypeSerializer<RowData> keySerializer;
- private TypeSerializer<RowData> accSerializer;
+ private AbstractRowDataSerializer<RowData> accSerializer;
private GeneratedNamespaceAggsHandleFunction<Long>
generatedAggregateFunction;
private GeneratedNamespaceAggsHandleFunction<Long>
localGeneratedAggregateFunction;
private GeneratedNamespaceAggsHandleFunction<Long>
globalGeneratedAggregateFunction;
@@ -95,7 +94,7 @@ public class SlicingWindowAggOperatorBuilder {
public SlicingWindowAggOperatorBuilder aggregate(
GeneratedNamespaceAggsHandleFunction<Long>
generatedAggregateFunction,
- TypeSerializer<RowData> accSerializer) {
+ AbstractRowDataSerializer<RowData> accSerializer) {
this.generatedAggregateFunction = generatedAggregateFunction;
this.accSerializer = accSerializer;
return this;
@@ -105,7 +104,7 @@ public class SlicingWindowAggOperatorBuilder {
GeneratedNamespaceAggsHandleFunction<Long>
localGeneratedAggregateFunction,
GeneratedNamespaceAggsHandleFunction<Long>
globalGeneratedAggregateFunction,
GeneratedNamespaceAggsHandleFunction<Long>
stateGeneratedAggregateFunction,
- TypeSerializer<RowData> accSerializer) {
+ AbstractRowDataSerializer<RowData> accSerializer) {
this.localGeneratedAggregateFunction = localGeneratedAggregateFunction;
this.globalGeneratedAggregateFunction =
globalGeneratedAggregateFunction;
this.generatedAggregateFunction = stateGeneratedAggregateFunction;
@@ -131,20 +130,20 @@ public class SlicingWindowAggOperatorBuilder {
checkNotNull(keySerializer);
checkNotNull(accSerializer);
checkNotNull(generatedAggregateFunction);
- final WindowBuffer.Factory bufferFactory =
- new RecordsWindowBuffer.Factory(keySerializer,
inputSerializer);
- final WindowCombineFunction.Factory combinerFactory;
- if (localGeneratedAggregateFunction != null &&
globalGeneratedAggregateFunction != null) {
+
+ boolean isGlobalAgg =
+ localGeneratedAggregateFunction != null &&
globalGeneratedAggregateFunction != null;
+
+ RecordsCombiner.Factory combinerFactory;
+ if (isGlobalAgg) {
combinerFactory =
- new GlobalAggAccCombiner.Factory(
- localGeneratedAggregateFunction,
- globalGeneratedAggregateFunction,
- keySerializer);
+ new GlobalAggCombiner.Factory(
+ localGeneratedAggregateFunction,
globalGeneratedAggregateFunction);
} else {
- combinerFactory =
- new AggRecordsCombiner.Factory(
- generatedAggregateFunction, keySerializer,
inputSerializer);
+ combinerFactory = new
AggCombiner.Factory(generatedAggregateFunction);
}
+ final WindowBuffer.Factory bufferFactory =
+ new RecordsWindowBuffer.Factory(keySerializer,
inputSerializer, combinerFactory);
final SlicingWindowProcessor<Long> windowProcessor;
if (assigner instanceof SliceSharedAssigner) {
@@ -152,7 +151,6 @@ public class SlicingWindowAggOperatorBuilder {
new SliceSharedWindowAggProcessor(
generatedAggregateFunction,
bufferFactory,
- combinerFactory,
(SliceSharedAssigner) assigner,
accSerializer,
indexOfCountStart,
@@ -162,7 +160,6 @@ public class SlicingWindowAggOperatorBuilder {
new SliceUnsharedWindowAggProcessor(
generatedAggregateFunction,
bufferFactory,
- combinerFactory,
(SliceUnsharedAssigner) assigner,
accSerializer,
shiftTimeZone);
diff --git
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/RecordsWindowBuffer.java
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/RecordsWindowBuffer.java
index 7f6e53d..9ca57f4 100644
---
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/RecordsWindowBuffer.java
+++
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/RecordsWindowBuffer.java
@@ -18,9 +18,13 @@
package org.apache.flink.table.runtime.operators.aggregate.window.buffers;
+import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.table.data.RowData;
-import
org.apache.flink.table.runtime.operators.window.combines.WindowCombineFunction;
+import
org.apache.flink.table.runtime.operators.window.combines.RecordsCombiner;
+import
org.apache.flink.table.runtime.operators.window.slicing.WindowTimerService;
+import org.apache.flink.table.runtime.operators.window.state.WindowState;
import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer;
import org.apache.flink.table.runtime.typeutils.PagedTypeSerializer;
import org.apache.flink.table.runtime.typeutils.WindowKeySerializer;
@@ -28,11 +32,13 @@ import org.apache.flink.table.runtime.util.KeyValueIterator;
import org.apache.flink.table.runtime.util.WindowKey;
import
org.apache.flink.table.runtime.util.collections.binary.BytesMap.LookupInfo;
import
org.apache.flink.table.runtime.util.collections.binary.WindowBytesMultiMap;
+import org.apache.flink.util.Collector;
import java.io.EOFException;
import java.time.ZoneId;
import java.util.Iterator;
+import static
org.apache.flink.table.runtime.util.StateConfigUtil.isStateImmutableInStateBackend;
import static org.apache.flink.table.runtime.util.TimeWindowUtil.isWindowFired;
/**
@@ -41,11 +47,14 @@ import static
org.apache.flink.table.runtime.util.TimeWindowUtil.isWindowFired;
*/
public final class RecordsWindowBuffer implements WindowBuffer {
- private final WindowCombineFunction combineFunction;
+ private final RecordsCombiner combineFunction;
private final WindowBytesMultiMap recordsBuffer;
private final WindowKey reuseWindowKey;
private final AbstractRowDataSerializer<RowData> recordSerializer;
private final ZoneId shiftTimeZone;
+ // copy key and input record if necessary(e.g., heap state backend),
+ // because key and record are reused.
+ private final boolean requiresCopy;
private long minSliceEnd = Long.MAX_VALUE;
@@ -53,9 +62,10 @@ public final class RecordsWindowBuffer implements
WindowBuffer {
Object operatorOwner,
MemoryManager memoryManager,
long memorySize,
- WindowCombineFunction combineFunction,
+ RecordsCombiner combineFunction,
PagedTypeSerializer<RowData> keySer,
AbstractRowDataSerializer<RowData> inputSer,
+ boolean requiresCopy,
ZoneId shiftTimeZone) {
this.combineFunction = combineFunction;
this.recordsBuffer =
@@ -63,6 +73,7 @@ public final class RecordsWindowBuffer implements
WindowBuffer {
operatorOwner, memoryManager, memorySize, keySer,
inputSer.getArity());
this.recordSerializer = inputSer;
this.reuseWindowKey = new WindowKeySerializer(keySer).createInstance();
+ this.requiresCopy = requiresCopy;
this.shiftTimeZone = shiftTimeZone;
}
@@ -97,7 +108,7 @@ public final class RecordsWindowBuffer implements
WindowBuffer {
public void flush() throws Exception {
if (recordsBuffer.getNumKeys() > 0) {
KeyValueIterator<WindowKey, Iterator<RowData>> entryIterator =
- recordsBuffer.getEntryIterator();
+ recordsBuffer.getEntryIterator(requiresCopy);
while (entryIterator.advanceNext()) {
combineFunction.combine(entryIterator.getKey(),
entryIterator.getValue());
}
@@ -117,18 +128,22 @@ public final class RecordsWindowBuffer implements
WindowBuffer {
// Factory
//
------------------------------------------------------------------------------------------
- /** Factory to create {@link RecordsWindowBuffer}. */
+ /** Factory to create {@link RecordsWindowBuffer} with {@link
RecordsCombiner.Factory}. */
public static final class Factory implements WindowBuffer.Factory {
private static final long serialVersionUID = 1L;
private final PagedTypeSerializer<RowData> keySer;
private final AbstractRowDataSerializer<RowData> inputSer;
+ private final RecordsCombiner.Factory factory;
public Factory(
- PagedTypeSerializer<RowData> keySer,
AbstractRowDataSerializer<RowData> inputSer) {
+ PagedTypeSerializer<RowData> keySer,
+ AbstractRowDataSerializer<RowData> inputSer,
+ RecordsCombiner.Factory combinerFactory) {
this.keySer = keySer;
this.inputSer = inputSer;
+ this.factory = combinerFactory;
}
@Override
@@ -136,15 +151,66 @@ public final class RecordsWindowBuffer implements
WindowBuffer {
Object operatorOwner,
MemoryManager memoryManager,
long memorySize,
- WindowCombineFunction combineFunction,
- ZoneId shiftTimeZone) {
+ RuntimeContext runtimeContext,
+ WindowTimerService<Long> timerService,
+ KeyedStateBackend<RowData> stateBackend,
+ WindowState<Long> windowState,
+ boolean isEventTime,
+ ZoneId shiftTimeZone)
+ throws Exception {
+ RecordsCombiner combiner =
+ factory.createRecordsCombiner(
+ runtimeContext, timerService, stateBackend,
windowState, isEventTime);
+ boolean requiresCopy =
!isStateImmutableInStateBackend(stateBackend);
return new RecordsWindowBuffer(
operatorOwner,
memoryManager,
memorySize,
- combineFunction,
+ combiner,
keySer,
inputSer,
+ requiresCopy,
+ shiftTimeZone);
+ }
+ }
+
+ /** Factory to create {@link RecordsWindowBuffer} with {@link
RecordsCombiner.LocalFactory}. */
+ public static final class LocalFactory implements
WindowBuffer.LocalFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ private final PagedTypeSerializer<RowData> keySer;
+ private final AbstractRowDataSerializer<RowData> inputSer;
+ private final RecordsCombiner.LocalFactory localFactory;
+
+ public LocalFactory(
+ PagedTypeSerializer<RowData> keySer,
+ AbstractRowDataSerializer<RowData> inputSer,
+ RecordsCombiner.LocalFactory localFactory) {
+ this.keySer = keySer;
+ this.inputSer = inputSer;
+ this.localFactory = localFactory;
+ }
+
+ @Override
+ public WindowBuffer create(
+ Object operatorOwner,
+ MemoryManager memoryManager,
+ long memorySize,
+ RuntimeContext runtimeContext,
+ Collector<RowData> collector,
+ ZoneId shiftTimeZone)
+ throws Exception {
+ RecordsCombiner combiner =
+ localFactory.createRecordsCombiner(runtimeContext,
collector);
+ return new RecordsWindowBuffer(
+ operatorOwner,
+ memoryManager,
+ memorySize,
+ combiner,
+ keySer,
+ inputSer,
+ false,
shiftTimeZone);
}
}
diff --git
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/WindowBuffer.java
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/WindowBuffer.java
index 7b954f8..4f86659 100644
---
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/WindowBuffer.java
+++
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/WindowBuffer.java
@@ -19,9 +19,13 @@
package org.apache.flink.table.runtime.operators.aggregate.window.buffers;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.table.data.RowData;
-import
org.apache.flink.table.runtime.operators.window.combines.WindowCombineFunction;
+import
org.apache.flink.table.runtime.operators.window.slicing.WindowTimerService;
+import org.apache.flink.table.runtime.operators.window.state.WindowState;
+import org.apache.flink.util.Collector;
import java.io.IOException;
import java.io.Serializable;
@@ -83,7 +87,12 @@ public interface WindowBuffer {
* @param operatorOwner the owner of the operator
* @param memoryManager the manager that governs memory by Flink
framework
* @param memorySize the managed memory size can be used by this
operator
- * @param combineFunction the combine function used to combine
buffered data into state
+ * @param runtimeContext the current {@link RuntimeContext}
+ * @param timerService the service to register event-time and
processing-time timers
+ * @param stateBackend the state backend used to access states
+ * @param windowState the window state to flush buffered data into.
+ * @param isEventTime indicates whether the operator works in
event-time or processing-time
+ * mode, used to register the corresponding timers.
* @param shiftTimeZone the shit timezone of the window
* @throws IOException thrown if the buffer can't be opened
*/
@@ -91,8 +100,37 @@ public interface WindowBuffer {
Object operatorOwner,
MemoryManager memoryManager,
long memorySize,
- WindowCombineFunction combineFunction,
+ RuntimeContext runtimeContext,
+ WindowTimerService<Long> timerService,
+ KeyedStateBackend<RowData> stateBackend,
+ WindowState<Long> windowState,
+ boolean isEventTime,
ZoneId shiftTimeZone)
- throws IOException;
+ throws Exception;
+ }
+
+ /** A factory that creates a {@link WindowBuffer}. */
+ @FunctionalInterface
+ interface LocalFactory extends Serializable {
+
+ /**
+ * Creates a {@link WindowBuffer} for local window that buffers
elements in memory before
+ * flushing.
+ *
+ * @param operatorOwner the owner of the operator
+ * @param memoryManager the manager that governs memory by Flink
framework
+ * @param memorySize the managed memory size can be used by this
operator
+ * @param collector collector to emit records
+ * @param shiftTimeZone the shift timezone of the window
+ * @throws Exception thrown if the buffer can't be opened
+ */
+ WindowBuffer create(
+ Object operatorOwner,
+ MemoryManager memoryManager,
+ long memorySize,
+ RuntimeContext runtimeContext,
+ Collector<RowData> collector,
+ ZoneId shiftTimeZone)
+ throws Exception;
}
}
diff --git
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/AggRecordsCombiner.java
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/AggCombiner.java
similarity index 71%
rename from
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/AggRecordsCombiner.java
rename to
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/AggCombiner.java
index 53b9372..1ec2bc1 100644
---
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/AggRecordsCombiner.java
+++
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/AggCombiner.java
@@ -19,14 +19,13 @@
package org.apache.flink.table.runtime.operators.aggregate.window.combines;
import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.dataview.PerWindowStateDataViewStore;
import
org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction;
import org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunction;
-import
org.apache.flink.table.runtime.operators.window.combines.WindowCombineFunction;
+import
org.apache.flink.table.runtime.operators.window.combines.RecordsCombiner;
import
org.apache.flink.table.runtime.operators.window.slicing.WindowTimerService;
import org.apache.flink.table.runtime.operators.window.state.StateKeyContext;
import org.apache.flink.table.runtime.operators.window.state.WindowState;
@@ -37,14 +36,13 @@ import java.time.ZoneId;
import java.util.Iterator;
import static org.apache.flink.table.data.util.RowDataUtil.isAccumulateMsg;
-import static
org.apache.flink.table.runtime.util.StateConfigUtil.isStateImmutableInStateBackend;
import static org.apache.flink.table.runtime.util.TimeWindowUtil.isWindowFired;
/**
- * An implementation of {@link WindowCombineFunction} that accumulates input
records into the window
+ * An implementation of {@link RecordsCombiner} that accumulates input records
into the window
* accumulator state.
*/
-public final class AggRecordsCombiner implements WindowCombineFunction {
+public class AggCombiner implements RecordsCombiner {
/** The service to register event-time or processing-time timers. */
private final WindowTimerService<Long> timerService;
@@ -58,48 +56,26 @@ public final class AggRecordsCombiner implements
WindowCombineFunction {
/** Function used to handle all aggregates. */
private final NamespaceAggsHandleFunction<Long> aggregator;
- /** Whether to copy key and input record, because key and record are
reused. */
- private final boolean requiresCopy;
-
- /** Serializer to copy key if required. */
- private final TypeSerializer<RowData> keySerializer;
-
- /** Serializer to copy record if required. */
- private final TypeSerializer<RowData> recordSerializer;
-
/** Whether the operator works in event-time mode, used to indicate
registering which timer. */
private final boolean isEventTime;
- public AggRecordsCombiner(
+ public AggCombiner(
WindowTimerService<Long> timerService,
StateKeyContext keyContext,
WindowValueState<Long> accState,
NamespaceAggsHandleFunction<Long> aggregator,
- boolean requiresCopy,
- TypeSerializer<RowData> keySerializer,
- TypeSerializer<RowData> recordSerializer,
boolean isEventTime) {
this.timerService = timerService;
this.keyContext = keyContext;
this.accState = accState;
this.aggregator = aggregator;
- this.requiresCopy = requiresCopy;
- this.keySerializer = keySerializer;
- this.recordSerializer = recordSerializer;
this.isEventTime = isEventTime;
}
@Override
public void combine(WindowKey windowKey, Iterator<RowData> records) throws
Exception {
// step 0: set current key for states and timers
- final RowData key;
- if (requiresCopy) {
- // the incoming key is reused, we should copy it if state backend
doesn't copy it
- key = keySerializer.copy(windowKey.getKey());
- } else {
- key = windowKey.getKey();
- }
- keyContext.setCurrentKey(key);
+ keyContext.setCurrentKey(windowKey.getKey());
// step 1: get the accumulator for the current key and window
Long window = windowKey.getWindow();
@@ -114,10 +90,6 @@ public final class AggRecordsCombiner implements
WindowCombineFunction {
// step 3: do accumulate
while (records.hasNext()) {
RowData record = records.next();
- if (requiresCopy) {
- // the incoming record is reused, we should copy it if state
backend doesn't copy it
- record = recordSerializer.copy(record);
- }
if (isAccumulateMsg(record)) {
aggregator.accumulate(record);
} else {
@@ -151,26 +123,18 @@ public final class AggRecordsCombiner implements
WindowCombineFunction {
// Factory
//
----------------------------------------------------------------------------------------
- /** Factory to create {@link AggRecordsCombiner}. */
- public static final class Factory implements WindowCombineFunction.Factory
{
-
+ /** Factory to create {@link AggCombiner}. */
+ public static final class Factory implements RecordsCombiner.Factory {
private static final long serialVersionUID = 1L;
private final GeneratedNamespaceAggsHandleFunction<Long>
genAggsHandler;
- private final TypeSerializer<RowData> keySerializer;
- private final TypeSerializer<RowData> recordSerializer;
- public Factory(
- GeneratedNamespaceAggsHandleFunction<Long> genAggsHandler,
- TypeSerializer<RowData> keySerializer,
- TypeSerializer<RowData> recordSerializer) {
+ public Factory(GeneratedNamespaceAggsHandleFunction<Long>
genAggsHandler) {
this.genAggsHandler = genAggsHandler;
- this.keySerializer = keySerializer;
- this.recordSerializer = recordSerializer;
}
@Override
- public WindowCombineFunction create(
+ public RecordsCombiner createRecordsCombiner(
RuntimeContext runtimeContext,
WindowTimerService<Long> timerService,
KeyedStateBackend<RowData> stateBackend,
@@ -182,16 +146,12 @@ public final class AggRecordsCombiner implements
WindowCombineFunction {
aggregator.open(
new PerWindowStateDataViewStore(
stateBackend, LongSerializer.INSTANCE,
runtimeContext));
- boolean requiresCopy =
!isStateImmutableInStateBackend(stateBackend);
WindowValueState<Long> windowValueState = (WindowValueState<Long>)
windowState;
- return new AggRecordsCombiner(
+ return new AggCombiner(
timerService,
stateBackend::setCurrentKey,
windowValueState,
aggregator,
- requiresCopy,
- keySerializer,
- recordSerializer,
isEventTime);
}
}
diff --git
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/GlobalAggAccCombiner.java
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/GlobalAggCombiner.java
similarity index 75%
rename from
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/GlobalAggAccCombiner.java
rename to
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/GlobalAggCombiner.java
index e8fc66a..bf6e300 100644
---
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/GlobalAggAccCombiner.java
+++
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/GlobalAggCombiner.java
@@ -19,14 +19,13 @@
package org.apache.flink.table.runtime.operators.aggregate.window.combines;
import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.dataview.PerWindowStateDataViewStore;
import
org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction;
import org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunction;
-import
org.apache.flink.table.runtime.operators.window.combines.WindowCombineFunction;
+import
org.apache.flink.table.runtime.operators.window.combines.RecordsCombiner;
import
org.apache.flink.table.runtime.operators.window.slicing.WindowTimerService;
import org.apache.flink.table.runtime.operators.window.state.StateKeyContext;
import org.apache.flink.table.runtime.operators.window.state.WindowState;
@@ -36,16 +35,15 @@ import org.apache.flink.table.runtime.util.WindowKey;
import java.time.ZoneId;
import java.util.Iterator;
-import static
org.apache.flink.table.runtime.util.StateConfigUtil.isStateImmutableInStateBackend;
import static org.apache.flink.table.runtime.util.TimeWindowUtil.isWindowFired;
/**
- * An implementation of {@link WindowCombineFunction} that accumulates local
accumulators records
- * into the window accumulator state.
+ * An implementation of {@link RecordsCombiner} that merges local
accumulator records into the
+ * window accumulator state.
*
* <p>Note: this only supports event-time window.
*/
-public final class GlobalAggAccCombiner implements WindowCombineFunction {
+public class GlobalAggCombiner implements RecordsCombiner {
/** The service to register event-time or processing-time timers. */
private final WindowTimerService<Long> timerService;
@@ -62,50 +60,35 @@ public final class GlobalAggAccCombiner implements
WindowCombineFunction {
/** Global aggregate function to handle global accumulator rows. */
private final NamespaceAggsHandleFunction<Long> globalAggregator;
- /** Whether to copy key and input record, because key and record are
reused. */
- private final boolean requiresCopy;
-
- /** Serializer to copy key if required. */
- private final TypeSerializer<RowData> keySerializer;
-
- public GlobalAggAccCombiner(
+ public GlobalAggCombiner(
WindowTimerService<Long> timerService,
StateKeyContext keyContext,
WindowValueState<Long> accState,
NamespaceAggsHandleFunction<Long> localAggregator,
- NamespaceAggsHandleFunction<Long> globalAggregator,
- boolean requiresCopy,
- TypeSerializer<RowData> keySerializer) {
+ NamespaceAggsHandleFunction<Long> globalAggregator) {
this.timerService = timerService;
this.keyContext = keyContext;
this.accState = accState;
this.localAggregator = localAggregator;
this.globalAggregator = globalAggregator;
- this.requiresCopy = requiresCopy;
- this.keySerializer = keySerializer;
}
@Override
public void combine(WindowKey windowKey, Iterator<RowData> localAccs)
throws Exception {
- // step 0: set current key for states and timers
- final RowData key;
- if (requiresCopy) {
- // the incoming key is reused, we should copy it if state backend
doesn't copy it
- key = keySerializer.copy(windowKey.getKey());
- } else {
- key = windowKey.getKey();
- }
- keyContext.setCurrentKey(key);
Long window = windowKey.getWindow();
-
- // step 1: merge localAccs into one acc
RowData acc = localAggregator.createAccumulators();
localAggregator.setAccumulators(window, acc);
while (localAccs.hasNext()) {
RowData localAcc = localAccs.next();
localAggregator.merge(window, localAcc);
}
- RowData mergedLocalAcc = localAggregator.getAccumulators();
+ combineAccumulator(windowKey, localAggregator.getAccumulators());
+ }
+
+ private void combineAccumulator(WindowKey windowKey, RowData acc) throws
Exception {
+ // step 1: set current key for states and timers
+ keyContext.setCurrentKey(windowKey.getKey());
+ Long window = windowKey.getWindow();
// step2: merge acc into state
RowData stateAcc = accState.value(window);
@@ -113,7 +96,7 @@ public final class GlobalAggAccCombiner implements
WindowCombineFunction {
stateAcc = globalAggregator.createAccumulators();
}
globalAggregator.setAccumulators(window, stateAcc);
- globalAggregator.merge(window, mergedLocalAcc);
+ globalAggregator.merge(window, acc);
stateAcc = globalAggregator.getAccumulators();
accState.update(window, stateAcc);
@@ -136,26 +119,23 @@ public final class GlobalAggAccCombiner implements
WindowCombineFunction {
// Factory
//
----------------------------------------------------------------------------------------
- /** Factory to create {@link GlobalAggAccCombiner}. */
- public static final class Factory implements WindowCombineFunction.Factory
{
+ /** Factory to create {@link GlobalAggCombiner}. */
+ public static final class Factory implements RecordsCombiner.Factory {
private static final long serialVersionUID = 1L;
private final GeneratedNamespaceAggsHandleFunction<Long>
genLocalAggsHandler;
private final GeneratedNamespaceAggsHandleFunction<Long>
genGlobalAggsHandler;
- private final TypeSerializer<RowData> keySerializer;
public Factory(
GeneratedNamespaceAggsHandleFunction<Long> genLocalAggsHandler,
- GeneratedNamespaceAggsHandleFunction<Long>
genGlobalAggsHandler,
- TypeSerializer<RowData> keySerializer) {
+ GeneratedNamespaceAggsHandleFunction<Long>
genGlobalAggsHandler) {
this.genLocalAggsHandler = genLocalAggsHandler;
this.genGlobalAggsHandler = genGlobalAggsHandler;
- this.keySerializer = keySerializer;
}
@Override
- public WindowCombineFunction create(
+ public RecordsCombiner createRecordsCombiner(
RuntimeContext runtimeContext,
WindowTimerService<Long> timerService,
KeyedStateBackend<RowData> stateBackend,
@@ -172,16 +152,13 @@ public final class GlobalAggAccCombiner implements
WindowCombineFunction {
globalAggregator.open(
new PerWindowStateDataViewStore(
stateBackend, LongSerializer.INSTANCE,
runtimeContext));
- boolean requiresCopy =
!isStateImmutableInStateBackend(stateBackend);
WindowValueState<Long> windowValueState = (WindowValueState<Long>)
windowState;
- return new GlobalAggAccCombiner(
+ return new GlobalAggCombiner(
timerService,
stateBackend::setCurrentKey,
windowValueState,
localAggregator,
- globalAggregator,
- requiresCopy,
- keySerializer);
+ globalAggregator);
}
}
}
diff --git
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/LocalAggRecordsCombiner.java
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/LocalAggCombiner.java
similarity index 77%
rename from
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/LocalAggRecordsCombiner.java
rename to
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/LocalAggCombiner.java
index 71e92d8..c2c8807 100644
---
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/LocalAggRecordsCombiner.java
+++
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/combines/LocalAggCombiner.java
@@ -19,14 +19,13 @@
package org.apache.flink.table.runtime.operators.aggregate.window.combines;
import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.utils.JoinedRowData;
import org.apache.flink.table.runtime.dataview.UnsupportedStateDataViewStore;
import
org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction;
import org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunction;
-import
org.apache.flink.table.runtime.operators.window.combines.WindowCombineFunction;
+import
org.apache.flink.table.runtime.operators.window.combines.RecordsCombiner;
import org.apache.flink.table.runtime.util.WindowKey;
import org.apache.flink.util.Collector;
@@ -35,19 +34,16 @@ import java.util.Iterator;
import static org.apache.flink.table.data.util.RowDataUtil.isAccumulateMsg;
/**
- * An implementation of {@link WindowCombineFunction} that accumulates input
records into local
+ * An implementation of {@link RecordsCombiner} that accumulates input records
into local
* accumulators.
*
* <p>Note: this only supports event-time window.
*/
-public final class LocalAggRecordsCombiner implements WindowCombineFunction {
+public class LocalAggCombiner implements RecordsCombiner {
/** Function used to handle all aggregates. */
private final NamespaceAggsHandleFunction<Long> aggregator;
- /** Serializer to copy key if required. */
- private final TypeSerializer<RowData> keySerializer;
-
/** The output to emit combined accumulator result. */
private final Collector<RowData> collector;
@@ -63,19 +59,16 @@ public final class LocalAggRecordsCombiner implements
WindowCombineFunction {
*/
private final GenericRowData windowRow = new GenericRowData(1);
- public LocalAggRecordsCombiner(
- NamespaceAggsHandleFunction<Long> aggregator,
- TypeSerializer<RowData> keySerializer,
- Collector<RowData> collector) {
+ public LocalAggCombiner(
+ NamespaceAggsHandleFunction<Long> aggregator, Collector<RowData>
collector) {
this.aggregator = aggregator;
- this.keySerializer = keySerializer;
this.collector = collector;
}
@Override
public void combine(WindowKey windowKey, Iterator<RowData> records) throws
Exception {
- // always copy key because we will merge record into memory acc
- final RowData key = keySerializer.copy(windowKey.getKey());
+ // never copy the key/value here because they are not cached.
+ final RowData key = windowKey.getKey();
final Long window = windowKey.getWindow();
// step 1: create an empty accumulator
@@ -116,28 +109,24 @@ public final class LocalAggRecordsCombiner implements
WindowCombineFunction {
// Factory
//
----------------------------------------------------------------------------------------
- /** Factory to create {@link LocalAggRecordsCombiner}. */
- public static final class Factory implements
WindowCombineFunction.LocalFactory {
+ /** Factory to create {@link LocalAggCombiner}. */
+ public static final class Factory implements RecordsCombiner.LocalFactory {
private static final long serialVersionUID = 1L;
private final GeneratedNamespaceAggsHandleFunction<Long>
genAggsHandler;
- private final TypeSerializer<RowData> keySerializer;
- public Factory(
- GeneratedNamespaceAggsHandleFunction<Long> genAggsHandler,
- TypeSerializer<RowData> keySerializer) {
+ public Factory(GeneratedNamespaceAggsHandleFunction<Long>
genAggsHandler) {
this.genAggsHandler = genAggsHandler;
- this.keySerializer = keySerializer;
}
@Override
- public WindowCombineFunction create(
+ public RecordsCombiner createRecordsCombiner(
RuntimeContext runtimeContext, Collector<RowData> collector)
throws Exception {
final NamespaceAggsHandleFunction<Long> aggregator =
genAggsHandler.newInstance(runtimeContext.getUserCodeClassLoader());
aggregator.open(new UnsupportedStateDataViewStore(runtimeContext));
- return new LocalAggRecordsCombiner(aggregator, keySerializer,
collector);
+ return new LocalAggCombiner(aggregator, collector);
}
}
}
diff --git
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/AbstractWindowAggProcessor.java
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/AbstractWindowAggProcessor.java
index 85f0ebb..0851949 100644
---
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/AbstractWindowAggProcessor.java
+++
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/AbstractWindowAggProcessor.java
@@ -29,7 +29,6 @@ import
org.apache.flink.table.runtime.dataview.PerWindowStateDataViewStore;
import
org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction;
import org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunction;
import
org.apache.flink.table.runtime.operators.aggregate.window.buffers.WindowBuffer;
-import
org.apache.flink.table.runtime.operators.window.combines.WindowCombineFunction;
import org.apache.flink.table.runtime.operators.window.slicing.ClockService;
import org.apache.flink.table.runtime.operators.window.slicing.SliceAssigner;
import
org.apache.flink.table.runtime.operators.window.slicing.SliceSharedAssigner;
@@ -50,7 +49,6 @@ public abstract class AbstractWindowAggProcessor implements
SlicingWindowProcess
protected final GeneratedNamespaceAggsHandleFunction<Long> genAggsHandler;
protected final WindowBuffer.Factory windowBufferFactory;
- protected final WindowCombineFunction.Factory combineFactory;
protected final SliceAssigner sliceAssigner;
protected final TypeSerializer<RowData> accSerializer;
protected final boolean isEventTime;
@@ -85,13 +83,11 @@ public abstract class AbstractWindowAggProcessor implements
SlicingWindowProcess
public AbstractWindowAggProcessor(
GeneratedNamespaceAggsHandleFunction<Long> genAggsHandler,
WindowBuffer.Factory bufferFactory,
- WindowCombineFunction.Factory combinerFactory,
SliceAssigner sliceAssigner,
TypeSerializer<RowData> accSerializer,
ZoneId shiftTimeZone) {
this.genAggsHandler = genAggsHandler;
this.windowBufferFactory = bufferFactory;
- this.combineFactory = combinerFactory;
this.sliceAssigner = sliceAssigner;
this.accSerializer = accSerializer;
this.isEventTime = sliceAssigner.isEventTime();
@@ -118,19 +114,16 @@ public abstract class AbstractWindowAggProcessor
implements SlicingWindowProcess
this.aggregator.open(
new PerWindowStateDataViewStore(
ctx.getKeyedStateBackend(), namespaceSerializer,
ctx.getRuntimeContext()));
- final WindowCombineFunction combineFunction =
- combineFactory.create(
- ctx.getRuntimeContext(),
- windowTimerService,
- ctx.getKeyedStateBackend(),
- windowState,
- isEventTime);
this.windowBuffer =
windowBufferFactory.create(
ctx.getOperatorOwner(),
ctx.getMemoryManager(),
ctx.getMemorySize(),
- combineFunction,
+ ctx.getRuntimeContext(),
+ windowTimerService,
+ ctx.getKeyedStateBackend(),
+ windowState,
+ isEventTime,
shiftTimeZone);
this.reuseOutput = new JoinedRowData();
diff --git
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/SliceSharedWindowAggProcessor.java
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/SliceSharedWindowAggProcessor.java
index 63fa925..db0b39c 100644
---
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/SliceSharedWindowAggProcessor.java
+++
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/SliceSharedWindowAggProcessor.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.table.data.RowData;
import
org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction;
import
org.apache.flink.table.runtime.operators.aggregate.window.buffers.WindowBuffer;
-import
org.apache.flink.table.runtime.operators.window.combines.WindowCombineFunction;
import org.apache.flink.table.runtime.operators.window.slicing.SliceAssigner;
import org.apache.flink.table.runtime.operators.window.slicing.SliceAssigners;
import
org.apache.flink.table.runtime.operators.window.slicing.SliceSharedAssigner;
@@ -51,18 +50,11 @@ public final class SliceSharedWindowAggProcessor extends
AbstractWindowAggProces
public SliceSharedWindowAggProcessor(
GeneratedNamespaceAggsHandleFunction<Long> genAggsHandler,
WindowBuffer.Factory bufferFactory,
- WindowCombineFunction.Factory combinerFactory,
SliceSharedAssigner sliceAssigner,
TypeSerializer<RowData> accSerializer,
int indexOfCountStar,
ZoneId shiftTimeZone) {
- super(
- genAggsHandler,
- bufferFactory,
- combinerFactory,
- sliceAssigner,
- accSerializer,
- shiftTimeZone);
+ super(genAggsHandler, bufferFactory, sliceAssigner, accSerializer,
shiftTimeZone);
this.sliceSharedAssigner = sliceAssigner;
this.mergeTargetHelper = new SliceMergeTargetHelper();
this.emptySupplier = new WindowIsEmptySupplier(indexOfCountStar,
sliceAssigner);
diff --git
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/SliceUnsharedWindowAggProcessor.java
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/SliceUnsharedWindowAggProcessor.java
index 7c5d45a..53c395f 100644
---
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/SliceUnsharedWindowAggProcessor.java
+++
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/SliceUnsharedWindowAggProcessor.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.table.data.RowData;
import
org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction;
import
org.apache.flink.table.runtime.operators.aggregate.window.buffers.WindowBuffer;
-import
org.apache.flink.table.runtime.operators.window.combines.WindowCombineFunction;
import
org.apache.flink.table.runtime.operators.window.slicing.SliceUnsharedAssigner;
import java.time.ZoneId;
@@ -37,17 +36,10 @@ public final class SliceUnsharedWindowAggProcessor extends
AbstractWindowAggProc
public SliceUnsharedWindowAggProcessor(
GeneratedNamespaceAggsHandleFunction<Long> genAggsHandler,
WindowBuffer.Factory windowBufferFactory,
- WindowCombineFunction.Factory combineFactory,
SliceUnsharedAssigner sliceAssigner,
TypeSerializer<RowData> accSerializer,
ZoneId shiftTimeZone) {
- super(
- genAggsHandler,
- windowBufferFactory,
- combineFactory,
- sliceAssigner,
- accSerializer,
- shiftTimeZone);
+ super(genAggsHandler, windowBufferFactory, sliceAssigner,
accSerializer, shiftTimeZone);
}
@Override
diff --git
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorBuilder.java
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorBuilder.java
index da89489..658988d 100644
---
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorBuilder.java
+++
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorBuilder.java
@@ -25,7 +25,7 @@ import
org.apache.flink.table.runtime.operators.aggregate.window.buffers.Records
import
org.apache.flink.table.runtime.operators.aggregate.window.buffers.WindowBuffer;
import
org.apache.flink.table.runtime.operators.rank.window.combines.TopNRecordsCombiner;
import
org.apache.flink.table.runtime.operators.rank.window.processors.WindowRankProcessor;
-import
org.apache.flink.table.runtime.operators.window.combines.WindowCombineFunction;
+import
org.apache.flink.table.runtime.operators.window.combines.RecordsCombiner;
import
org.apache.flink.table.runtime.operators.window.slicing.SlicingWindowOperator;
import
org.apache.flink.table.runtime.operators.window.slicing.SlicingWindowProcessor;
import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer;
@@ -136,22 +136,17 @@ public class WindowRankOperatorBuilder {
windowEndIndex >= 0,
String.format(
"Illegal window end index %s, it should not be
negative!", windowEndIndex));
- final WindowBuffer.Factory bufferFactory =
- new RecordsWindowBuffer.Factory(keySerializer,
inputSerializer);
- final WindowCombineFunction.Factory combinerFactory =
+ final RecordsCombiner.Factory combinerFactory =
new TopNRecordsCombiner.Factory(
- generatedSortKeyComparator,
- sortKeySelector,
- keySerializer,
- inputSerializer,
- rankEnd);
+ generatedSortKeyComparator, sortKeySelector,
inputSerializer, rankEnd);
+ final WindowBuffer.Factory bufferFactory =
+ new RecordsWindowBuffer.Factory(keySerializer,
inputSerializer, combinerFactory);
final SlicingWindowProcessor<Long> windowProcessor =
new WindowRankProcessor(
inputSerializer,
generatedSortKeyComparator,
sortKeySelector.getProducedType().toSerializer(),
bufferFactory,
- combinerFactory,
rankStart,
rankEnd,
outputRankNumber,
diff --git
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/combines/TopNRecordsCombiner.java
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/combines/TopNRecordsCombiner.java
index f33757e..7d8929e 100644
---
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/combines/TopNRecordsCombiner.java
+++
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/combines/TopNRecordsCombiner.java
@@ -26,7 +26,7 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.rank.TopNBuffer;
-import
org.apache.flink.table.runtime.operators.window.combines.WindowCombineFunction;
+import
org.apache.flink.table.runtime.operators.window.combines.RecordsCombiner;
import
org.apache.flink.table.runtime.operators.window.slicing.WindowTimerService;
import org.apache.flink.table.runtime.operators.window.state.StateKeyContext;
import org.apache.flink.table.runtime.operators.window.state.WindowMapState;
@@ -41,13 +41,12 @@ import java.util.List;
import java.util.Map;
import static org.apache.flink.table.data.util.RowDataUtil.isAccumulateMsg;
-import static
org.apache.flink.table.runtime.util.StateConfigUtil.isStateImmutableInStateBackend;
/**
- * An implementation of {@link WindowCombineFunction} that save topN records
of incremental input
- * records into the window state.
+ * An implementation of {@link RecordsCombiner} that saves the topN records of
incremental input records
+ * into the window state.
*/
-public final class TopNRecordsCombiner implements WindowCombineFunction {
+public final class TopNRecordsCombiner implements RecordsCombiner {
/** The service to register event-time or processing-time timers. */
private final WindowTimerService<Long> timerService;
@@ -67,12 +66,6 @@ public final class TopNRecordsCombiner implements
WindowCombineFunction {
/** TopN size. */
private final long topN;
- /** Whether to copy input key, because key is reused. */
- private final boolean requiresCopyKey;
-
- /** Serializer to copy key if required. */
- private final TypeSerializer<RowData> keySerializer;
-
/** Serializer to copy record if required. */
private final TypeSerializer<RowData> recordSerializer;
@@ -86,8 +79,6 @@ public final class TopNRecordsCombiner implements
WindowCombineFunction {
Comparator<RowData> sortKeyComparator,
KeySelector<RowData, RowData> sortKeySelector,
long topN,
- boolean requiresCopyKey,
- TypeSerializer<RowData> keySerializer,
TypeSerializer<RowData> recordSerializer,
boolean isEventTime) {
this.timerService = timerService;
@@ -96,8 +87,6 @@ public final class TopNRecordsCombiner implements
WindowCombineFunction {
this.sortKeyComparator = sortKeyComparator;
this.sortKeySelector = sortKeySelector;
this.topN = topN;
- this.requiresCopyKey = requiresCopyKey;
- this.keySerializer = keySerializer;
this.recordSerializer = recordSerializer;
this.isEventTime = isEventTime;
}
@@ -123,14 +112,7 @@ public final class TopNRecordsCombiner implements
WindowCombineFunction {
// step 2: flush data in TopNBuffer into state
Iterator<Map.Entry<RowData, Collection<RowData>>> bufferItr =
buffer.entrySet().iterator();
- final RowData key;
- if (requiresCopyKey) {
- // the incoming key is reused, we should copy it if state backend
doesn't copy it
- key = keySerializer.copy(windowKey.getKey());
- } else {
- key = windowKey.getKey();
- }
- keyContext.setCurrentKey(key);
+ keyContext.setCurrentKey(windowKey.getKey());
Long window = windowKey.getWindow();
while (bufferItr.hasNext()) {
Map.Entry<RowData, Collection<RowData>> entry = bufferItr.next();
@@ -158,32 +140,29 @@ public final class TopNRecordsCombiner implements
WindowCombineFunction {
//
----------------------------------------------------------------------------------------
/** Factory to create {@link TopNRecordsCombiner}. */
- public static final class Factory implements WindowCombineFunction.Factory
{
+ public static final class Factory implements RecordsCombiner.Factory {
private static final long serialVersionUID = 1L;
// The util to compare two sortKey equals to each other.
private final GeneratedRecordComparator generatedSortKeyComparator;
private final KeySelector<RowData, RowData> sortKeySelector;
- private final TypeSerializer<RowData> keySerializer;
private final TypeSerializer<RowData> recordSerializer;
private final long topN;
public Factory(
GeneratedRecordComparator genSortKeyComparator,
RowDataKeySelector sortKeySelector,
- TypeSerializer<RowData> keySerializer,
TypeSerializer<RowData> recordSerializer,
long topN) {
this.generatedSortKeyComparator = genSortKeyComparator;
this.sortKeySelector = sortKeySelector;
- this.keySerializer = keySerializer;
this.recordSerializer = recordSerializer;
this.topN = topN;
}
@Override
- public WindowCombineFunction create(
+ public RecordsCombiner createRecordsCombiner(
RuntimeContext runtimeContext,
WindowTimerService<Long> timerService,
KeyedStateBackend<RowData> stateBackend,
@@ -192,7 +171,6 @@ public final class TopNRecordsCombiner implements
WindowCombineFunction {
throws Exception {
final Comparator<RowData> sortKeyComparator =
generatedSortKeyComparator.newInstance(runtimeContext.getUserCodeClassLoader());
- boolean requiresCopyKey =
!isStateImmutableInStateBackend(stateBackend);
WindowMapState<Long, List<RowData>> windowMapState =
(WindowMapState<Long, List<RowData>>) windowState;
return new TopNRecordsCombiner(
@@ -202,8 +180,6 @@ public final class TopNRecordsCombiner implements
WindowCombineFunction {
sortKeyComparator,
sortKeySelector,
topN,
- requiresCopyKey,
- keySerializer,
recordSerializer,
isEventTime);
}
diff --git
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/processors/WindowRankProcessor.java
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/processors/WindowRankProcessor.java
index c45b401..4df71ab 100644
---
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/processors/WindowRankProcessor.java
+++
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/processors/WindowRankProcessor.java
@@ -30,7 +30,6 @@ import org.apache.flink.table.data.utils.JoinedRowData;
import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
import
org.apache.flink.table.runtime.operators.aggregate.window.buffers.WindowBuffer;
import org.apache.flink.table.runtime.operators.rank.TopNBuffer;
-import
org.apache.flink.table.runtime.operators.window.combines.WindowCombineFunction;
import
org.apache.flink.table.runtime.operators.window.slicing.SlicingWindowProcessor;
import
org.apache.flink.table.runtime.operators.window.slicing.WindowTimerService;
import
org.apache.flink.table.runtime.operators.window.slicing.WindowTimerServiceImpl;
@@ -59,7 +58,6 @@ public final class WindowRankProcessor implements
SlicingWindowProcessor<Long> {
private final TypeSerializer<RowData> sortKeySerializer;
private final WindowBuffer.Factory bufferFactory;
- private final WindowCombineFunction.Factory combineFactory;
private final TypeSerializer<RowData> inputSerializer;
private final long rankStart;
private final long rankEnd;
@@ -88,7 +86,6 @@ public final class WindowRankProcessor implements
SlicingWindowProcessor<Long> {
GeneratedRecordComparator genSortKeyComparator,
TypeSerializer<RowData> sortKeySerializer,
WindowBuffer.Factory bufferFactory,
- WindowCombineFunction.Factory combineFactory,
long rankStart,
long rankEnd,
boolean outputRankNumber,
@@ -98,7 +95,6 @@ public final class WindowRankProcessor implements
SlicingWindowProcessor<Long> {
this.generatedSortKeyComparator = genSortKeyComparator;
this.sortKeySerializer = sortKeySerializer;
this.bufferFactory = bufferFactory;
- this.combineFactory = combineFactory;
this.rankStart = rankStart;
this.rankEnd = rankEnd;
this.outputRankNumber = outputRankNumber;
@@ -127,19 +123,16 @@ public final class WindowRankProcessor implements
SlicingWindowProcessor<Long> {
this.windowState =
new WindowMapState<>(
(InternalMapState<RowData, Long, RowData,
List<RowData>>) state);
- final WindowCombineFunction combineFunction =
- combineFactory.create(
- ctx.getRuntimeContext(),
- windowTimerService,
- ctx.getKeyedStateBackend(),
- windowState,
- true);
this.windowBuffer =
bufferFactory.create(
ctx.getOperatorOwner(),
ctx.getMemoryManager(),
ctx.getMemorySize(),
- combineFunction,
+ ctx.getRuntimeContext(),
+ windowTimerService,
+ ctx.getKeyedStateBackend(),
+ windowState,
+ true,
shiftTimeZone);
this.reuseOutput = new JoinedRowData();
diff --git
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/combines/WindowCombineFunction.java
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/combines/RecordsCombiner.java
similarity index 76%
rename from
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/combines/WindowCombineFunction.java
rename to
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/combines/RecordsCombiner.java
index c12a247..d235ed5 100644
---
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/combines/WindowCombineFunction.java
+++
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/combines/RecordsCombiner.java
@@ -30,33 +30,29 @@ import org.apache.flink.util.Collector;
import java.io.Serializable;
import java.util.Iterator;
-/** The {@link WindowCombineFunction} is used to combine buffered data into
state. */
+/** The {@link RecordsCombiner} is used to combine buffered records into
state. */
@Internal
-public interface WindowCombineFunction {
-
+public interface RecordsCombiner {
/**
* Combines the buffered data into state based on the given window-key
pair.
*
* @param windowKey the window-key pair that the buffered data belong to,
the window-key object
* is reused.
- * @param value the buffered data, the iterator and {@link RowData}
objects are reused
+ * @param records the buffered data, the iterator and {@link RowData}
objects are reused.
*/
- void combine(WindowKey windowKey, Iterator<RowData> value) throws
Exception;
+ void combine(WindowKey windowKey, Iterator<RowData> records) throws
Exception;
/** Release resources allocated by this combine function. */
void close() throws Exception;
// ------------------------------------------------------------------------
- /**
- * A factory that creates a {@link WindowCombineFunction} for combining at
global stage, i.e.
- * keyed operator where combine into keyed state.
- */
+ /** A factory that creates a {@link RecordsCombiner}. */
@FunctionalInterface
interface Factory extends Serializable {
/**
- * Creates a {@link WindowCombineFunction} that can combine buffered
data into states.
+ * Creates a {@link RecordsCombiner} that can combine buffered data
into states.
*
* @param runtimeContext the current {@link RuntimeContext}
* @param timerService the service to register event-time and
processing-time timers
@@ -65,7 +61,7 @@ public interface WindowCombineFunction {
* @param isEventTime indicates whether the operator works in
event-time or processing-time
* mode, used for register corresponding timers.
*/
- WindowCombineFunction create(
+ RecordsCombiner createRecordsCombiner(
RuntimeContext runtimeContext,
WindowTimerService<Long> timerService,
KeyedStateBackend<RowData> stateBackend,
@@ -74,9 +70,10 @@ public interface WindowCombineFunction {
throws Exception;
}
- /** A factory that creates a {@link WindowCombineFunction} used for
combining at local stage. */
+ /** A factory that creates a {@link RecordsCombiner} used for combining at
local stage. */
+ @FunctionalInterface
interface LocalFactory extends Serializable {
- WindowCombineFunction create(RuntimeContext runtimeContext,
Collector<RowData> collector)
- throws Exception;
+ RecordsCombiner createRecordsCombiner(
+ RuntimeContext runtimeContext, Collector<RowData> collector)
throws Exception;
}
}
diff --git
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/collections/binary/AbstractBytesHashMap.java
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/collections/binary/AbstractBytesHashMap.java
index 7929a26..e3cb25f 100644
---
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/collections/binary/AbstractBytesHashMap.java
+++
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/collections/binary/AbstractBytesHashMap.java
@@ -194,12 +194,12 @@ public abstract class AbstractBytesHashMap<K> extends
BytesMap<K, BinaryRowData>
/** Returns an iterator for iterating over the entries of this map. */
@SuppressWarnings("WeakerAccess")
- public KeyValueIterator<K, BinaryRowData> getEntryIterator() {
+ public KeyValueIterator<K, BinaryRowData> getEntryIterator(boolean
requiresCopy) {
if (destructiveIterator != null) {
throw new IllegalArgumentException(
"DestructiveIterator is not null, so this method can't be
invoke!");
}
- return ((RecordArea) recordArea).entryIterator();
+ return ((RecordArea) recordArea).entryIterator(requiresCopy);
}
/** @return the underlying memory segments of the hash map's record area */
@@ -329,8 +329,8 @@ public abstract class AbstractBytesHashMap<K> extends
BytesMap<K, BinaryRowData>
// ----------------------- Iterator -----------------------
- private KeyValueIterator<K, BinaryRowData> entryIterator() {
- return new EntryIterator();
+ private KeyValueIterator<K, BinaryRowData> entryIterator(boolean
requiresCopy) {
+ return new EntryIterator(requiresCopy);
}
private final class EntryIterator extends AbstractPagedInputView
@@ -338,10 +338,12 @@ public abstract class AbstractBytesHashMap<K> extends
BytesMap<K, BinaryRowData>
private int count = 0;
private int currentSegmentIndex = 0;
+ private final boolean requiresCopy;
- private EntryIterator() {
+ private EntryIterator(boolean requiresCopy) {
super(segments.get(0), segmentSize, 0);
destructiveIterator = this;
+ this.requiresCopy = requiresCopy;
}
@Override
@@ -358,12 +360,12 @@ public abstract class AbstractBytesHashMap<K> extends
BytesMap<K, BinaryRowData>
@Override
public K getKey() {
- return reusedKey;
+ return requiresCopy ? keySerializer.copy(reusedKey) :
reusedKey;
}
@Override
public BinaryRowData getValue() {
- return reusedValue;
+ return requiresCopy ? reusedValue.copy() : reusedValue;
}
public boolean hasNext() {
diff --git
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/collections/binary/AbstractBytesMultiMap.java
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/collections/binary/AbstractBytesMultiMap.java
index cc64ef8..7ec8132 100644
---
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/collections/binary/AbstractBytesMultiMap.java
+++
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/collections/binary/AbstractBytesMultiMap.java
@@ -183,8 +183,8 @@ public abstract class AbstractBytesMultiMap<K> extends
BytesMap<K, Iterator<RowD
}
}
- public KeyValueIterator<K, Iterator<RowData>> getEntryIterator() {
- return ((RecordArea) recordArea).entryIterator();
+ public KeyValueIterator<K, Iterator<RowData>> getEntryIterator(boolean
requiresCopy) {
+ return ((RecordArea) recordArea).entryIterator(requiresCopy);
}
/** release the map's record and bucket area's memory segments. */
@@ -329,14 +329,17 @@ public abstract class AbstractBytesMultiMap<K> extends
BytesMap<K, Iterator<RowD
view.getCurrentSegment().putInt(currPosInSeg, newPointer);
}
- KeyValueIterator<K, Iterator<RowData>> entryIterator() {
- return new EntryIterator();
+ KeyValueIterator<K, Iterator<RowData>> entryIterator(boolean
requiresCopy) {
+ return new EntryIterator(requiresCopy);
}
final class EntryIterator implements KeyValueIterator<K,
Iterator<RowData>> {
private int count;
+ private final boolean requiresCopy;
- public EntryIterator() {
+ public EntryIterator(boolean requiresCopy) {
+ this.requiresCopy = requiresCopy;
+ reusedValueIterator.setRequiresCopy(requiresCopy);
count = 0;
if (numKeys > 0) {
recordArea.setReadPosition(0);
@@ -361,7 +364,7 @@ public abstract class AbstractBytesMultiMap<K> extends
BytesMap<K, Iterator<RowD
@Override
public K getKey() {
- return reusedKey;
+ return requiresCopy ? keySerializer.copy(reusedKey) :
reusedKey;
}
@Override
@@ -382,10 +385,12 @@ public abstract class AbstractBytesMultiMap<K> extends
BytesMap<K, Iterator<RowD
final class ValueIterator implements Iterator<RowData> {
private int offset;
private boolean isFirstRead;
+ private boolean requiresCopy;
public ValueIterator(int offset) {
this.offset = offset;
this.isFirstRead = true;
+ this.requiresCopy = false;
}
public void setOffset(int offset) {
@@ -393,6 +398,10 @@ public abstract class AbstractBytesMultiMap<K> extends
BytesMap<K, Iterator<RowD
this.isFirstRead = true;
}
+ public void setRequiresCopy(boolean requiresCopy) {
+ this.requiresCopy = requiresCopy;
+ }
+
@Override
public boolean hasNext() {
return isFirstRead || offset != -1;
@@ -402,7 +411,7 @@ public abstract class AbstractBytesMultiMap<K> extends
BytesMap<K, Iterator<RowD
public RowData next() {
if (isFirstRead) {
isFirstRead = false;
- return reusedRecord;
+ return requiresCopy ? reusedRecord.copy() : reusedRecord;
}
if (hasNext()) {
valInView.setReadPosition(offset);
@@ -415,7 +424,7 @@ public abstract class AbstractBytesMultiMap<K> extends
BytesMap<K, Iterator<RowD
"Exception happened while iterating"
+ " value list of a key in
BytesMultiMap");
}
- return reusedRecord;
+ return requiresCopy ? reusedRecord.copy() : reusedRecord;
}
return null;
}
diff --git
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/aggregate/SumHashAggTestOperator.java
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/aggregate/SumHashAggTestOperator.java
index 24e9a2d..88be5ef 100644
---
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/aggregate/SumHashAggTestOperator.java
+++
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/aggregate/SumHashAggTestOperator.java
@@ -158,7 +158,8 @@ public class SumHashAggTestOperator extends
AbstractStreamOperator<RowData>
if (sorter == null) {
// no spilling, output by iterating aggregate map.
- KeyValueIterator<BinaryRowData, BinaryRowData> iter =
aggregateMap.getEntryIterator();
+ KeyValueIterator<BinaryRowData, BinaryRowData> iter =
+ aggregateMap.getEntryIterator(false);
while (iter.advanceNext()) {
// set result and output
diff --git
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/util/collections/binary/BytesHashMapTestBase.java
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/util/collections/binary/BytesHashMapTestBase.java
index 44a3233..a599f29 100644
---
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/util/collections/binary/BytesHashMapTestBase.java
+++
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/util/collections/binary/BytesHashMapTestBase.java
@@ -253,7 +253,7 @@ public abstract class BytesHashMapTestBase<K> extends
BytesMapTestBase {
expected.add(entry.copy());
}
}
- KeyValueIterator<K, BinaryRowData> iter = table.getEntryIterator();
+ KeyValueIterator<K, BinaryRowData> iter =
table.getEntryIterator(false);
while (iter.advanceNext()) {
actualKeys.add(keySerializer.copy(iter.getKey()));
actualValues.add(iter.getValue().copy());
diff --git
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/util/collections/binary/BytesMultiMapTestBase.java
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/util/collections/binary/BytesMultiMapTestBase.java
index 9a6620c..22dca2d 100644
---
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/util/collections/binary/BytesMultiMapTestBase.java
+++
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/util/collections/binary/BytesMultiMapTestBase.java
@@ -116,7 +116,7 @@ public abstract class BytesMultiMapTestBase<K> extends
BytesMapTestBase {
}
}
- KeyValueIterator<K, Iterator<RowData>> iter = table.getEntryIterator();
+ KeyValueIterator<K, Iterator<RowData>> iter =
table.getEntryIterator(false);
while (iter.advanceNext()) {
int i = 0;
Iterator<RowData> valueIter = iter.getValue();