lindong28 commented on code in PR #23000:
URL: https://github.com/apache/flink/pull/23000#discussion_r1300982941
##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java:
##########
@@ -581,6 +581,14 @@ private void startTriggeringCheckpoint(CheckpointTriggerRequest request) {
CompletableFuture<Void> masterTriggerCompletionPromise = new CompletableFuture<>();
+ if (checkpointPlanFuture.get() == null) {
Review Comment:
It seems unnecessary to keep calling `startTriggeringCheckpoint` when there
is any task writing data to a blocking-typed partition.
And it might be useful to immediately trigger a checkpoint right after it is
allowed (e.g. no task is writing data to a blocking-typed partition) AND it
has been more than `execution.checkpointing.interval` since the job started.
Maybe the JM should schedule the checkpoint coordinator as appropriate based
on task status changes?
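To illustrate the idea, a rough sketch of such a JM-side hook (all names here,
e.g. `noTaskWritingToBlockingPartition` and `jobStartTime`, are hypothetical
and not existing Flink APIs):
```java
// Hypothetical: re-evaluate checkpoint triggering on task status changes
// instead of repeatedly calling startTriggeringCheckpoint.
private void onTaskStatusChange() {
    boolean checkpointAllowed = noTaskWritingToBlockingPartition();
    long elapsedSinceJobStart = System.currentTimeMillis() - jobStartTime;
    if (checkpointAllowed && elapsedSinceJobStart >= checkpointingInterval.toMillis()) {
        // Trigger once immediately; the periodic scheduler takes over afterwards.
        checkpointCoordinator.triggerCheckpoint(false);
    }
}
```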
##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointPlanCalculator.java:
##########
@@ -156,6 +156,13 @@ private void checkTasksStarted(List<Execution> toTrigger) throws CheckpointExcep
* @return The plan of this checkpoint.
*/
private CheckpointPlan calculateWithAllTasksRunning() {
+ // Do not trigger checkpoint before all BATCH tasks have finished.
+ for (ExecutionVertex task : allTasks) {
+ if (!task.allOutputsPipelinedOrPipelinedBounded()) {
Review Comment:
In addition to ensuring that there is no task producing to a blocking-typed
partition, do we also need to ensure that there is no task consuming from a
blocking-typed partition?
Maybe we can let tasks report an `end-of-data-from-blocking-inputs` event
similar to how tasks report `end-of-data`. You can see
`TaskExecutor#notifyEndOfData` for more information.
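To make this concrete, a rough sketch of such a notification path, modeled
after the existing end-of-data reporting (the method name and wiring below are
assumptions, not existing Flink APIs):
```java
// Hypothetical TaskExecutor-side notification, parallel to notifyEndOfData.
public void notifyEndOfDataFromBlockingInputs(ExecutionAttemptID executionAttemptId) {
    // Forward to the JM so the checkpoint plan calculator can also verify that
    // no task is still consuming from a blocking-typed partition.
    jobMasterGateway.notifyEndOfDataFromBlockingInputs(executionAttemptId);
}
```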
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java:
##########
@@ -328,6 +329,12 @@ public <ACC, R> SingleOutputStreamOperator<R> aggregate(
"This aggregation function cannot be a RichFunction.");
}
+ if (builder.isEndOfStreamWindow()) {
Review Comment:
Instead of adding this extra API to read an attribute of
`WindowOperatorBuilder`, would it be simpler to let `builder.aggregate()`
return the `EOFAggregationOperator`?
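For illustration, the branch could move into the builder along these lines (a
sketch; `buildWindowOperator` stands in for the existing operator-construction
path):
```java
// Hypothetical: the builder decides which operator to create, so WindowedStream
// does not need the extra isEndOfStreamWindow() accessor.
public <ACC, R> OneInputStreamOperator<T, R> aggregate(AggregateFunction<T, ACC, R> function) {
    if (windowAssigner instanceof EndOfStreamWindows) {
        return new EOFAggregationOperator<>(function);
    }
    return buildWindowOperator(function); // existing path, name illustrative
}
```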
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java:
##########
@@ -442,4 +442,11 @@ public void setSupportsConcurrentExecutionAttempts(
boolean supportsConcurrentExecutionAttempts) {
this.supportsConcurrentExecutionAttempts = supportsConcurrentExecutionAttempts;
}
+
+ public boolean isOutputOnEOF() {
+ if (operatorFactory == null) {
Review Comment:
`operatorFactory` is not marked `@Nullable` in `StreamNode`. I suppose it
should be impossible to have a StreamNode with `operatorFactory == null`.
Can you explain when we need this check?
##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorBatchExecutionTest.java:
##########
@@ -491,6 +494,55 @@ public void testCoFeedbackThrowsExceptionInBatch() {
testNoSupportForIterationsInBatchHelper(coFeedbackTransformation);
}
+ @Test
+ public void operatorSortWithoutRepetitionTest() {
Review Comment:
Typically we expect the test method name to start with `test...` and to
highlight the key logic/API that the test covers.
How about naming it something like `testInternalSorterAndOutputOnEOF`?
##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java:
##########
@@ -960,6 +962,51 @@ public void testCacheSideOutput() {
verifyCacheConsumeNode(env, upstreamParallelism, cacheTransformation);
}
+ @Test
+ public void testTransformOutputEOFOperator() {
Review Comment:
It seems that this test aims to check that if an operator has `isOutputOnEOF
= true` and has keyed inputs, then its input should be sorted.
It is a bit weird/redundant to have this test since there is no logical
connection between the input requirement and the value of `isOutputOnEOF`.
Note that it is in general impossible and unnecessary to test every
combination of configurations. We typically only add tests whose inputs and
outputs have a logical connection.
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java:
##########
@@ -366,6 +368,19 @@ public <T> DataStream<T> apply(
// clean the closure
function = input1.getExecutionEnvironment().clean(function);
+ // Using EOFCoGroupOperator when the window assigner is EndOfStreamWindows
+ if (windowAssigner instanceof EndOfStreamWindows) {
Review Comment:
Should we use `EOFCoGroupOperator` only when both `trigger` and `evictor`
are null? Otherwise, we might produce wrong results when user provides
trigger/evictor.
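A minimal sketch of the guarded condition, assuming `trigger` and `evictor`
are the fields of this window definition:
```java
// Only take the EOF fast path when the user has not customized triggering or
// eviction; otherwise fall back to the regular window co-group translation.
if (windowAssigner instanceof EndOfStreamWindows && trigger == null && evictor == null) {
    // ... create and apply EOFCoGroupOperator ...
}
```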
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java:
##########
@@ -1108,7 +1128,13 @@ private void setOperatorConfig(
config.setTimeCharacteristic(streamGraph.getTimeCharacteristic());
- final CheckpointConfig checkpointCfg = streamGraph.getCheckpointConfig();
+ final CheckpointConfig checkpointCfg;
+ if (outputBlockingNodesID.contains(vertexId)) {
+ // the output blocking vertex should disable checkpoint.
+ checkpointCfg = new CheckpointConfig();
Review Comment:
It is not obvious that `new CheckpointConfig` means "checkpoint is disabled".
It might be simpler and more readable to update
`config.setCheckpointingEnabled(...)` in the following code.
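For example, a sketch of that alternative:
```java
// Keep the shared CheckpointConfig and make the per-vertex disabling explicit.
final CheckpointConfig checkpointCfg = streamGraph.getCheckpointConfig();
config.setCheckpointingEnabled(
        checkpointCfg.isCheckpointingEnabled()
                && !outputBlockingNodesID.contains(vertexId));
```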
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java:
##########
@@ -703,6 +716,13 @@ private List<StreamEdge> createChain(
chainEntryPoints);
}
+ for (StreamEdge edge : currentNode.getOutEdges()) {
+ // transfer outputBlocking from all of downstream nodes
+ if (outputBlockingNodesID.contains(edge.getTargetId())) {
+ outputBlockingNodesID.add(currentNodeId);
Review Comment:
Suppose we have three nodes A, B, and C that are chained in order. Node C
has `isOutputOnEOF = true`. The current PR ensures that nodes B and C will be
added to `outputBlockingNodesID`.
Will node A also be added to `outputBlockingNodesID`?
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java:
##########
@@ -328,6 +329,12 @@ public <ACC, R> SingleOutputStreamOperator<R> aggregate(
"This aggregation function cannot be a RichFunction.");
}
+ if (builder.isEndOfStreamWindow()) {
+ function = input.getExecutionEnvironment().clean(function);
+ EOFAggregationOperator<T, K, ACC, R> operator = new EOFAggregationOperator<>(function);
+ return input.transform("EOFAggregation", resultType, operator);
Review Comment:
Instead of hardcoding `EOFAggregation` as the operator name, would it be
simpler and more consistent with the existing code to use
`builder.generateOperatorName()` as the operator name?
Similarly, it seems more consistent to use
`builder.generateOperatorDescription()` as the operator description.
In general, unless there is a clear benefit to changing behavior, it is
better to keep behavior unchanged and re-use existing code as much as
possible.
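Concretely, something along these lines (a sketch; the exact
`generateOperatorDescription` arguments depend on its signature):
```java
EOFAggregationOperator<T, K, ACC, R> operator = new EOFAggregationOperator<>(function);
return input.transform(builder.generateOperatorName(), resultType, operator)
        .setDescription(builder.generateOperatorDescription(function, null));
```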
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java:
##########
@@ -703,6 +716,13 @@ private List<StreamEdge> createChain(
chainEntryPoints);
}
+ for (StreamEdge edge : currentNode.getOutEdges()) {
Review Comment:
Suppose we have three nodes A, B, and C that are chained in order. Node A
has `isOutputOnEOF = true`. The current PR ensures that nodes A and B will be
added to `outputBlockingNodesID`.
Will node C also be added to `outputBlockingNodesID`?
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/EOFAggregationOperator.java:
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.sort;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.AlgorithmOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.sort.ExternalSorter;
+import org.apache.flink.runtime.operators.sort.PushSorter;
+import org.apache.flink.runtime.util.NonReusingKeyGroupedIterator;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedMultiInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OperatorAttributes;
+import org.apache.flink.streaming.api.operators.OperatorAttributesBuilder;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.util.MutableObjectIterator;
+
+/**
+ * This operator would replace the regular Aggregate process when the WindowAssigner is
+ * EndOfStreamWindows.
+ */
+@Internal
+public class EOFAggregationOperator<IN, KEY, ACC, OUT>
+ extends AbstractUdfStreamOperator<OUT, AggregateFunction<IN, ACC, OUT>>
+ implements OneInputStreamOperator<IN, OUT>, BoundedMultiInput {
+
+ private PushSorter<Tuple2<byte[], StreamRecord<IN>>> sorter;
+ TypeSerializer<Tuple2<byte[], StreamRecord<IN>>> keyAndValueSerializer;
+ TypeComparator<Tuple2<byte[], StreamRecord<IN>>> comparator;
+ private KeySelector<IN, KEY> keySelector;
+ private TypeSerializer<KEY> keySerializer;
+ private DataOutputSerializer dataOutputSerializer;
+ private long lastWatermarkTimestamp = Long.MIN_VALUE;
+ private int remainingInputNum = 1;
+ private transient OperatorAttributes operatorAttributes;
+
+ public EOFAggregationOperator(AggregateFunction<IN, ACC, OUT> userFunction) {
+ super(userFunction);
+ }
+
+ @Override
+ public void setup(
+ StreamTask<?, ?> containingTask,
+ StreamConfig config,
+ Output<StreamRecord<OUT>> output) {
+ super.setup(containingTask, config, output);
+ ClassLoader userCodeClassLoader = containingTask.getUserCodeClassLoader();
+ MemoryManager memoryManager = containingTask.getEnvironment().getMemoryManager();
+ IOManager ioManager = containingTask.getEnvironment().getIOManager();
+
+ keySelector = (KeySelector<IN, KEY>) config.getStatePartitioner(0, userCodeClassLoader);
+ keySerializer = config.getStateKeySerializer(userCodeClassLoader);
+ int keyLength = keySerializer.getLength();
+
+ TypeSerializer<IN> typeSerializerA = config.getTypeSerializerIn(0, userCodeClassLoader);
+ keyAndValueSerializer = new KeyAndValueSerializer<>(typeSerializerA, keyLength);
+
+ if (keyLength > 0) {
+ dataOutputSerializer = new DataOutputSerializer(keyLength);
+ comparator = new FixedLengthByteKeyComparator<>(keyLength);
+ } else {
+ dataOutputSerializer = new DataOutputSerializer(64);
+ comparator = new VariableLengthByteKeyComparator<>();
+ }
+
+ ExecutionConfig executionConfig = containingTask.getEnvironment().getExecutionConfig();
+
+ double managedMemoryFraction =
+ config.getManagedMemoryFractionOperatorUseCaseOfSlot(
+ ManagedMemoryUseCase.OPERATOR,
+ containingTask.getEnvironment().getTaskConfiguration(),
+ userCodeClassLoader);
+
+ Configuration jobConfiguration = containingTask.getEnvironment().getJobConfiguration();
+
+ try {
+ sorter =
+ ExternalSorter.newBuilder(
+ memoryManager,
+ containingTask,
+ keyAndValueSerializer,
+ comparator,
+ executionConfig)
+ .memoryFraction(managedMemoryFraction)
+ .enableSpilling(
+ ioManager, jobConfiguration.get(AlgorithmOptions.SORT_SPILLING_THRESHOLD))
+ .maxNumFileHandles(jobConfiguration.get(AlgorithmOptions.SPILLING_MAX_FAN) / 2)
+ .objectReuse(executionConfig.isObjectReuseEnabled())
+ .largeRecords(jobConfiguration.get(AlgorithmOptions.USE_LARGE_RECORDS_HANDLER))
+ .build();
+ } catch (MemoryAllocationException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void endInput(int inputId) throws Exception {
+ if (inputId == 1) {
+ sorter.finishReading();
+ remainingInputNum--;
+ } else {
+ throw new RuntimeException("Unknown inputId " + inputId);
+ }
+
+ if (remainingInputNum > 0) {
Review Comment:
If it is impossible to have `remainingInputNum > 0` here, it would be
simpler to remove this check.
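If it is indeed unreachable, the method could collapse to something like this
sketch (using `org.apache.flink.util.Preconditions`; the result-emitting part
is elided):
```java
@Override
public void endInput(int inputId) throws Exception {
    Preconditions.checkArgument(inputId == 1, "Unknown inputId " + inputId);
    sorter.finishReading();
    // ... iterate the sorted, key-grouped records and emit aggregated results ...
}
```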
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/OneInputTransformationTranslator.java:
##########
@@ -50,24 +50,47 @@ public Collection<Integer> translateForBatchInternal(
keySelector,
transformation.getStateKeyType(),
context);
- boolean isKeyed = keySelector != null;
- if (isKeyed) {
- BatchExecutionUtils.applyBatchExecutionSettings(
- transformation.getId(), context, StreamConfig.InputRequirement.SORTED);
- }
+
+ oneInputTransformationOutputEOFSetting(transformation, context);
return ids;
}
@Override
public Collection<Integer> translateForStreamingInternal(
final OneInputTransformation<IN, OUT> transformation, final Context context) {
- return translateInternal(
- transformation,
- transformation.getOperatorFactory(),
- transformation.getInputType(),
- transformation.getStateKeySelector(),
- transformation.getStateKeyType(),
- context);
+ Collection<Integer> ids =
+ translateInternal(
+ transformation,
+ transformation.getOperatorFactory(),
+ transformation.getInputType(),
+ transformation.getStateKeySelector(),
+ transformation.getStateKeyType(),
+ context);
+
+ if (transformation.isOutputOnEOF()) {
+ // Try to apply batch execution settings for streaming mode transformation.
+ oneInputTransformationOutputEOFSetting(transformation, context);
+ }
+
+ return ids;
+ }
+
+ private void oneInputTransformationOutputEOFSetting(
Review Comment:
It might be a bit confusing to have `OutputEOF` in the method's name because
this method is invoked under batch mode even if the operator does not have
`isOutputOnEOF = true`.
How about naming this method `maybeApplyBatchExecutionSettings`?
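As a sketch of the rename (the body mirrors the keyed-input check that the
batch path already performs):
```java
private void maybeApplyBatchExecutionSettings(
        OneInputTransformation<IN, OUT> transformation, Context context) {
    // Sorting is only needed for keyed inputs; this is a no-op otherwise.
    if (transformation.getStateKeySelector() != null) {
        BatchExecutionUtils.applyBatchExecutionSettings(
                transformation.getId(), context, StreamConfig.InputRequirement.SORTED);
    }
}
```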
##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkTransformationTranslatorITCase.java:
##########
@@ -225,7 +225,8 @@ private StreamNode findWriter(StreamGraph streamGraph) {
}
private StreamNode findCommitter(StreamGraph streamGraph) {
- return findNodeName(streamGraph, name -> name.contains("Committer"));
+ return findNodeName(
Review Comment:
Can you explain why we need this change?
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/EOFAggregationOperator.java:
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.sort;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.AlgorithmOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.sort.ExternalSorter;
+import org.apache.flink.runtime.operators.sort.PushSorter;
+import org.apache.flink.runtime.util.NonReusingKeyGroupedIterator;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedMultiInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OperatorAttributes;
+import org.apache.flink.streaming.api.operators.OperatorAttributesBuilder;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.util.MutableObjectIterator;
+
+/**
+ * This operator would replace the regular Aggregate process when the WindowAssigner is
+ * EndOfStreamWindows.
+ */
+@Internal
+public class EOFAggregationOperator<IN, KEY, ACC, OUT>
+ extends AbstractUdfStreamOperator<OUT, AggregateFunction<IN, ACC, OUT>>
+ implements OneInputStreamOperator<IN, OUT>, BoundedMultiInput {
+
+ private PushSorter<Tuple2<byte[], StreamRecord<IN>>> sorter;
+ TypeSerializer<Tuple2<byte[], StreamRecord<IN>>> keyAndValueSerializer;
+ TypeComparator<Tuple2<byte[], StreamRecord<IN>>> comparator;
+ private KeySelector<IN, KEY> keySelector;
+ private TypeSerializer<KEY> keySerializer;
+ private DataOutputSerializer dataOutputSerializer;
+ private long lastWatermarkTimestamp = Long.MIN_VALUE;
+ private int remainingInputNum = 1;
+ private transient OperatorAttributes operatorAttributes;
+
+ public EOFAggregationOperator(AggregateFunction<IN, ACC, OUT> userFunction) {
+ super(userFunction);
+ }
+
+ @Override
+ public void setup(
+ StreamTask<?, ?> containingTask,
+ StreamConfig config,
+ Output<StreamRecord<OUT>> output) {
+ super.setup(containingTask, config, output);
+ ClassLoader userCodeClassLoader = containingTask.getUserCodeClassLoader();
+ MemoryManager memoryManager = containingTask.getEnvironment().getMemoryManager();
+ IOManager ioManager = containingTask.getEnvironment().getIOManager();
+
+ keySelector = (KeySelector<IN, KEY>) config.getStatePartitioner(0, userCodeClassLoader);
+ keySerializer = config.getStateKeySerializer(userCodeClassLoader);
+ int keyLength = keySerializer.getLength();
+
+ TypeSerializer<IN> typeSerializerA = config.getTypeSerializerIn(0, userCodeClassLoader);
+ keyAndValueSerializer = new KeyAndValueSerializer<>(typeSerializerA, keyLength);
+
+ if (keyLength > 0) {
+ dataOutputSerializer = new DataOutputSerializer(keyLength);
+ comparator = new FixedLengthByteKeyComparator<>(keyLength);
+ } else {
+ dataOutputSerializer = new DataOutputSerializer(64);
+ comparator = new VariableLengthByteKeyComparator<>();
+ }
+
+ ExecutionConfig executionConfig = containingTask.getEnvironment().getExecutionConfig();
+
+ double managedMemoryFraction =
+ config.getManagedMemoryFractionOperatorUseCaseOfSlot(
+ ManagedMemoryUseCase.OPERATOR,
+ containingTask.getEnvironment().getTaskConfiguration(),
+ userCodeClassLoader);
+
+ Configuration jobConfiguration = containingTask.getEnvironment().getJobConfiguration();
+
+ try {
+ sorter =
+ ExternalSorter.newBuilder(
+ memoryManager,
+ containingTask,
+ keyAndValueSerializer,
+ comparator,
+ executionConfig)
+ .memoryFraction(managedMemoryFraction)
+ .enableSpilling(
+ ioManager, jobConfiguration.get(AlgorithmOptions.SORT_SPILLING_THRESHOLD))
+ .maxNumFileHandles(jobConfiguration.get(AlgorithmOptions.SPILLING_MAX_FAN) / 2)
Review Comment:
Given that there is only one sorter used in this class, we might not need to
divide it by 2.
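If so, the change is just dropping the division (sketch):
```java
.maxNumFileHandles(jobConfiguration.get(AlgorithmOptions.SPILLING_MAX_FAN))
```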
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java:
##########
@@ -1472,15 +1498,35 @@ private ResultPartitionType getResultPartitionType(StreamEdge edge) {
case HYBRID_SELECTIVE:
return ResultPartitionType.HYBRID_SELECTIVE;
case UNDEFINED:
- return determineUndefinedResultPartitionType(edge.getPartitioner());
+ return determineUndefinedResultPartitionType(edge);
default:
throw new UnsupportedOperationException(
"Data exchange mode " + edge.getExchangeMode() + " is not supported yet.");
}
}
- private ResultPartitionType determineUndefinedResultPartitionType(
- StreamPartitioner<?> partitioner) {
+ private ResultPartitionType determineUndefinedResultPartitionType(StreamEdge edge) {
+ if (outputBlockingNodesID.contains(edge.getSourceId())) {
+ // use corresponding partition type if the upstream node outputs EOF
+ edge.setBufferTimeout(ExecutionOptions.DISABLED_NETWORK_BUFFER_TIMEOUT);
Review Comment:
Suppose operatorA and operatorB are not chained, operatorA sends records to
operatorB, operatorB has `isOutputOnEOF = true`, and the execution mode is
streaming. Will this PR guarantee that the edge between operatorA and
operatorB is blocking (or hybrid shuffle), and that operatorA runs in batch
mode?
If yes, can you provide a reference to the code that achieves this? The code
in this class seems to only guarantee that the operators chained with an
operator with `isOutputOnEOF = true` will run in batch mode.
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/EOFAggregationOperator.java:
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.sort;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.AlgorithmOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.sort.ExternalSorter;
+import org.apache.flink.runtime.operators.sort.PushSorter;
+import org.apache.flink.runtime.util.NonReusingKeyGroupedIterator;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedMultiInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OperatorAttributes;
+import org.apache.flink.streaming.api.operators.OperatorAttributesBuilder;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.util.MutableObjectIterator;
+
+/**
+ * This operator would replace the regular Aggregate process when the WindowAssigner is
+ * EndOfStreamWindows.
+ */
+@Internal
+public class EOFAggregationOperator<IN, KEY, ACC, OUT>
+ extends AbstractUdfStreamOperator<OUT, AggregateFunction<IN, ACC, OUT>>
+ implements OneInputStreamOperator<IN, OUT>, BoundedMultiInput {
+
+ private PushSorter<Tuple2<byte[], StreamRecord<IN>>> sorter;
+ TypeSerializer<Tuple2<byte[], StreamRecord<IN>>> keyAndValueSerializer;
+ TypeComparator<Tuple2<byte[], StreamRecord<IN>>> comparator;
+ private KeySelector<IN, KEY> keySelector;
+ private TypeSerializer<KEY> keySerializer;
+ private DataOutputSerializer dataOutputSerializer;
+ private long lastWatermarkTimestamp = Long.MIN_VALUE;
+ private int remainingInputNum = 1;
+ private transient OperatorAttributes operatorAttributes;
+
+ public EOFAggregationOperator(AggregateFunction<IN, ACC, OUT> userFunction) {
+ super(userFunction);
+ }
+
+ @Override
+ public void setup(
+ StreamTask<?, ?> containingTask,
+ StreamConfig config,
+ Output<StreamRecord<OUT>> output) {
+ super.setup(containingTask, config, output);
+ ClassLoader userCodeClassLoader = containingTask.getUserCodeClassLoader();
+ MemoryManager memoryManager = containingTask.getEnvironment().getMemoryManager();
+ IOManager ioManager = containingTask.getEnvironment().getIOManager();
+
+ keySelector = (KeySelector<IN, KEY>) config.getStatePartitioner(0, userCodeClassLoader);
+ keySerializer = config.getStateKeySerializer(userCodeClassLoader);
+ int keyLength = keySerializer.getLength();
+
+ TypeSerializer<IN> typeSerializerA = config.getTypeSerializerIn(0, userCodeClassLoader);
+ keyAndValueSerializer = new KeyAndValueSerializer<>(typeSerializerA, keyLength);
+
+ if (keyLength > 0) {
+ dataOutputSerializer = new DataOutputSerializer(keyLength);
+ comparator = new FixedLengthByteKeyComparator<>(keyLength);
+ } else {
+ dataOutputSerializer = new DataOutputSerializer(64);
+ comparator = new VariableLengthByteKeyComparator<>();
+ }
+
+ ExecutionConfig executionConfig = containingTask.getEnvironment().getExecutionConfig();
+
+ double managedMemoryFraction =
+ config.getManagedMemoryFractionOperatorUseCaseOfSlot(
+ ManagedMemoryUseCase.OPERATOR,
+ containingTask.getEnvironment().getTaskConfiguration(),
+ userCodeClassLoader);
+
+ Configuration jobConfiguration = containingTask.getEnvironment().getJobConfiguration();
+
+ try {
+ sorter =
+ ExternalSorter.newBuilder(
+ memoryManager,
+ containingTask,
+ keyAndValueSerializer,
+ comparator,
+ executionConfig)
+ .memoryFraction(managedMemoryFraction)
+ .enableSpilling(
+ ioManager, jobConfiguration.get(AlgorithmOptions.SORT_SPILLING_THRESHOLD))
+ .maxNumFileHandles(jobConfiguration.get(AlgorithmOptions.SPILLING_MAX_FAN) / 2)
+ .objectReuse(executionConfig.isObjectReuseEnabled())
+ .largeRecords(jobConfiguration.get(AlgorithmOptions.USE_LARGE_RECORDS_HANDLER))
+ .build();
+ } catch (MemoryAllocationException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void endInput(int inputId) throws Exception {
+ if (inputId == 1) {
+ sorter.finishReading();
+ remainingInputNum--;
+ } else {
+ throw new RuntimeException("Unknown inputId " + inputId);
+ }
+
+ if (remainingInputNum > 0) {
+ return;
+ }
+
+ MutableObjectIterator<Tuple2<byte[], StreamRecord<IN>>> iterator = sorter.getIterator();
+ TimestampedCollector<OUT> outputCollector = new TimestampedCollector<>(output);
+ NonReusingKeyGroupedIterator<Tuple2<byte[], StreamRecord<IN>>> keyGroupedIterator =
+ new NonReusingKeyGroupedIterator<>(iterator, comparator);
+
+ while (keyGroupedIterator.nextKey()) {
+ ACC acc = userFunction.createAccumulator();
+ NonReusingKeyGroupedIterator<Tuple2<byte[], StreamRecord<IN>>>.ValuesIterator records =
+ keyGroupedIterator.getValues();
+ for (Tuple2<byte[], StreamRecord<IN>> record : records) {
+ acc = userFunction.add(record.f1.getValue(), acc);
+ }
+ outputCollector.collect(userFunction.getResult(acc));
+ }
+
+ Watermark watermark = new Watermark(lastWatermarkTimestamp);
+ if (getTimeServiceManager().isPresent()) {
+ getTimeServiceManager().get().advanceWatermark(watermark);
+ }
+ output.emitWatermark(watermark);
+ }
+
+ @Override
+ public void processWatermark(Watermark watermark) throws Exception {
+ if (lastWatermarkTimestamp > watermark.getTimestamp()) {
+ throw new RuntimeException("Invalid watermark");
+ }
+ lastWatermarkTimestamp = watermark.getTimestamp();
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ sorter.close();
+ }
+
+ @Override
+ public void processElement(StreamRecord<IN> streamRecord) throws Exception {
+ KEY key = keySelector.getKey(streamRecord.getValue());
+ keySerializer.serialize(key, dataOutputSerializer);
+ byte[] serializedKey = dataOutputSerializer.getCopyOfBuffer();
+ dataOutputSerializer.clear();
+ sorter.writeRecord(Tuple2.of(serializedKey, streamRecord));
+ }
+
+ @Override
+ public OperatorAttributes getOperatorAttributes() {
Review Comment:
It might be simpler to just instantiate and return `OperatorAttributes`
without caching it. The reason is that JM should not have to repeatedly invoke
`getOperatorAttributes()` for the same operator instance.
And JM does need to repeatedly invoke `getOperatorAttributes()` for the same
instance, it might be simpler to cache it in Flink infra (e.g. JM,
OneInputTransformation, AbstractMultipleInputTransformation) instead of having
each operator (which is written by operator developer) maintaining this cache.
Same for `EOFCoGroupOperator`.
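For example, a minimal uncached variant could look like the sketch below; the
exact builder setter is an assumption (named `setOutputOnEOF` here to match
the `isOutputOnEOF` attribute used elsewhere in this PR):
```java
@Override
public OperatorAttributes getOperatorAttributes() {
    // Constructed on demand; cheap enough that no per-operator cache is needed.
    return new OperatorAttributesBuilder()
            .setOutputOnEOF(true) // assumed setter name
            .build();
}
```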
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]