WencongLiu commented on code in PR #24272:
URL: https://github.com/apache/flink/pull/24272#discussion_r1497054471
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/OneInputTransformationTranslator.java:
##########
@@ -41,33 +41,45 @@ public final class OneInputTransformationTranslator<IN, OUT>
@Override
public Collection<Integer> translateForBatchInternal(
final OneInputTransformation<IN, OUT> transformation, final Context context) {
- KeySelector<IN, ?> keySelector = transformation.getStateKeySelector();
Collection<Integer> ids =
translateInternal(
transformation,
transformation.getOperatorFactory(),
transformation.getInputType(),
- keySelector,
+ transformation.getStateKeySelector(),
transformation.getStateKeyType(),
context);
- boolean isKeyed = keySelector != null;
- if (isKeyed) {
- BatchExecutionUtils.applyBatchExecutionSettings(
- transformation.getId(), context, StreamConfig.InputRequirement.SORTED);
- }
+
+ maybeApplyBatchExecutionSettings(transformation, context);
return ids;
}
@Override
public Collection<Integer> translateForStreamingInternal(
final OneInputTransformation<IN, OUT> transformation, final Context context) {
- return translateInternal(
- transformation,
- transformation.getOperatorFactory(),
- transformation.getInputType(),
- transformation.getStateKeySelector(),
- transformation.getStateKeyType(),
- context);
+ Collection<Integer> ids =
+ translateInternal(
+ transformation,
+ transformation.getOperatorFactory(),
+ transformation.getInputType(),
+ transformation.getStateKeySelector(),
+ transformation.getStateKeyType(),
+ context);
+
+ if (transformation.isOutputOnlyAfterEndOfStream()) {
Review Comment:
Some operators defined on a keyed stream may already sort records
internally. Maybe we should add another attribute to identify that case.
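To make the suggestion concrete, here is a minimal sketch of the decision such an extra attribute could drive. It assumes the translator would only request sorted input for a keyed operator that emits output after end of stream and does not already sort internally. `Attributes`, `internalSorterSupported`, and `requiresExternalSort` are hypothetical names for illustration, not Flink's `OperatorAttributes` API.

```java
import java.util.Objects;

public class SortDecisionSketch {

    /** Hypothetical attribute holder; NOT Flink's OperatorAttributes. */
    record Attributes(boolean outputOnlyAfterEndOfStream, boolean internalSorterSupported) {}

    /**
     * Returns true if the translator should request sorted input for the operator
     * (e.g. via a SORTED input requirement).
     */
    static boolean requiresExternalSort(boolean isKeyed, Attributes attrs) {
        Objects.requireNonNull(attrs, "attrs");
        // Only keyed operators that emit results after end of input are considered here,
        // and operators that already sort internally should not be sorted a second time.
        return isKeyed && attrs.outputOnlyAfterEndOfStream() && !attrs.internalSorterSupported();
    }

    public static void main(String[] args) {
        Attributes plain = new Attributes(true, false);
        Attributes selfSorting = new Attributes(true, true);
        System.out.println(requiresExternalSort(true, plain));       // true
        System.out.println(requiresExternalSort(true, selfSorting)); // false
        System.out.println(requiresExternalSort(false, plain));      // false
    }
}
```

Keeping the two flags independent lets an operator declare "output only after end of stream" without also opting into an external sort it does not need.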
##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java:
##########
@@ -2303,6 +2307,102 @@ void testOutputFormatSupportConcurrentExecutionAttempts() {
new TestingOutputFormatSupportConcurrentExecutionAttempts<>(),
true);
}
+ @Test
+ void testOutputOnlyAfterEndOfStream() {
+ final StreamExecutionEnvironment env =
+ StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());
+
+ final DataStream<Integer> source = env.fromData(1, 2, 3).name("source");
+ source.keyBy(x -> x)
+ .transform(
+ "map",
+ Types.INT,
+ new StreamOperatorWithConfigurableOperatorAttributes<>(
+ x -> x,
+ new OperatorAttributesBuilder()
+ .setOutputOnlyAfterEndOfStream(true)
+ .build()))
+ .sinkTo(new DiscardingSink<>())
+ .name("sink");
+
+ final StreamGraph streamGraph = env.getStreamGraph(false);
+ Map<String, StreamNode> nodeMap = new HashMap<>();
+ for (StreamNode node : streamGraph.getStreamNodes()) {
+ nodeMap.put(node.getOperatorName(), node);
+ }
+ assertThat(nodeMap).hasSize(3);
+ assertThat(nodeMap.get("Source: source").isOutputOnlyAfterEndOfStream()).isFalse();
+ assertThat(nodeMap.get("map").isOutputOnlyAfterEndOfStream()).isTrue();
+ assertThat(nodeMap.get("sink: Writer").isOutputOnlyAfterEndOfStream()).isFalse();
+
+ assertThat(nodeMap.get("Source: source").getManagedMemoryOperatorScopeUseCaseWeights())
+ .isEmpty();
+ assertThat(nodeMap.get("map").getManagedMemoryOperatorScopeUseCaseWeights()).hasSize(1);
+ assertThat(nodeMap.get("sink: Writer").getManagedMemoryOperatorScopeUseCaseWeights())
+ .isEmpty();
+
+ JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);
+ Map<String, JobVertex> vertexMap = new HashMap<>();
+ for (JobVertex vertex : jobGraph.getVertices()) {
+ vertexMap.put(vertex.getName(), vertex);
+ }
+ assertThat(vertexMap).hasSize(2);
+ assertHasOutputPartitionType(
+ vertexMap.get("Source: source"), ResultPartitionType.PIPELINED_BOUNDED);
Review Comment:
I think we should verify that all edges going out of the nodes downstream of
`StreamOperatorWithConfigurableOperatorAttributes` are `BLOCKING`. This test
case only builds a chain with two `StreamNode`s and doesn't check that. It
would be better to test a chain with more nodes and check the
`ResultPartitionType` of the downstream nodes' edges.
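One way to express that check is to walk every edge reachable from the operator's vertex and collect the ones that are not `BLOCKING`. The sketch below uses a tiny self-contained graph model so it runs on its own; `PartitionType`, `Edge`, `Graph`, and `nonBlockingDownstreamEdges` are hypothetical stand-ins, not Flink's `JobGraph`/`ResultPartitionType` API, and it assumes the comment means every edge downstream of the operator, not only its direct output edges.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class DownstreamEdgeCheckSketch {

    /** Simplified stand-in for Flink's ResultPartitionType; only two values are modeled. */
    enum PartitionType { PIPELINED_BOUNDED, BLOCKING }

    record Edge(String source, String target, PartitionType type) {}

    /** Tiny job-graph model: vertex name -> outgoing edges. */
    static final class Graph {
        final Map<String, List<Edge>> out = new LinkedHashMap<>();

        Graph connect(String source, String target, PartitionType type) {
            out.computeIfAbsent(source, k -> new ArrayList<>()).add(new Edge(source, target, type));
            out.computeIfAbsent(target, k -> new ArrayList<>());
            return this;
        }
    }

    /**
     * Returns the edges reachable from {@code start} (its own output edges and those of
     * every vertex downstream of it) that are not BLOCKING.
     */
    static List<Edge> nonBlockingDownstreamEdges(Graph g, String start) {
        List<Edge> offending = new ArrayList<>();
        Set<String> visited = new HashSet<>();
        Deque<String> queue = new ArrayDeque<>();
        queue.add(start);
        while (!queue.isEmpty()) {
            String v = queue.poll();
            if (!visited.add(v)) {
                continue; // already inspected this vertex's outgoing edges
            }
            for (Edge e : g.out.getOrDefault(v, List.of())) {
                if (e.type() != PartitionType.BLOCKING) {
                    offending.add(e);
                }
                queue.add(e.target());
            }
        }
        return offending;
    }

    public static void main(String[] args) {
        // source -> map (output only after end of stream) -> map2 -> sink, as separate vertices.
        Graph g = new Graph()
                .connect("Source: source", "map", PartitionType.PIPELINED_BOUNDED)
                .connect("map", "map2", PartitionType.BLOCKING)
                .connect("map2", "sink", PartitionType.BLOCKING);
        System.out.println(nonBlockingDownstreamEdges(g, "map").isEmpty()); // true
    }
}
```

In the real test, the same traversal would run over the `JobVertex` outputs of the generated `JobGraph`, and the assertion would be that the returned list is empty.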