WencongLiu commented on code in PR #24272:
URL: https://github.com/apache/flink/pull/24272#discussion_r1497054471


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/OneInputTransformationTranslator.java:
##########
@@ -41,33 +41,45 @@ public final class OneInputTransformationTranslator<IN, OUT>
     @Override
     public Collection<Integer> translateForBatchInternal(
             final OneInputTransformation<IN, OUT> transformation, final 
Context context) {
-        KeySelector<IN, ?> keySelector = transformation.getStateKeySelector();
         Collection<Integer> ids =
                 translateInternal(
                         transformation,
                         transformation.getOperatorFactory(),
                         transformation.getInputType(),
-                        keySelector,
+                        transformation.getStateKeySelector(),
                         transformation.getStateKeyType(),
                         context);
-        boolean isKeyed = keySelector != null;
-        if (isKeyed) {
-            BatchExecutionUtils.applyBatchExecutionSettings(
-                    transformation.getId(), context, 
StreamConfig.InputRequirement.SORTED);
-        }
+
+        maybeApplyBatchExecutionSettings(transformation, context);
 
         return ids;
     }
 
     @Override
     public Collection<Integer> translateForStreamingInternal(
             final OneInputTransformation<IN, OUT> transformation, final 
Context context) {
-        return translateInternal(
-                transformation,
-                transformation.getOperatorFactory(),
-                transformation.getInputType(),
-                transformation.getStateKeySelector(),
-                transformation.getStateKeyType(),
-                context);
+        Collection<Integer> ids =
+                translateInternal(
+                        transformation,
+                        transformation.getOperatorFactory(),
+                        transformation.getInputType(),
+                        transformation.getStateKeySelector(),
+                        transformation.getStateKeyType(),
+                        context);
+
+        if (transformation.isOutputOnlyAfterEndOfStream()) {

Review Comment:
   There may be some operators defined on keyed stream that sort records 
internally. Maybe we should add another attribute to identify that case.



##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java:
##########
@@ -2303,6 +2307,102 @@ void 
testOutputFormatSupportConcurrentExecutionAttempts() {
                 new TestingOutputFormatSupportConcurrentExecutionAttempts<>(), 
true);
     }
 
+    @Test
+    void testOutputOnlyAfterEndOfStream() {
+        final StreamExecutionEnvironment env =
+                StreamExecutionEnvironment.getExecutionEnvironment(new 
Configuration());
+
+        final DataStream<Integer> source = env.fromData(1, 2, 
3).name("source");
+        source.keyBy(x -> x)
+                .transform(
+                        "map",
+                        Types.INT,
+                        new StreamOperatorWithConfigurableOperatorAttributes<>(
+                                x -> x,
+                                new OperatorAttributesBuilder()
+                                        .setOutputOnlyAfterEndOfStream(true)
+                                        .build()))
+                .sinkTo(new DiscardingSink<>())
+                .name("sink");
+
+        final StreamGraph streamGraph = env.getStreamGraph(false);
+        Map<String, StreamNode> nodeMap = new HashMap<>();
+        for (StreamNode node : streamGraph.getStreamNodes()) {
+            nodeMap.put(node.getOperatorName(), node);
+        }
+        assertThat(nodeMap).hasSize(3);
+        assertThat(nodeMap.get("Source: 
source").isOutputOnlyAfterEndOfStream()).isFalse();
+        assertThat(nodeMap.get("map").isOutputOnlyAfterEndOfStream()).isTrue();
+        assertThat(nodeMap.get("sink: 
Writer").isOutputOnlyAfterEndOfStream()).isFalse();
+
+        assertThat(nodeMap.get("Source: 
source").getManagedMemoryOperatorScopeUseCaseWeights())
+                .isEmpty();
+        
assertThat(nodeMap.get("map").getManagedMemoryOperatorScopeUseCaseWeights()).hasSize(1);
+        assertThat(nodeMap.get("sink: 
Writer").getManagedMemoryOperatorScopeUseCaseWeights())
+                .isEmpty();
+
+        JobGraph jobGraph = 
StreamingJobGraphGenerator.createJobGraph(streamGraph);
+        Map<String, JobVertex> vertexMap = new HashMap<>();
+        for (JobVertex vertex : jobGraph.getVertices()) {
+            vertexMap.put(vertex.getName(), vertex);
+        }
+        assertThat(vertexMap).hasSize(2);
+        assertHasOutputPartitionType(
+                vertexMap.get("Source: source"), 
ResultPartitionType.PIPELINED_BOUNDED);

Review Comment:
   I think we should confirm that all edges connected from downstream nodes of 
`StreamOperatorWithConfigurableOperatorAttributes` are `BLOCKING`. In this test 
case, we only built a chain with two `StreamNode` and didn't check it. We'd 
better test a chain with more nodes and check the `ResultPartitionType` of the 
edges of downstream nodes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to