reswqa commented on code in PR #25276: URL: https://github.com/apache/flink/pull/25276#discussion_r1740311042
########## flink-core-api/src/main/java/org/apache/flink/api/common/attribute/Attribute.java: ########## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.attribute; + +import org.apache.flink.annotation.Internal; + +import java.io.Serializable; + +/** {@link Attribute} contains the information about the process logic of a process function. */ +@Internal +public class Attribute implements Serializable { Review Comment: This is not a `Public` API, we should move it to `flink-core`. ########## flink-core-api/src/main/java/org/apache/flink/api/common/attribute/Attribute.java: ########## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.attribute; + +import org.apache.flink.annotation.Internal; + +import java.io.Serializable; + +/** {@link Attribute} contains the information about the process logic of a process function. */ +@Internal +public class Attribute implements Serializable { + + private boolean isNoOutputUntilEndOfInput; + + private Attribute(Builder builder) { Review Comment: I don't think passing a builder to the constructor is a good pattern. ########## flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorWithOperatorAttributesTest.java: ########## @@ -97,10 +98,46 @@ void testOutputOnlyAfterEndOfStream() { assertThat(vertexMap.get("Source: source").isAnyOutputBlocking()).isFalse(); assertThat(vertexMap.get("transform -> Map").isAnyOutputBlocking()).isTrue(); assertThat(vertexMap.get("sink: Writer").isAnyOutputBlocking()).isFalse(); + } + + @Test + void testOutputOnlyAfterEndOfStreamCase2() { Review Comment: Can we merge this into the test case above and remove the `Case1` suffix? ########## flink-datastream/src/test/java/org/apache/flink/datastream/impl/attribute/StreamingJobGraphGeneratorWithAttributeTest.java: ########## @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.impl.attribute; + +import org.apache.flink.api.connector.dsv2.DataStreamV2SourceUtils; +import org.apache.flink.api.connector.dsv2.WrappedSink; +import org.apache.flink.configuration.PipelineOptions; +import org.apache.flink.datastream.api.ExecutionEnvironment; +import org.apache.flink.datastream.api.attribute.NoOutputUntilEndOfInput; +import org.apache.flink.datastream.api.common.Collector; +import org.apache.flink.datastream.api.context.PartitionedContext; +import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction; +import org.apache.flink.datastream.api.function.TwoOutputStreamProcessFunction; +import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream; +import org.apache.flink.datastream.impl.ExecutionEnvironmentImpl; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.streaming.api.functions.sink.PrintSink; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.streaming.api.graph.StreamNode; +import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import static 
org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link StreamingJobGraphGenerator} with different attributes. */ +class StreamingJobGraphGeneratorWithAttributeTest { + + @Test + void testNoOutputUntilEndOfInputWithOperatorChainCase1() throws Exception { + ExecutionEnvironmentImpl env = + (ExecutionEnvironmentImpl) ExecutionEnvironment.getInstance(); + NonKeyedPartitionStream<Integer> source = + env.fromSource( + DataStreamV2SourceUtils.fromData(Arrays.asList(1, 2, 3)), "test-source"); + source.keyBy(x -> x) + .process(new TestMapTask1()) + .withParallelism(2) + .process(new TestMapTask2()) + .withParallelism(2) + .toSink(new WrappedSink<>(new PrintSink())) + .withParallelism(3); + StreamGraph streamGraph = env.getStreamGraph(); + Map<String, StreamNode> nodeMap = new HashMap<>(); + for (StreamNode node : streamGraph.getStreamNodes()) { + nodeMap.put(node.getOperatorName(), node); + } + assertThat(nodeMap).hasSize(4); + assertManagedMemoryWeightsSize(nodeMap.get("Source: Collection Source"), 0); + assertManagedMemoryWeightsSize(nodeMap.get("KeyedProcess"), 1); + assertManagedMemoryWeightsSize(nodeMap.get("Process"), 0); + assertManagedMemoryWeightsSize(nodeMap.get("Sink: Writer"), 0); + JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph); + Map<String, JobVertex> vertexMap = new HashMap<>(); + for (JobVertex vertex : jobGraph.getVertices()) { + vertexMap.put(vertex.getName(), vertex); + } + assertThat(vertexMap).hasSize(3); + assertHasOutputPartitionType( + vertexMap.get("Source: Collection Source"), ResultPartitionType.PIPELINED_BOUNDED); + assertHasOutputPartitionType( + vertexMap.get("KeyedProcess -> Process"), ResultPartitionType.BLOCKING); + assertThat(vertexMap.get("Source: Collection Source").isAnyOutputBlocking()).isFalse(); + assertThat(vertexMap.get("KeyedProcess -> Process").isAnyOutputBlocking()).isTrue(); + assertThat(vertexMap.get("Sink: Writer").isAnyOutputBlocking()).isFalse(); + } + + @Test + void 
testNoOutputUntilEndOfInputWithOperatorChainCase2() throws Exception { Review Comment: `testPropagationAlongOperatorChain` ########## flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/KeyedPartitionStreamImpl.java: ########## @@ -206,7 +205,7 @@ public <OUT1, OUT2> TwoKeyedPartitionStreams<K, OUT1, OUT2> process( } @Override - public <OUT1, OUT2> TwoNonKeyedPartitionStreams<OUT1, OUT2> process( + public <OUT1, OUT2> ProcessConfigurableAndTwoNonKeyedPartitionStream<V, OUT1, OUT2> process( Review Comment: We should apply the same fix to `TwoKeyedPartitionStreams` as well. ########## flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java: ########## @@ -697,20 +696,20 @@ private List<StreamEdge> createChain( } for (StreamEdge chainable : chainableOutputs) { - // Mark downstream nodes in the same chain as outputBlocking - if (isOutputOnlyAfterEndOfStream) { - outputBlockingNodesID.add(chainable.getTargetId()); + // Only modify the attribute of downstream nodes in the same chain. + if (isNoOutputUntilEndOfInput) { + StreamNode targetNode = streamGraph.getStreamNode(chainable.getTargetId()); + Attribute targetNodeAttribute = targetNode.getAttribute(); + if (targetNodeAttribute != null) { + targetNodeAttribute.setNoOutputUntilEndOfInput(true); + } } transitiveOutEdges.addAll( createChain( chainable.getTargetId(), chainIndex + 1, chainInfo, chainEntryPoints)); - // Mark upstream nodes in the same chain as outputBlocking - if (outputBlockingNodesID.contains(chainable.getTargetId())) { Review Comment: We shouldn't remove it; instead, we should adapt it. We also lack tests for this case.
########## flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/ProcessConfigurableTwoNonKeyedPartitionStreamImpl.java: ########## @@ -19,8 +19,8 @@ package org.apache.flink.datastream.impl.stream; import org.apache.flink.api.dag.Transformation; -import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream.ProcessConfigurableAndTwoNonKeyedPartitionStream; import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream.ProcessConfigurableAndNonKeyedPartitionStream; +import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream.ProcessConfigurableAndTwoNonKeyedPartitionStream; Review Comment: Why do we have this change in commit 2? It seems it should be in commit 1. ########## flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/GlobalStreamImpl.java: ########## @@ -191,17 +204,17 @@ private static class TwoGlobalStreamsImpl<OUT1, OUT2> implements TwoGlobalStream private final GlobalStreamImpl<OUT2> secondStream; - public static <OUT1, OUT2> TwoGlobalStreamsImpl<OUT1, OUT2> of( - GlobalStreamImpl<OUT1> firstStream, GlobalStreamImpl<OUT2> secondStream) { - return new TwoGlobalStreamsImpl<>(firstStream, secondStream); - } - private TwoGlobalStreamsImpl( GlobalStreamImpl<OUT1> firstStream, GlobalStreamImpl<OUT2> secondStream) { this.firstStream = firstStream; this.secondStream = secondStream; } + public static <OUT1, OUT2> TwoGlobalStreamsImpl<OUT1, OUT2> of( Review Comment: What's the difference between this and line 194, which you removed? ########## flink-datastream/src/test/java/org/apache/flink/datastream/impl/attribute/StreamingJobGraphGeneratorWithAttributeTest.java: ########## @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.impl.attribute; + +import org.apache.flink.api.connector.dsv2.DataStreamV2SourceUtils; +import org.apache.flink.api.connector.dsv2.WrappedSink; +import org.apache.flink.configuration.PipelineOptions; +import org.apache.flink.datastream.api.ExecutionEnvironment; +import org.apache.flink.datastream.api.attribute.NoOutputUntilEndOfInput; +import org.apache.flink.datastream.api.common.Collector; +import org.apache.flink.datastream.api.context.PartitionedContext; +import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction; +import org.apache.flink.datastream.api.function.TwoOutputStreamProcessFunction; +import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream; +import org.apache.flink.datastream.impl.ExecutionEnvironmentImpl; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.streaming.api.functions.sink.PrintSink; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.streaming.api.graph.StreamNode; +import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import static 
org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link StreamingJobGraphGenerator} with different attributes. */ +class StreamingJobGraphGeneratorWithAttributeTest { + + @Test + void testNoOutputUntilEndOfInputWithOperatorChainCase1() throws Exception { + ExecutionEnvironmentImpl env = + (ExecutionEnvironmentImpl) ExecutionEnvironment.getInstance(); + NonKeyedPartitionStream<Integer> source = + env.fromSource( + DataStreamV2SourceUtils.fromData(Arrays.asList(1, 2, 3)), "test-source"); + source.keyBy(x -> x) + .process(new TestMapTask1()) + .withParallelism(2) + .process(new TestMapTask2()) + .withParallelism(2) + .toSink(new WrappedSink<>(new PrintSink())) Review Comment: Can we replace this with `DiscardSink`? ########## flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/KeyedPartitionStreamImpl.java: ########## @@ -282,7 +286,11 @@ public <T_OTHER, OUT> ProcessConfigurableAndKeyedPartitionStream<K, OUT> connect processFunction, getType(), ((KeyedPartitionStreamImpl<K, T_OTHER>) other).getType()); - + other = Review Comment: This is not the right line. We should move this after line 283. ########## flink-datastream/src/test/java/org/apache/flink/datastream/impl/attribute/StreamingJobGraphGeneratorWithAttributeTest.java: ########## @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.impl.attribute; + +import org.apache.flink.api.connector.dsv2.DataStreamV2SourceUtils; +import org.apache.flink.api.connector.dsv2.WrappedSink; +import org.apache.flink.configuration.PipelineOptions; +import org.apache.flink.datastream.api.ExecutionEnvironment; +import org.apache.flink.datastream.api.attribute.NoOutputUntilEndOfInput; +import org.apache.flink.datastream.api.common.Collector; +import org.apache.flink.datastream.api.context.PartitionedContext; +import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction; +import org.apache.flink.datastream.api.function.TwoOutputStreamProcessFunction; +import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream; +import org.apache.flink.datastream.impl.ExecutionEnvironmentImpl; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.streaming.api.functions.sink.PrintSink; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.streaming.api.graph.StreamNode; +import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link StreamingJobGraphGenerator} with different attributes. 
*/ +class StreamingJobGraphGeneratorWithAttributeTest { + + @Test + void testNoOutputUntilEndOfInputWithOperatorChainCase1() throws Exception { Review Comment: This can be merged with the next test case, and we can change it to test `A -> B -> C`, where B is `NoOutputUntilEndOfInput`. ########## flink-datastream/src/test/java/org/apache/flink/datastream/impl/attribute/StreamingJobGraphGeneratorWithAttributeTest.java: ########## @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.datastream.impl.attribute; + +import org.apache.flink.api.connector.dsv2.DataStreamV2SourceUtils; +import org.apache.flink.api.connector.dsv2.WrappedSink; +import org.apache.flink.configuration.PipelineOptions; +import org.apache.flink.datastream.api.ExecutionEnvironment; +import org.apache.flink.datastream.api.attribute.NoOutputUntilEndOfInput; +import org.apache.flink.datastream.api.common.Collector; +import org.apache.flink.datastream.api.context.PartitionedContext; +import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction; +import org.apache.flink.datastream.api.function.TwoOutputStreamProcessFunction; +import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream; +import org.apache.flink.datastream.impl.ExecutionEnvironmentImpl; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.streaming.api.functions.sink.PrintSink; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.streaming.api.graph.StreamNode; +import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link StreamingJobGraphGenerator} with different attributes. 
*/ +class StreamingJobGraphGeneratorWithAttributeTest { + + @Test + void testNoOutputUntilEndOfInputWithOperatorChainCase1() throws Exception { + ExecutionEnvironmentImpl env = + (ExecutionEnvironmentImpl) ExecutionEnvironment.getInstance(); + NonKeyedPartitionStream<Integer> source = + env.fromSource( + DataStreamV2SourceUtils.fromData(Arrays.asList(1, 2, 3)), "test-source"); + source.keyBy(x -> x) + .process(new TestMapTask1()) + .withParallelism(2) + .process(new TestMapTask2()) + .withParallelism(2) + .toSink(new WrappedSink<>(new PrintSink())) + .withParallelism(3); + StreamGraph streamGraph = env.getStreamGraph(); + Map<String, StreamNode> nodeMap = new HashMap<>(); + for (StreamNode node : streamGraph.getStreamNodes()) { + nodeMap.put(node.getOperatorName(), node); + } + assertThat(nodeMap).hasSize(4); + assertManagedMemoryWeightsSize(nodeMap.get("Source: Collection Source"), 0); + assertManagedMemoryWeightsSize(nodeMap.get("KeyedProcess"), 1); + assertManagedMemoryWeightsSize(nodeMap.get("Process"), 0); + assertManagedMemoryWeightsSize(nodeMap.get("Sink: Writer"), 0); + JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph); + Map<String, JobVertex> vertexMap = new HashMap<>(); + for (JobVertex vertex : jobGraph.getVertices()) { + vertexMap.put(vertex.getName(), vertex); + } + assertThat(vertexMap).hasSize(3); + assertHasOutputPartitionType( + vertexMap.get("Source: Collection Source"), ResultPartitionType.PIPELINED_BOUNDED); + assertHasOutputPartitionType( + vertexMap.get("KeyedProcess -> Process"), ResultPartitionType.BLOCKING); + assertThat(vertexMap.get("Source: Collection Source").isAnyOutputBlocking()).isFalse(); + assertThat(vertexMap.get("KeyedProcess -> Process").isAnyOutputBlocking()).isTrue(); + assertThat(vertexMap.get("Sink: Writer").isAnyOutputBlocking()).isFalse(); + } + + @Test + void testNoOutputUntilEndOfInputWithOperatorChainCase2() throws Exception { + ExecutionEnvironmentImpl env = + (ExecutionEnvironmentImpl) 
ExecutionEnvironment.getInstance(); + NonKeyedPartitionStream<Integer> source = + env.fromSource( + DataStreamV2SourceUtils.fromData(Arrays.asList(1, 2, 3)), "test-source"); + source.keyBy(x -> x) + .process(new TestMapTask2()) + .withParallelism(2) + .process(new TestMapTask1()) + .withParallelism(2) + .toSink(new WrappedSink<>(new PrintSink())) + .withParallelism(3); + StreamGraph streamGraph = env.getStreamGraph(); + Map<String, StreamNode> nodeMap = new HashMap<>(); + for (StreamNode node : streamGraph.getStreamNodes()) { + nodeMap.put(node.getOperatorName(), node); + } + assertThat(nodeMap).hasSize(4); + assertManagedMemoryWeightsSize(nodeMap.get("Source: Collection Source"), 0); + assertManagedMemoryWeightsSize(nodeMap.get("KeyedProcess"), 0); + assertManagedMemoryWeightsSize(nodeMap.get("Process"), 0); + assertManagedMemoryWeightsSize(nodeMap.get("Sink: Writer"), 0); + JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph); + Map<String, JobVertex> vertexMap = new HashMap<>(); + for (JobVertex vertex : jobGraph.getVertices()) { + vertexMap.put(vertex.getName(), vertex); + } + assertThat(vertexMap).hasSize(3); + assertHasOutputPartitionType( + vertexMap.get("Source: Collection Source"), ResultPartitionType.PIPELINED_BOUNDED); + assertHasOutputPartitionType( + vertexMap.get("KeyedProcess -> Process"), ResultPartitionType.BLOCKING); + assertThat(vertexMap.get("Source: Collection Source").isAnyOutputBlocking()).isFalse(); + assertThat(vertexMap.get("KeyedProcess -> Process").isAnyOutputBlocking()).isTrue(); + assertThat(vertexMap.get("Sink: Writer").isAnyOutputBlocking()).isFalse(); + } + + @Test + void testNoOutputUntilEndOfInputWithOperatorChainCase3() throws Exception { Review Comment: testTwoOutput ########## flink-datastream/src/test/java/org/apache/flink/datastream/impl/attribute/StreamingJobGraphGeneratorWithAttributeTest.java: ########## @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.impl.attribute; + +import org.apache.flink.api.connector.dsv2.DataStreamV2SourceUtils; +import org.apache.flink.api.connector.dsv2.WrappedSink; +import org.apache.flink.configuration.PipelineOptions; +import org.apache.flink.datastream.api.ExecutionEnvironment; +import org.apache.flink.datastream.api.attribute.NoOutputUntilEndOfInput; +import org.apache.flink.datastream.api.common.Collector; +import org.apache.flink.datastream.api.context.PartitionedContext; +import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction; +import org.apache.flink.datastream.api.function.TwoOutputStreamProcessFunction; +import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream; +import org.apache.flink.datastream.impl.ExecutionEnvironmentImpl; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.streaming.api.functions.sink.PrintSink; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.streaming.api.graph.StreamNode; +import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator; + +import 
org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link StreamingJobGraphGenerator} with different attributes. */ +class StreamingJobGraphGeneratorWithAttributeTest { + + @Test + void testNoOutputUntilEndOfInputWithOperatorChainCase1() throws Exception { + ExecutionEnvironmentImpl env = + (ExecutionEnvironmentImpl) ExecutionEnvironment.getInstance(); + NonKeyedPartitionStream<Integer> source = + env.fromSource( + DataStreamV2SourceUtils.fromData(Arrays.asList(1, 2, 3)), "test-source"); + source.keyBy(x -> x) + .process(new TestMapTask1()) + .withParallelism(2) + .process(new TestMapTask2()) + .withParallelism(2) + .toSink(new WrappedSink<>(new PrintSink())) + .withParallelism(3); + StreamGraph streamGraph = env.getStreamGraph(); + Map<String, StreamNode> nodeMap = new HashMap<>(); + for (StreamNode node : streamGraph.getStreamNodes()) { + nodeMap.put(node.getOperatorName(), node); + } + assertThat(nodeMap).hasSize(4); + assertManagedMemoryWeightsSize(nodeMap.get("Source: Collection Source"), 0); + assertManagedMemoryWeightsSize(nodeMap.get("KeyedProcess"), 1); + assertManagedMemoryWeightsSize(nodeMap.get("Process"), 0); + assertManagedMemoryWeightsSize(nodeMap.get("Sink: Writer"), 0); + JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph); + Map<String, JobVertex> vertexMap = new HashMap<>(); + for (JobVertex vertex : jobGraph.getVertices()) { + vertexMap.put(vertex.getName(), vertex); + } + assertThat(vertexMap).hasSize(3); + assertHasOutputPartitionType( + vertexMap.get("Source: Collection Source"), ResultPartitionType.PIPELINED_BOUNDED); + assertHasOutputPartitionType( + vertexMap.get("KeyedProcess -> Process"), ResultPartitionType.BLOCKING); + assertThat(vertexMap.get("Source: Collection Source").isAnyOutputBlocking()).isFalse(); + assertThat(vertexMap.get("KeyedProcess -> 
Process").isAnyOutputBlocking()).isTrue(); + assertThat(vertexMap.get("Sink: Writer").isAnyOutputBlocking()).isFalse(); + } + + @Test + void testNoOutputUntilEndOfInputWithOperatorChainCase2() throws Exception { + ExecutionEnvironmentImpl env = + (ExecutionEnvironmentImpl) ExecutionEnvironment.getInstance(); + NonKeyedPartitionStream<Integer> source = + env.fromSource( + DataStreamV2SourceUtils.fromData(Arrays.asList(1, 2, 3)), "test-source"); + source.keyBy(x -> x) + .process(new TestMapTask2()) + .withParallelism(2) + .process(new TestMapTask1()) + .withParallelism(2) + .toSink(new WrappedSink<>(new PrintSink())) + .withParallelism(3); + StreamGraph streamGraph = env.getStreamGraph(); + Map<String, StreamNode> nodeMap = new HashMap<>(); + for (StreamNode node : streamGraph.getStreamNodes()) { + nodeMap.put(node.getOperatorName(), node); + } + assertThat(nodeMap).hasSize(4); + assertManagedMemoryWeightsSize(nodeMap.get("Source: Collection Source"), 0); + assertManagedMemoryWeightsSize(nodeMap.get("KeyedProcess"), 0); + assertManagedMemoryWeightsSize(nodeMap.get("Process"), 0); + assertManagedMemoryWeightsSize(nodeMap.get("Sink: Writer"), 0); + JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph); + Map<String, JobVertex> vertexMap = new HashMap<>(); + for (JobVertex vertex : jobGraph.getVertices()) { + vertexMap.put(vertex.getName(), vertex); + } + assertThat(vertexMap).hasSize(3); + assertHasOutputPartitionType( + vertexMap.get("Source: Collection Source"), ResultPartitionType.PIPELINED_BOUNDED); + assertHasOutputPartitionType( + vertexMap.get("KeyedProcess -> Process"), ResultPartitionType.BLOCKING); + assertThat(vertexMap.get("Source: Collection Source").isAnyOutputBlocking()).isFalse(); + assertThat(vertexMap.get("KeyedProcess -> Process").isAnyOutputBlocking()).isTrue(); + assertThat(vertexMap.get("Sink: Writer").isAnyOutputBlocking()).isFalse(); + } + + @Test + void testNoOutputUntilEndOfInputWithOperatorChainCase3() throws Exception { 
+ ExecutionEnvironmentImpl env = + (ExecutionEnvironmentImpl) ExecutionEnvironment.getInstance(); + NonKeyedPartitionStream<Integer> source = + env.fromSource( + DataStreamV2SourceUtils.fromData(Arrays.asList(1, 2, 3)), "test-source"); + NonKeyedPartitionStream.ProcessConfigurableAndTwoNonKeyedPartitionStream< + Integer, Integer, Integer> + twoOutputStream = + source.process(new TestMapTask2()) + .withParallelism(2) + .process(new TestTwoOutputProcessFunction()) + .withParallelism(2); + NonKeyedPartitionStream.ProcessConfigurableAndNonKeyedPartitionStream<Integer> firstStream = + twoOutputStream.getFirst(); + NonKeyedPartitionStream.ProcessConfigurableAndNonKeyedPartitionStream<Integer> + secondStream = twoOutputStream.getSecond(); + firstStream + .process(new TestMapTask1()) + .withParallelism(2) + .toSink(new WrappedSink<>(new PrintSink())) + .withParallelism(3); + secondStream + .process(new TestMapTask2()) + .withParallelism(2) + .toSink(new WrappedSink<>(new PrintSink())) + .withParallelism(3); + StreamGraph streamGraph = env.getStreamGraph(); + JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph); + Map<String, JobVertex> vertexMap = new HashMap<>(); + for (JobVertex vertex : jobGraph.getVertices()) { + vertexMap.put(vertex.getName(), vertex); + } + assertThat(vertexMap).hasSize(3); + assertHasOutputPartitionType( + vertexMap.get("Process -> Two-Output-Operator -> (Process, Process)"), + ResultPartitionType.BLOCKING); + assertThat( + vertexMap + .get("Process -> Two-Output-Operator -> (Process, Process)") + .isAnyOutputBlocking()) + .isTrue(); + } + + @Test + void testNoOutputUntilEndOfInputWithoutOperatorChain() throws Exception { + ExecutionEnvironmentImpl env = + (ExecutionEnvironmentImpl) ExecutionEnvironment.getInstance(); + env.getConfiguration().set(PipelineOptions.OPERATOR_CHAINING, false); + NonKeyedPartitionStream<Integer> source = + env.fromSource( + DataStreamV2SourceUtils.fromData(Arrays.asList(1, 2, 3)), "test-source"); + 
source.keyBy(x -> x) + .process(new TestMapTask1()) + .withParallelism(2) + .process(new TestMapTask2()) + .withParallelism(2) + .toSink(new WrappedSink<>(new PrintSink())) + .withParallelism(3); + StreamGraph streamGraph = env.getStreamGraph(); + Map<String, StreamNode> nodeMap = new HashMap<>(); + for (StreamNode node : streamGraph.getStreamNodes()) { + nodeMap.put(node.getOperatorName(), node); + } + assertThat(nodeMap).hasSize(4); + assertManagedMemoryWeightsSize(nodeMap.get("Source: Collection Source"), 0); + assertManagedMemoryWeightsSize(nodeMap.get("KeyedProcess"), 1); + assertManagedMemoryWeightsSize(nodeMap.get("Process"), 0); + assertManagedMemoryWeightsSize(nodeMap.get("Sink: Writer"), 0); + JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph); + Map<String, JobVertex> vertexMap = new HashMap<>(); + for (JobVertex vertex : jobGraph.getVertices()) { + vertexMap.put(vertex.getName(), vertex); + } + assertThat(vertexMap).hasSize(4); + assertHasOutputPartitionType( + vertexMap.get("Source: Collection Source"), ResultPartitionType.PIPELINED_BOUNDED); + assertHasOutputPartitionType(vertexMap.get("KeyedProcess"), ResultPartitionType.BLOCKING); + assertHasOutputPartitionType( + vertexMap.get("Process"), ResultPartitionType.PIPELINED_BOUNDED); + assertThat(vertexMap.get("Source: Collection Source").isAnyOutputBlocking()).isFalse(); + assertThat(vertexMap.get("KeyedProcess").isAnyOutputBlocking()).isTrue(); + assertThat(vertexMap.get("Process").isAnyOutputBlocking()).isFalse(); + assertThat(vertexMap.get("Sink: Writer").isAnyOutputBlocking()).isFalse(); + } + + private void assertHasOutputPartitionType( + JobVertex jobVertex, ResultPartitionType partitionType) { + assertThat(jobVertex.getProducedDataSets().get(0).getResultType()).isEqualTo(partitionType); + } + + private void assertManagedMemoryWeightsSize(StreamNode node, int weightSize) { + assertThat(node.getManagedMemoryOperatorScopeUseCaseWeights()).hasSize(weightSize); + } + + 
@NoOutputUntilEndOfInput + private static class TestMapTask1 implements OneInputStreamProcessFunction<Integer, Integer> { Review Comment: Rename this to `NoOutputUntilEndOfInputMapTask` ########## flink-datastream/src/test/java/org/apache/flink/datastream/impl/attribute/StreamingJobGraphGeneratorWithAttributeTest.java: ########## @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.datastream.impl.attribute; + +import org.apache.flink.api.connector.dsv2.DataStreamV2SourceUtils; +import org.apache.flink.api.connector.dsv2.WrappedSink; +import org.apache.flink.configuration.PipelineOptions; +import org.apache.flink.datastream.api.ExecutionEnvironment; +import org.apache.flink.datastream.api.attribute.NoOutputUntilEndOfInput; +import org.apache.flink.datastream.api.common.Collector; +import org.apache.flink.datastream.api.context.PartitionedContext; +import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction; +import org.apache.flink.datastream.api.function.TwoOutputStreamProcessFunction; +import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream; +import org.apache.flink.datastream.impl.ExecutionEnvironmentImpl; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.streaming.api.functions.sink.PrintSink; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.streaming.api.graph.StreamNode; +import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link StreamingJobGraphGenerator} with different attributes. 
*/ +class StreamingJobGraphGeneratorWithAttributeTest { + + @Test + void testNoOutputUntilEndOfInputWithOperatorChainCase1() throws Exception { + ExecutionEnvironmentImpl env = + (ExecutionEnvironmentImpl) ExecutionEnvironment.getInstance(); + NonKeyedPartitionStream<Integer> source = + env.fromSource( + DataStreamV2SourceUtils.fromData(Arrays.asList(1, 2, 3)), "test-source"); + source.keyBy(x -> x) + .process(new TestMapTask1()) + .withParallelism(2) + .process(new TestMapTask2()) + .withParallelism(2) + .toSink(new WrappedSink<>(new PrintSink())) + .withParallelism(3); + StreamGraph streamGraph = env.getStreamGraph(); + Map<String, StreamNode> nodeMap = new HashMap<>(); + for (StreamNode node : streamGraph.getStreamNodes()) { + nodeMap.put(node.getOperatorName(), node); + } + assertThat(nodeMap).hasSize(4); + assertManagedMemoryWeightsSize(nodeMap.get("Source: Collection Source"), 0); + assertManagedMemoryWeightsSize(nodeMap.get("KeyedProcess"), 1); + assertManagedMemoryWeightsSize(nodeMap.get("Process"), 0); + assertManagedMemoryWeightsSize(nodeMap.get("Sink: Writer"), 0); Review Comment: These managed-memory weight assertions don't make sense here and can be removed. ########## flink-datastream/src/test/java/org/apache/flink/datastream/impl/attribute/StreamingJobGraphGeneratorWithAttributeTest.java: ########## @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.impl.attribute; + +import org.apache.flink.api.connector.dsv2.DataStreamV2SourceUtils; +import org.apache.flink.api.connector.dsv2.WrappedSink; +import org.apache.flink.configuration.PipelineOptions; +import org.apache.flink.datastream.api.ExecutionEnvironment; +import org.apache.flink.datastream.api.attribute.NoOutputUntilEndOfInput; +import org.apache.flink.datastream.api.common.Collector; +import org.apache.flink.datastream.api.context.PartitionedContext; +import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction; +import org.apache.flink.datastream.api.function.TwoOutputStreamProcessFunction; +import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream; +import org.apache.flink.datastream.impl.ExecutionEnvironmentImpl; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.streaming.api.functions.sink.PrintSink; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.streaming.api.graph.StreamNode; +import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link StreamingJobGraphGenerator} with different attributes. 
*/ +class StreamingJobGraphGeneratorWithAttributeTest { + + @Test + void testNoOutputUntilEndOfInputWithOperatorChainCase1() throws Exception { + ExecutionEnvironmentImpl env = + (ExecutionEnvironmentImpl) ExecutionEnvironment.getInstance(); + NonKeyedPartitionStream<Integer> source = + env.fromSource( + DataStreamV2SourceUtils.fromData(Arrays.asList(1, 2, 3)), "test-source"); + source.keyBy(x -> x) + .process(new TestMapTask1()) + .withParallelism(2) + .process(new TestMapTask2()) + .withParallelism(2) + .toSink(new WrappedSink<>(new PrintSink())) + .withParallelism(3); + StreamGraph streamGraph = env.getStreamGraph(); + Map<String, StreamNode> nodeMap = new HashMap<>(); + for (StreamNode node : streamGraph.getStreamNodes()) { + nodeMap.put(node.getOperatorName(), node); + } + assertThat(nodeMap).hasSize(4); + assertManagedMemoryWeightsSize(nodeMap.get("Source: Collection Source"), 0); + assertManagedMemoryWeightsSize(nodeMap.get("KeyedProcess"), 1); + assertManagedMemoryWeightsSize(nodeMap.get("Process"), 0); + assertManagedMemoryWeightsSize(nodeMap.get("Sink: Writer"), 0); + JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph); + Map<String, JobVertex> vertexMap = new HashMap<>(); + for (JobVertex vertex : jobGraph.getVertices()) { + vertexMap.put(vertex.getName(), vertex); + } + assertThat(vertexMap).hasSize(3); + assertHasOutputPartitionType( + vertexMap.get("Source: Collection Source"), ResultPartitionType.PIPELINED_BOUNDED); + assertHasOutputPartitionType( + vertexMap.get("KeyedProcess -> Process"), ResultPartitionType.BLOCKING); + assertThat(vertexMap.get("Source: Collection Source").isAnyOutputBlocking()).isFalse(); + assertThat(vertexMap.get("KeyedProcess -> Process").isAnyOutputBlocking()).isTrue(); + assertThat(vertexMap.get("Sink: Writer").isAnyOutputBlocking()).isFalse(); + } + + @Test + void testNoOutputUntilEndOfInputWithOperatorChainCase2() throws Exception { + ExecutionEnvironmentImpl env = + (ExecutionEnvironmentImpl) 
ExecutionEnvironment.getInstance(); + NonKeyedPartitionStream<Integer> source = + env.fromSource( + DataStreamV2SourceUtils.fromData(Arrays.asList(1, 2, 3)), "test-source"); + source.keyBy(x -> x) + .process(new TestMapTask2()) + .withParallelism(2) + .process(new TestMapTask1()) + .withParallelism(2) + .toSink(new WrappedSink<>(new PrintSink())) + .withParallelism(3); + StreamGraph streamGraph = env.getStreamGraph(); + Map<String, StreamNode> nodeMap = new HashMap<>(); + for (StreamNode node : streamGraph.getStreamNodes()) { + nodeMap.put(node.getOperatorName(), node); + } + assertThat(nodeMap).hasSize(4); + assertManagedMemoryWeightsSize(nodeMap.get("Source: Collection Source"), 0); + assertManagedMemoryWeightsSize(nodeMap.get("KeyedProcess"), 0); + assertManagedMemoryWeightsSize(nodeMap.get("Process"), 0); + assertManagedMemoryWeightsSize(nodeMap.get("Sink: Writer"), 0); + JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph); + Map<String, JobVertex> vertexMap = new HashMap<>(); + for (JobVertex vertex : jobGraph.getVertices()) { + vertexMap.put(vertex.getName(), vertex); + } + assertThat(vertexMap).hasSize(3); + assertHasOutputPartitionType( + vertexMap.get("Source: Collection Source"), ResultPartitionType.PIPELINED_BOUNDED); + assertHasOutputPartitionType( + vertexMap.get("KeyedProcess -> Process"), ResultPartitionType.BLOCKING); + assertThat(vertexMap.get("Source: Collection Source").isAnyOutputBlocking()).isFalse(); + assertThat(vertexMap.get("KeyedProcess -> Process").isAnyOutputBlocking()).isTrue(); + assertThat(vertexMap.get("Sink: Writer").isAnyOutputBlocking()).isFalse(); + } + + @Test + void testNoOutputUntilEndOfInputWithOperatorChainCase3() throws Exception { + ExecutionEnvironmentImpl env = + (ExecutionEnvironmentImpl) ExecutionEnvironment.getInstance(); + NonKeyedPartitionStream<Integer> source = + env.fromSource( + DataStreamV2SourceUtils.fromData(Arrays.asList(1, 2, 3)), "test-source"); + 
NonKeyedPartitionStream.ProcessConfigurableAndTwoNonKeyedPartitionStream< + Integer, Integer, Integer> + twoOutputStream = + source.process(new TestMapTask2()) + .withParallelism(2) + .process(new TestTwoOutputProcessFunction()) + .withParallelism(2); + NonKeyedPartitionStream.ProcessConfigurableAndNonKeyedPartitionStream<Integer> firstStream = + twoOutputStream.getFirst(); + NonKeyedPartitionStream.ProcessConfigurableAndNonKeyedPartitionStream<Integer> + secondStream = twoOutputStream.getSecond(); + firstStream + .process(new TestMapTask1()) + .withParallelism(2) + .toSink(new WrappedSink<>(new PrintSink())) + .withParallelism(3); + secondStream + .process(new TestMapTask2()) + .withParallelism(2) + .toSink(new WrappedSink<>(new PrintSink())) + .withParallelism(3); + StreamGraph streamGraph = env.getStreamGraph(); + JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph); + Map<String, JobVertex> vertexMap = new HashMap<>(); + for (JobVertex vertex : jobGraph.getVertices()) { + vertexMap.put(vertex.getName(), vertex); + } + assertThat(vertexMap).hasSize(3); + assertHasOutputPartitionType( + vertexMap.get("Process -> Two-Output-Operator -> (Process, Process)"), + ResultPartitionType.BLOCKING); + assertThat( + vertexMap + .get("Process -> Two-Output-Operator -> (Process, Process)") + .isAnyOutputBlocking()) + .isTrue(); + } + + @Test + void testNoOutputUntilEndOfInputWithoutOperatorChain() throws Exception { + ExecutionEnvironmentImpl env = + (ExecutionEnvironmentImpl) ExecutionEnvironment.getInstance(); + env.getConfiguration().set(PipelineOptions.OPERATOR_CHAINING, false); + NonKeyedPartitionStream<Integer> source = + env.fromSource( + DataStreamV2SourceUtils.fromData(Arrays.asList(1, 2, 3)), "test-source"); + source.keyBy(x -> x) + .process(new TestMapTask1()) + .withParallelism(2) + .process(new TestMapTask2()) + .withParallelism(2) + .toSink(new WrappedSink<>(new PrintSink())) + .withParallelism(3); + StreamGraph streamGraph = 
env.getStreamGraph(); + Map<String, StreamNode> nodeMap = new HashMap<>(); + for (StreamNode node : streamGraph.getStreamNodes()) { + nodeMap.put(node.getOperatorName(), node); + } + assertThat(nodeMap).hasSize(4); + assertManagedMemoryWeightsSize(nodeMap.get("Source: Collection Source"), 0); + assertManagedMemoryWeightsSize(nodeMap.get("KeyedProcess"), 1); + assertManagedMemoryWeightsSize(nodeMap.get("Process"), 0); + assertManagedMemoryWeightsSize(nodeMap.get("Sink: Writer"), 0); + JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph); + Map<String, JobVertex> vertexMap = new HashMap<>(); + for (JobVertex vertex : jobGraph.getVertices()) { + vertexMap.put(vertex.getName(), vertex); + } + assertThat(vertexMap).hasSize(4); + assertHasOutputPartitionType( + vertexMap.get("Source: Collection Source"), ResultPartitionType.PIPELINED_BOUNDED); + assertHasOutputPartitionType(vertexMap.get("KeyedProcess"), ResultPartitionType.BLOCKING); + assertHasOutputPartitionType( + vertexMap.get("Process"), ResultPartitionType.PIPELINED_BOUNDED); + assertThat(vertexMap.get("Source: Collection Source").isAnyOutputBlocking()).isFalse(); + assertThat(vertexMap.get("KeyedProcess").isAnyOutputBlocking()).isTrue(); + assertThat(vertexMap.get("Process").isAnyOutputBlocking()).isFalse(); + assertThat(vertexMap.get("Sink: Writer").isAnyOutputBlocking()).isFalse(); + } + + private void assertHasOutputPartitionType( + JobVertex jobVertex, ResultPartitionType partitionType) { + assertThat(jobVertex.getProducedDataSets().get(0).getResultType()).isEqualTo(partitionType); + } + + private void assertManagedMemoryWeightsSize(StreamNode node, int weightSize) { + assertThat(node.getManagedMemoryOperatorScopeUseCaseWeights()).hasSize(weightSize); + } + + @NoOutputUntilEndOfInput + private static class TestMapTask1 implements OneInputStreamProcessFunction<Integer, Integer> { + + @Override + public void processRecord( + Integer record, Collector<Integer> output, PartitionedContext 
ctx) { + output.collect(record + 1); + } + } + + private static class TestMapTask2 implements OneInputStreamProcessFunction<Integer, Integer> { Review Comment: Rename to `TestMapTask` ########## flink-datastream/src/test/java/org/apache/flink/datastream/impl/attribute/StreamingJobGraphGeneratorWithAttributeTest.java: ########## @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.datastream.impl.attribute; + +import org.apache.flink.api.connector.dsv2.DataStreamV2SourceUtils; +import org.apache.flink.api.connector.dsv2.WrappedSink; +import org.apache.flink.configuration.PipelineOptions; +import org.apache.flink.datastream.api.ExecutionEnvironment; +import org.apache.flink.datastream.api.attribute.NoOutputUntilEndOfInput; +import org.apache.flink.datastream.api.common.Collector; +import org.apache.flink.datastream.api.context.PartitionedContext; +import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction; +import org.apache.flink.datastream.api.function.TwoOutputStreamProcessFunction; +import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream; +import org.apache.flink.datastream.impl.ExecutionEnvironmentImpl; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.streaming.api.functions.sink.PrintSink; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.streaming.api.graph.StreamNode; +import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link StreamingJobGraphGenerator} with different attributes. 
*/ +class StreamingJobGraphGeneratorWithAttributeTest { + + @Test + void testNoOutputUntilEndOfInputWithOperatorChainCase1() throws Exception { + ExecutionEnvironmentImpl env = + (ExecutionEnvironmentImpl) ExecutionEnvironment.getInstance(); + NonKeyedPartitionStream<Integer> source = + env.fromSource( + DataStreamV2SourceUtils.fromData(Arrays.asList(1, 2, 3)), "test-source"); + source.keyBy(x -> x) + .process(new TestMapTask1()) + .withParallelism(2) + .process(new TestMapTask2()) + .withParallelism(2) + .toSink(new WrappedSink<>(new PrintSink())) + .withParallelism(3); + StreamGraph streamGraph = env.getStreamGraph(); + Map<String, StreamNode> nodeMap = new HashMap<>(); + for (StreamNode node : streamGraph.getStreamNodes()) { + nodeMap.put(node.getOperatorName(), node); + } + assertThat(nodeMap).hasSize(4); + assertManagedMemoryWeightsSize(nodeMap.get("Source: Collection Source"), 0); + assertManagedMemoryWeightsSize(nodeMap.get("KeyedProcess"), 1); + assertManagedMemoryWeightsSize(nodeMap.get("Process"), 0); + assertManagedMemoryWeightsSize(nodeMap.get("Sink: Writer"), 0); + JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph); + Map<String, JobVertex> vertexMap = new HashMap<>(); + for (JobVertex vertex : jobGraph.getVertices()) { + vertexMap.put(vertex.getName(), vertex); + } + assertThat(vertexMap).hasSize(3); + assertHasOutputPartitionType( + vertexMap.get("Source: Collection Source"), ResultPartitionType.PIPELINED_BOUNDED); + assertHasOutputPartitionType( + vertexMap.get("KeyedProcess -> Process"), ResultPartitionType.BLOCKING); + assertThat(vertexMap.get("Source: Collection Source").isAnyOutputBlocking()).isFalse(); + assertThat(vertexMap.get("KeyedProcess -> Process").isAnyOutputBlocking()).isTrue(); + assertThat(vertexMap.get("Sink: Writer").isAnyOutputBlocking()).isFalse(); + } + + @Test + void testNoOutputUntilEndOfInputWithOperatorChainCase2() throws Exception { + ExecutionEnvironmentImpl env = + (ExecutionEnvironmentImpl) 
ExecutionEnvironment.getInstance(); + NonKeyedPartitionStream<Integer> source = + env.fromSource( + DataStreamV2SourceUtils.fromData(Arrays.asList(1, 2, 3)), "test-source"); + source.keyBy(x -> x) + .process(new TestMapTask2()) + .withParallelism(2) + .process(new TestMapTask1()) + .withParallelism(2) + .toSink(new WrappedSink<>(new PrintSink())) + .withParallelism(3); + StreamGraph streamGraph = env.getStreamGraph(); + Map<String, StreamNode> nodeMap = new HashMap<>(); + for (StreamNode node : streamGraph.getStreamNodes()) { + nodeMap.put(node.getOperatorName(), node); + } + assertThat(nodeMap).hasSize(4); + assertManagedMemoryWeightsSize(nodeMap.get("Source: Collection Source"), 0); + assertManagedMemoryWeightsSize(nodeMap.get("KeyedProcess"), 0); + assertManagedMemoryWeightsSize(nodeMap.get("Process"), 0); + assertManagedMemoryWeightsSize(nodeMap.get("Sink: Writer"), 0); + JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph); + Map<String, JobVertex> vertexMap = new HashMap<>(); + for (JobVertex vertex : jobGraph.getVertices()) { + vertexMap.put(vertex.getName(), vertex); + } + assertThat(vertexMap).hasSize(3); + assertHasOutputPartitionType( + vertexMap.get("Source: Collection Source"), ResultPartitionType.PIPELINED_BOUNDED); + assertHasOutputPartitionType( + vertexMap.get("KeyedProcess -> Process"), ResultPartitionType.BLOCKING); + assertThat(vertexMap.get("Source: Collection Source").isAnyOutputBlocking()).isFalse(); + assertThat(vertexMap.get("KeyedProcess -> Process").isAnyOutputBlocking()).isTrue(); + assertThat(vertexMap.get("Sink: Writer").isAnyOutputBlocking()).isFalse(); + } + + @Test + void testNoOutputUntilEndOfInputWithOperatorChainCase3() throws Exception { + ExecutionEnvironmentImpl env = + (ExecutionEnvironmentImpl) ExecutionEnvironment.getInstance(); + NonKeyedPartitionStream<Integer> source = + env.fromSource( + DataStreamV2SourceUtils.fromData(Arrays.asList(1, 2, 3)), "test-source"); + 
NonKeyedPartitionStream.ProcessConfigurableAndTwoNonKeyedPartitionStream< + Integer, Integer, Integer> + twoOutputStream = + source.process(new TestMapTask2()) + .withParallelism(2) + .process(new TestTwoOutputProcessFunction()) + .withParallelism(2); + NonKeyedPartitionStream.ProcessConfigurableAndNonKeyedPartitionStream<Integer> firstStream = + twoOutputStream.getFirst(); + NonKeyedPartitionStream.ProcessConfigurableAndNonKeyedPartitionStream<Integer> + secondStream = twoOutputStream.getSecond(); + firstStream + .process(new TestMapTask1()) + .withParallelism(2) + .toSink(new WrappedSink<>(new PrintSink())) + .withParallelism(3); + secondStream + .process(new TestMapTask2()) + .withParallelism(2) + .toSink(new WrappedSink<>(new PrintSink())) + .withParallelism(3); + StreamGraph streamGraph = env.getStreamGraph(); + JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph); + Map<String, JobVertex> vertexMap = new HashMap<>(); + for (JobVertex vertex : jobGraph.getVertices()) { + vertexMap.put(vertex.getName(), vertex); + } + assertThat(vertexMap).hasSize(3); + assertHasOutputPartitionType( + vertexMap.get("Process -> Two-Output-Operator -> (Process, Process)"), + ResultPartitionType.BLOCKING); + assertThat( + vertexMap + .get("Process -> Two-Output-Operator -> (Process, Process)") + .isAnyOutputBlocking()) + .isTrue(); + } + + @Test + void testNoOutputUntilEndOfInputWithoutOperatorChain() throws Exception { Review Comment: Rename this to `testWithoutOperatorChain`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
