This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4d131705b19d06f55f94cc6e19728488fa5448f1 Author: codenohup <[email protected]> AuthorDate: Wed Sep 11 16:44:15 2024 +0800 [FLINK-35795][API] Introduce the framework of ProcessFunction Attribute --- .../flink/api/common/attribute/Attribute.java | 57 +++++ .../org/apache/flink/api/dag/Transformation.java | 11 + .../api/attribute/NoOutputUntilEndOfInput.java | 36 +++ .../datastream/impl/attribute/AttributeParser.java | 36 +++ .../impl/stream/BroadcastStreamImpl.java | 4 + .../datastream/impl/stream/GlobalStreamImpl.java | 21 +- .../impl/stream/KeyedPartitionStreamImpl.java | 21 +- .../impl/stream/NonKeyedPartitionStreamImpl.java | 5 + ...treamingJobGraphGeneratorWithAttributeTest.java | 244 +++++++++++++++++++++ .../flink/streaming/api/graph/StreamConfig.java | 18 ++ .../flink/streaming/api/graph/StreamGraph.java | 7 + .../flink/streaming/api/graph/StreamNode.java | 11 + .../api/graph/StreamingJobGraphGenerator.java | 35 +-- .../AbstractOneInputTransformationTranslator.java | 2 +- .../AbstractTwoInputTransformationTranslator.java | 2 +- .../MultiInputTransformationTranslator.java | 2 +- .../OneInputTransformationTranslator.java | 3 +- ...obGraphGeneratorWithOperatorAttributesTest.java | 101 ++++++++- 18 files changed, 581 insertions(+), 35 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/attribute/Attribute.java b/flink-core/src/main/java/org/apache/flink/api/common/attribute/Attribute.java new file mode 100644 index 00000000000..3812a6c6e51 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/attribute/Attribute.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.attribute; + +import org.apache.flink.annotation.Internal; + +import java.io.Serializable; + +/** {@link Attribute} contains the information about the process logic of a process function. */ +@Internal +public class Attribute implements Serializable { + + private boolean isNoOutputUntilEndOfInput; + + private Attribute(boolean isNoOutputUntilEndOfInput) { + this.isNoOutputUntilEndOfInput = isNoOutputUntilEndOfInput; + } + + public boolean isNoOutputUntilEndOfInput() { + return isNoOutputUntilEndOfInput; + } + + public void setNoOutputUntilEndOfInput(boolean noOutputUntilEndOfInput) { + isNoOutputUntilEndOfInput = noOutputUntilEndOfInput; + } + + @Internal + public static class Builder { + + private boolean isNoOutputUntilEndOfInput = false; + + public Builder setNoOutputUntilEndOfInput(boolean isNoOutputUntilEndOfInput) { + this.isNoOutputUntilEndOfInput = isNoOutputUntilEndOfInput; + return this; + } + + public Attribute build() { + return new Attribute(isNoOutputUntilEndOfInput); + } + } +} diff --git a/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java b/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java index b38f52c20df..804638c30fd 100644 --- a/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java +++ b/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java @@ -21,6 +21,7 @@ package org.apache.flink.api.dag; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.attribute.Attribute; import org.apache.flink.api.common.functions.InvalidTypesException; import org.apache.flink.api.common.operators.ResourceSpec; import org.apache.flink.api.common.operators.SlotSharingGroup; @@ -188,6 +189,8 @@ public abstract class Transformation<T> { @Nullable private String coLocationGroupKey; + private Attribute attribute = new Attribute.Builder().build(); + /** * Creates a new {@code Transformation} with the given name, output type and parallelism. * @@ -649,4 +652,12 @@ public abstract class Transformation<T> { public int hashCode() { return Objects.hash(id, name, outputType, parallelism, bufferTimeout); } + + public void setAttribute(Attribute attribute) { + this.attribute = attribute; + } + + public Attribute getAttribute() { + return attribute; + } } diff --git a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/attribute/NoOutputUntilEndOfInput.java b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/attribute/NoOutputUntilEndOfInput.java new file mode 100644 index 00000000000..63adbfa4b6f --- /dev/null +++ b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/attribute/NoOutputUntilEndOfInput.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.api.attribute; + +import org.apache.flink.annotation.Experimental; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * {@link NoOutputUntilEndOfInput} indicates that the process function will only output records + * after all inputs are ended. If this annotation is applied to a process function with an unbounded + * source, a compilation error will occur. + */ +@Experimental +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.TYPE) +public @interface NoOutputUntilEndOfInput {} diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/attribute/AttributeParser.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/attribute/AttributeParser.java new file mode 100644 index 00000000000..cfc1d21f101 --- /dev/null +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/attribute/AttributeParser.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.impl.attribute; + +import org.apache.flink.api.common.attribute.Attribute; +import org.apache.flink.datastream.api.attribute.NoOutputUntilEndOfInput; +import org.apache.flink.datastream.api.function.ProcessFunction; + +/** {@link AttributeParser} is used to parse {@link Attribute} from {@link ProcessFunction}. */ +public class AttributeParser { + + public static Attribute parseAttribute(ProcessFunction function) { + Class<? extends ProcessFunction> functionClass = function.getClass(); + Attribute.Builder attributeBuilder = new Attribute.Builder(); + if (functionClass.isAnnotationPresent(NoOutputUntilEndOfInput.class)) { + attributeBuilder.setNoOutputUntilEndOfInput(true); + } + return attributeBuilder.build(); + } +} diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/BroadcastStreamImpl.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/BroadcastStreamImpl.java index defd870be68..8b02eabdaf3 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/BroadcastStreamImpl.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/BroadcastStreamImpl.java @@ -30,6 +30,7 @@ import org.apache.flink.datastream.api.stream.KeyedPartitionStream.ProcessConfig import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream; import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream.ProcessConfigurableAndNonKeyedPartitionStream; import org.apache.flink.datastream.impl.ExecutionEnvironmentImpl; +import org.apache.flink.datastream.impl.attribute.AttributeParser; import org.apache.flink.datastream.impl.operators.KeyedTwoInputBroadcastProcessOperator; import org.apache.flink.datastream.impl.operators.TwoInputBroadcastProcessOperator; import org.apache.flink.datastream.impl.utils.StreamUtils; @@ -77,6 +78,7 @@ public class BroadcastStreamImpl<T> extends AbstractDataStream<T> implements Bro this, outTypeInfo, processOperator); + outTransformation.setAttribute(AttributeParser.parseAttribute(processFunction)); environment.addOperator(outTransformation); return StreamUtils.wrapWithConfigureHandle( new NonKeyedPartitionStreamImpl<>(environment, outTransformation)); @@ -105,6 +107,7 @@ public class BroadcastStreamImpl<T> extends AbstractDataStream<T> implements Bro this, outTypeInfo, processOperator); + outTransformation.setAttribute(AttributeParser.parseAttribute(processFunction)); environment.addOperator(outTransformation); return StreamUtils.wrapWithConfigureHandle( new NonKeyedPartitionStreamImpl<>(environment, outTransformation)); @@ -133,6 +136,7 @@ public class BroadcastStreamImpl<T> extends AbstractDataStream<T> implements Bro NonKeyedPartitionStreamImpl<OUT> outputStream = new NonKeyedPartitionStreamImpl<>(environment, outTransformation); + outTransformation.setAttribute(AttributeParser.parseAttribute(processFunction)); environment.addOperator(outTransformation); // Construct a keyed stream directly without partitionTransformation to avoid shuffle. return StreamUtils.wrapWithConfigureHandle( diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/GlobalStreamImpl.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/GlobalStreamImpl.java index 8dddc935318..137917bca4f 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/GlobalStreamImpl.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/GlobalStreamImpl.java @@ -18,6 +18,7 @@ package org.apache.flink.datastream.impl.stream; +import org.apache.flink.api.common.attribute.Attribute; import org.apache.flink.api.common.state.StateDeclaration; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.connector.dsv2.Sink; @@ -33,6 +34,7 @@ import org.apache.flink.datastream.api.stream.KeyedPartitionStream; import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream; import org.apache.flink.datastream.api.stream.ProcessConfigurable; import org.apache.flink.datastream.impl.ExecutionEnvironmentImpl; +import org.apache.flink.datastream.impl.attribute.AttributeParser; import org.apache.flink.datastream.impl.operators.ProcessOperator; import org.apache.flink.datastream.impl.operators.TwoInputNonBroadcastProcessOperator; import org.apache.flink.datastream.impl.operators.TwoOutputProcessOperator; @@ -69,7 +71,12 @@ public class GlobalStreamImpl<T> extends AbstractDataStream<T> implements Global TypeInformation<OUT> outType = StreamUtils.getOutputTypeForOneInputProcessFunction(processFunction, getType()); ProcessOperator<T, OUT> operator = new ProcessOperator<>(processFunction); - return StreamUtils.wrapWithConfigureHandle(transform("Global Process", outType, operator)); + return StreamUtils.wrapWithConfigureHandle( + transform( + "Global Process", + outType, + operator, + AttributeParser.parseAttribute(processFunction))); } @Override @@ -89,7 +96,11 @@ public class GlobalStreamImpl<T> extends AbstractDataStream<T> implements Global TwoOutputProcessOperator<T, OUT1, OUT2> operator = new TwoOutputProcessOperator<>(processFunction, secondOutputTag); GlobalStreamImpl<OUT1> firstStream = - transform("Two-Output-Operator", firstOutputType, operator); + transform( + "Two-Output-Operator", + firstOutputType, + operator, + AttributeParser.parseAttribute(processFunction)); GlobalStreamImpl<OUT2> secondStream = new GlobalStreamImpl<>( environment, firstStream.getSideOutputTransform(secondOutputTag)); @@ -122,6 +133,7 @@ public class GlobalStreamImpl<T> extends AbstractDataStream<T> implements Global // Operator parallelism should always be 1 for global stream. // parallelismConfigured should be true to avoid overwritten by AdaptiveBatchScheduler. outTransformation.setParallelism(1, true); + outTransformation.setAttribute(AttributeParser.parseAttribute(processFunction)); environment.addOperator(outTransformation); return StreamUtils.wrapWithConfigureHandle( new GlobalStreamImpl<>(environment, outTransformation)); @@ -162,7 +174,8 @@ public class GlobalStreamImpl<T> extends AbstractDataStream<T> implements Global private <R> GlobalStreamImpl<R> transform( String operatorName, TypeInformation<R> outputTypeInfo, - OneInputStreamOperator<T, R> operator) { + OneInputStreamOperator<T, R> operator, + Attribute attribute) { // read the output type of the input Transform to coax out errors about MissingTypeInfo transformation.getOutputType(); @@ -177,7 +190,7 @@ public class GlobalStreamImpl<T> extends AbstractDataStream<T> implements Global // parallelismConfigured should be true to avoid overwritten by // AdaptiveBatchScheduler. true); - + resultTransform.setAttribute(attribute); GlobalStreamImpl<R> returnStream = new GlobalStreamImpl<>(environment, resultTransform); environment.addOperator(resultTransform); diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/KeyedPartitionStreamImpl.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/KeyedPartitionStreamImpl.java index d9650020478..6fca168b426 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/KeyedPartitionStreamImpl.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/KeyedPartitionStreamImpl.java @@ -36,6 +36,7 @@ import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream; import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream.ProcessConfigurableAndNonKeyedPartitionStream; import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream.ProcessConfigurableAndTwoNonKeyedPartitionStream; import org.apache.flink.datastream.api.stream.ProcessConfigurable; +import org.apache.flink.datastream.impl.attribute.AttributeParser; import org.apache.flink.datastream.impl.operators.KeyedProcessOperator; import org.apache.flink.datastream.impl.operators.KeyedTwoInputBroadcastProcessOperator; import org.apache.flink.datastream.impl.operators.KeyedTwoInputNonBroadcastProcessOperator; @@ -118,6 +119,7 @@ public class KeyedPartitionStreamImpl<K, V> extends AbstractDataStream<V> Transformation<OUT> transform = StreamUtils.getOneInputKeyedTransformation( "KeyedProcess", this, outType, operator, keySelector, keyType); + transform.setAttribute(AttributeParser.parseAttribute(processFunction)); environment.addOperator(transform); return StreamUtils.wrapWithConfigureHandle( new NonKeyedPartitionStreamImpl<>(environment, transform)); @@ -141,6 +143,7 @@ public class KeyedPartitionStreamImpl<K, V> extends AbstractDataStream<V> "KeyedProcess", this, outType, operator, keySelector, keyType); NonKeyedPartitionStreamImpl<OUT> outputStream = new NonKeyedPartitionStreamImpl<>(environment, transform); + transform.setAttribute(AttributeParser.parseAttribute(processFunction)); environment.addOperator(transform); // Construct a keyed stream directly without partitionTransformation to avoid shuffle. return StreamUtils.wrapWithConfigureHandle( @@ -178,6 +181,7 @@ public class KeyedPartitionStreamImpl<K, V> extends AbstractDataStream<V> operator, keySelector, keyType); + mainOutputTransform.setAttribute(AttributeParser.parseAttribute(processFunction)); NonKeyedPartitionStreamImpl<OUT1> nonKeyedMainOutputStream = new NonKeyedPartitionStreamImpl<>(environment, mainOutputTransform); Transformation<OUT2> sideOutputTransform = @@ -229,6 +233,7 @@ public class KeyedPartitionStreamImpl<K, V> extends AbstractDataStream<V> operator, keySelector, keyType); + firstTransformation.setAttribute(AttributeParser.parseAttribute(processFunction)); NonKeyedPartitionStreamImpl<OUT1> firstStream = new NonKeyedPartitionStreamImpl<>(environment, firstTransformation); NonKeyedPartitionStreamImpl<OUT2> secondStream = @@ -267,6 +272,7 @@ public class KeyedPartitionStreamImpl<K, V> extends AbstractDataStream<V> (KeyedPartitionStreamImpl<K, T_OTHER>) other, outTypeInfo, processOperator); + outTransformation.setAttribute(AttributeParser.parseAttribute(processFunction)); environment.addOperator(outTransformation); return StreamUtils.wrapWithConfigureHandle( new NonKeyedPartitionStreamImpl<>(environment, outTransformation)); @@ -282,16 +288,18 @@ public class KeyedPartitionStreamImpl<K, V> extends AbstractDataStream<V> new HashSet<>( Collections.singletonList(StateDeclaration.RedistributionMode.IDENTICAL))); - TypeInformation<OUT> outTypeInfo = - StreamUtils.getOutputTypeForTwoInputNonBroadcastProcessFunction( - processFunction, - getType(), - ((KeyedPartitionStreamImpl<K, T_OTHER>) other).getType()); other = other instanceof ProcessConfigurableAndKeyedPartitionStreamImpl ? ((ProcessConfigurableAndKeyedPartitionStreamImpl) other) .getKeyedPartitionStream() : other; + + TypeInformation<OUT> outTypeInfo = + StreamUtils.getOutputTypeForTwoInputNonBroadcastProcessFunction( + processFunction, + getType(), + ((KeyedPartitionStreamImpl<K, T_OTHER>) other).getType()); + KeyedTwoInputNonBroadcastProcessOperator<K, V, T_OTHER, OUT> processOperator = new KeyedTwoInputNonBroadcastProcessOperator<>(processFunction, newKeySelector); Transformation<OUT> outTransformation = @@ -301,6 +309,7 @@ public class KeyedPartitionStreamImpl<K, V> extends AbstractDataStream<V> (KeyedPartitionStreamImpl<K, T_OTHER>) other, outTypeInfo, processOperator); + outTransformation.setAttribute(AttributeParser.parseAttribute(processFunction)); NonKeyedPartitionStreamImpl<OUT> nonKeyedOutputStream = new NonKeyedPartitionStreamImpl<>(environment, outTransformation); environment.addOperator(outTransformation); @@ -336,6 +345,7 @@ public class KeyedPartitionStreamImpl<K, V> extends AbstractDataStream<V> (BroadcastStreamImpl<T_OTHER>) other, outTypeInfo, processOperator); + outTransformation.setAttribute(AttributeParser.parseAttribute(processFunction)); environment.addOperator(outTransformation); return StreamUtils.wrapWithConfigureHandle( new NonKeyedPartitionStreamImpl<>(environment, outTransformation)); @@ -368,6 +378,7 @@ public class KeyedPartitionStreamImpl<K, V> extends AbstractDataStream<V> NonKeyedPartitionStreamImpl<OUT> outputStream = new NonKeyedPartitionStreamImpl<>(environment, outTransformation); + outTransformation.setAttribute(AttributeParser.parseAttribute(processFunction)); environment.addOperator(outTransformation); // Construct a keyed stream directly without partitionTransformation to avoid shuffle. return StreamUtils.wrapWithConfigureHandle( diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/NonKeyedPartitionStreamImpl.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/NonKeyedPartitionStreamImpl.java index 82b81147f71..ea747f8c898 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/NonKeyedPartitionStreamImpl.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/NonKeyedPartitionStreamImpl.java @@ -34,6 +34,7 @@ import org.apache.flink.datastream.api.stream.KeyedPartitionStream; import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream; import org.apache.flink.datastream.api.stream.ProcessConfigurable; import org.apache.flink.datastream.impl.ExecutionEnvironmentImpl; +import org.apache.flink.datastream.impl.attribute.AttributeParser; import org.apache.flink.datastream.impl.operators.ProcessOperator; import org.apache.flink.datastream.impl.operators.TwoInputBroadcastProcessOperator; import org.apache.flink.datastream.impl.operators.TwoInputNonBroadcastProcessOperator; @@ -75,6 +76,7 @@ public class NonKeyedPartitionStreamImpl<T> extends AbstractDataStream<T> ProcessOperator<T, OUT> operator = new ProcessOperator<>(processFunction); OneInputTransformation<T, OUT> outputTransform = StreamUtils.getOneInputTransformation("Process", this, outType, operator); + outputTransform.setAttribute(AttributeParser.parseAttribute(processFunction)); environment.addOperator(outputTransform); return StreamUtils.wrapWithConfigureHandle( new NonKeyedPartitionStreamImpl<>(environment, outputTransform)); @@ -101,6 +103,7 @@ public class NonKeyedPartitionStreamImpl<T> extends AbstractDataStream<T> OneInputTransformation<T, OUT1> outTransformation = StreamUtils.getOneInputTransformation( "Two-Output-Operator", this, firstOutputType, operator); + outTransformation.setAttribute(AttributeParser.parseAttribute(processFunction)); NonKeyedPartitionStreamImpl<OUT1> firstStream = new NonKeyedPartitionStreamImpl<>(environment, outTransformation); NonKeyedPartitionStreamImpl<OUT2> secondStream = @@ -141,6 +144,7 @@ public class NonKeyedPartitionStreamImpl<T> extends AbstractDataStream<T> (NonKeyedPartitionStreamImpl<T_OTHER>) other, outTypeInfo, processOperator); + outTransformation.setAttribute(AttributeParser.parseAttribute(processFunction)); environment.addOperator(outTransformation); return StreamUtils.wrapWithConfigureHandle( new NonKeyedPartitionStreamImpl<>(environment, outTransformation)); @@ -170,6 +174,7 @@ public class NonKeyedPartitionStreamImpl<T> extends AbstractDataStream<T> (BroadcastStreamImpl<T_OTHER>) other, outTypeInfo, processOperator); + outTransformation.setAttribute(AttributeParser.parseAttribute(processFunction)); environment.addOperator(outTransformation); return StreamUtils.wrapWithConfigureHandle( new NonKeyedPartitionStreamImpl<>(environment, outTransformation)); diff --git a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/attribute/StreamingJobGraphGeneratorWithAttributeTest.java b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/attribute/StreamingJobGraphGeneratorWithAttributeTest.java new file mode 100644 index 00000000000..73a79dd3d9f --- /dev/null +++ b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/attribute/StreamingJobGraphGeneratorWithAttributeTest.java @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.impl.attribute; + +import org.apache.flink.api.connector.dsv2.DataStreamV2SourceUtils; +import org.apache.flink.api.connector.dsv2.WrappedSink; +import org.apache.flink.configuration.PipelineOptions; +import org.apache.flink.datastream.api.ExecutionEnvironment; +import org.apache.flink.datastream.api.attribute.NoOutputUntilEndOfInput; +import org.apache.flink.datastream.api.common.Collector; +import org.apache.flink.datastream.api.context.PartitionedContext; +import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction; +import org.apache.flink.datastream.api.function.TwoOutputStreamProcessFunction; +import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream; +import org.apache.flink.datastream.impl.ExecutionEnvironmentImpl; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.streaming.api.graph.StreamNode; +import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link StreamingJobGraphGenerator} with different attributes. */ +class StreamingJobGraphGeneratorWithAttributeTest { + + @Test + void testNoOutputUntilEndOfInputWithOperatorChainCase1() throws Exception { + ExecutionEnvironmentImpl env = + (ExecutionEnvironmentImpl) ExecutionEnvironment.getInstance(); + NonKeyedPartitionStream<Integer> source = + env.fromSource( + DataStreamV2SourceUtils.fromData(Arrays.asList(1, 2, 3)), "test-source"); + source.keyBy(x -> x) + .process(new NoOutputUntilEndOfInputMapTask()) + .withParallelism(2) + .process(new TestMapTask()) + .withParallelism(2) + .toSink(new WrappedSink<>(new DiscardingSink<>())) + .withParallelism(3); + StreamGraph streamGraph = env.getStreamGraph(); + Map<String, StreamNode> nodeMap = new HashMap<>(); + for (StreamNode node : streamGraph.getStreamNodes()) { + nodeMap.put(node.getOperatorName(), node); + } + assertThat(nodeMap).hasSize(4); + JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph); + Map<String, JobVertex> vertexMap = new HashMap<>(); + for (JobVertex vertex : jobGraph.getVertices()) { + vertexMap.put(vertex.getName(), vertex); + } + assertThat(vertexMap).hasSize(3); + assertHasOutputPartitionType( + vertexMap.get("Source: Collection Source"), ResultPartitionType.PIPELINED_BOUNDED); + assertHasOutputPartitionType( + vertexMap.get("KeyedProcess -> Process"), ResultPartitionType.BLOCKING); + assertThat(vertexMap.get("Source: Collection Source").isAnyOutputBlocking()).isFalse(); + assertThat(vertexMap.get("KeyedProcess -> Process").isAnyOutputBlocking()).isTrue(); + assertThat(vertexMap.get("Sink: Writer").isAnyOutputBlocking()).isFalse(); + } + + @Test + void testPropagateAlongOperatorChain() throws Exception { + ExecutionEnvironmentImpl env = + (ExecutionEnvironmentImpl) ExecutionEnvironment.getInstance(); + NonKeyedPartitionStream<Integer> source = + env.fromSource( + DataStreamV2SourceUtils.fromData(Arrays.asList(1, 2, 3)), "test-source"); + source.keyBy(x -> x) + .process(new TestMapTask()) + .withParallelism(2) + .process(new NoOutputUntilEndOfInputMapTask()) + .withParallelism(2) + .toSink(new WrappedSink<>(new DiscardingSink<>())) + .withParallelism(3); + StreamGraph streamGraph = env.getStreamGraph(); + Map<String, StreamNode> nodeMap = new HashMap<>(); + for (StreamNode node : streamGraph.getStreamNodes()) { + nodeMap.put(node.getOperatorName(), node); + } + assertThat(nodeMap).hasSize(4); + JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph); + Map<String, JobVertex> vertexMap = new HashMap<>(); + for (JobVertex vertex : jobGraph.getVertices()) { + vertexMap.put(vertex.getName(), vertex); + } + assertThat(vertexMap).hasSize(3); + assertHasOutputPartitionType( + vertexMap.get("Source: Collection Source"), ResultPartitionType.PIPELINED_BOUNDED); + assertHasOutputPartitionType( + vertexMap.get("KeyedProcess -> Process"), ResultPartitionType.BLOCKING); + assertThat(vertexMap.get("Source: Collection Source").isAnyOutputBlocking()).isFalse(); + assertThat(vertexMap.get("KeyedProcess -> Process").isAnyOutputBlocking()).isTrue(); + assertThat(vertexMap.get("Sink: Writer").isAnyOutputBlocking()).isFalse(); + } + + @Test + void testTwoOutput() throws Exception { + ExecutionEnvironmentImpl env = + (ExecutionEnvironmentImpl) ExecutionEnvironment.getInstance(); + NonKeyedPartitionStream<Integer> source = + env.fromSource( + DataStreamV2SourceUtils.fromData(Arrays.asList(1, 2, 3)), "test-source"); + NonKeyedPartitionStream.ProcessConfigurableAndTwoNonKeyedPartitionStream<Integer, Integer> + twoOutputStream = + source.process(new TestMapTask()) + .withParallelism(2) + .process(new TestTwoOutputProcessFunction()) + .withParallelism(2); + NonKeyedPartitionStream.ProcessConfigurableAndNonKeyedPartitionStream<Integer> firstStream = + twoOutputStream.getFirst(); + NonKeyedPartitionStream.ProcessConfigurableAndNonKeyedPartitionStream<Integer> + secondStream = twoOutputStream.getSecond(); + firstStream + .process(new NoOutputUntilEndOfInputMapTask()) + .withParallelism(2) + .toSink(new WrappedSink<>(new DiscardingSink<>())) + .withParallelism(3); + secondStream + .process(new TestMapTask()) + .withParallelism(2) + .toSink(new WrappedSink<>(new DiscardingSink<>())) + .withParallelism(3); + StreamGraph streamGraph = env.getStreamGraph(); + JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph); + Map<String, JobVertex> vertexMap = new HashMap<>(); + for (JobVertex vertex : jobGraph.getVertices()) { + vertexMap.put(vertex.getName(), vertex); + } + assertThat(vertexMap).hasSize(3); + assertHasOutputPartitionType( + vertexMap.get("Process -> Two-Output-Operator -> (Process, Process)"), + ResultPartitionType.BLOCKING); + assertThat( + vertexMap + .get("Process -> Two-Output-Operator -> (Process, Process)") + .isAnyOutputBlocking()) + .isTrue(); + } + + @Test + void testWithoutOperatorChain() throws Exception { + ExecutionEnvironmentImpl env = + (ExecutionEnvironmentImpl) ExecutionEnvironment.getInstance(); + env.getConfiguration().set(PipelineOptions.OPERATOR_CHAINING, false); + NonKeyedPartitionStream<Integer> source = + env.fromSource( + DataStreamV2SourceUtils.fromData(Arrays.asList(1, 2, 3)), "test-source"); + source.keyBy(x -> x) + .process(new NoOutputUntilEndOfInputMapTask()) + .withParallelism(2) + .process(new TestMapTask()) + .withParallelism(2) + .toSink(new WrappedSink<>(new DiscardingSink<>())) + .withParallelism(3); + StreamGraph streamGraph = env.getStreamGraph(); + Map<String, StreamNode> nodeMap = new HashMap<>(); + for (StreamNode node : streamGraph.getStreamNodes()) { + nodeMap.put(node.getOperatorName(), node); + } + assertThat(nodeMap).hasSize(4); + JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph); + Map<String, JobVertex> vertexMap = new HashMap<>(); + for (JobVertex vertex : jobGraph.getVertices()) { + vertexMap.put(vertex.getName(), vertex); + } + assertThat(vertexMap).hasSize(4); + assertHasOutputPartitionType( + vertexMap.get("Source: Collection Source"), ResultPartitionType.PIPELINED_BOUNDED); + assertHasOutputPartitionType(vertexMap.get("KeyedProcess"), ResultPartitionType.BLOCKING); + assertHasOutputPartitionType( + vertexMap.get("Process"), ResultPartitionType.PIPELINED_BOUNDED); + assertThat(vertexMap.get("Source: Collection Source").isAnyOutputBlocking()).isFalse(); + assertThat(vertexMap.get("KeyedProcess").isAnyOutputBlocking()).isTrue(); + assertThat(vertexMap.get("Process").isAnyOutputBlocking()).isFalse(); + assertThat(vertexMap.get("Sink: Writer").isAnyOutputBlocking()).isFalse(); + } + + private void assertHasOutputPartitionType( + JobVertex jobVertex, ResultPartitionType partitionType) { + assertThat(jobVertex.getProducedDataSets().get(0).getResultType()).isEqualTo(partitionType); + } + + @NoOutputUntilEndOfInput + private static class NoOutputUntilEndOfInputMapTask + implements OneInputStreamProcessFunction<Integer, Integer> { + + @Override + public void processRecord( + Integer record, Collector<Integer> output, PartitionedContext ctx) { + output.collect(record + 1); + } + } + + private static class TestMapTask implements OneInputStreamProcessFunction<Integer, Integer> { + + @Override + public void processRecord( + Integer record, Collector<Integer> output, PartitionedContext ctx) { + if (record != 2) { + output.collect(record + 1); + } + } + } + + /** The test {@link TwoOutputStreamProcessFunction}. */ + private static class TestTwoOutputProcessFunction + implements TwoOutputStreamProcessFunction<Integer, Integer, Integer> { + + @Override + public void processRecord( + Integer record, + Collector<Integer> output1, + Collector<Integer> output2, + PartitionedContext ctx) { + output1.collect(record + 1); + output2.collect(record - 1); + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java index e56ffdbf10a..aa466e16b39 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.graph; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.attribute.Attribute; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.configuration.CheckpointingOptions; @@ -119,6 +120,9 @@ public class StreamConfig implements Serializable { private static final String TIME_CHARACTERISTIC = "timechar"; private static final String MANAGED_MEMORY_FRACTION_PREFIX = "managedMemFraction."; + + private static final String ATTRIBUTE = "attribute"; + private static final ConfigOption<Boolean> STATE_BACKEND_USE_MANAGED_MEMORY = ConfigOptions.key("statebackend.useManagedMemory") .booleanType() @@ -800,6 +804,20 @@ public class StreamConfig implements Serializable { return config.getBoolean(GRAPH_CONTAINING_LOOPS, false); } + public void setAttribute(Attribute attribute) { + if (attribute != null) { + toBeSerializedConfigObjects.put(ATTRIBUTE, attribute); + } + } + + public Attribute getAttribute(ClassLoader cl) { + try { + return InstantiationUtil.readObjectFromConfig(this.config, ATTRIBUTE, cl); + } catch (Exception e) { + throw new StreamTaskException("Could not instantiate checkpoint storage.", e); + } + } + /** * In general, we don't clear any configuration. However, the {@link #SERIALIZED_UDF} may be * very large when operator includes some large objects, the SERIALIZED_UDF is used to create a diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java index fc794022bf8..0823ed29e32 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java @@ -21,6 +21,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.attribute.Attribute; import org.apache.flink.api.common.cache.DistributedCache; import org.apache.flink.api.common.io.InputFormat; import org.apache.flink.api.common.io.OutputFormat; @@ -1085,4 +1086,10 @@ public class StreamGraph implements Pipeline { streamNode.setSupportsConcurrentExecutionAttempts(supportsConcurrentExecutionAttempts); } } + + public void setAttribute(Integer vertexId, Attribute attribute) { + if (getStreamNode(vertexId) != null) { + getStreamNode(vertexId).setAttribute(attribute); + } + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java index d86cec1059e..f47f1814332 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java @@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.graph; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.attribute.Attribute; import org.apache.flink.api.common.io.InputFormat; import org.apache.flink.api.common.io.OutputFormat; import org.apache.flink.api.common.operators.ResourceSpec; @@ -98,6 +99,8 @@ public class StreamNode { private boolean parallelismConfigured = false; + private Attribute attribute = new Attribute.Builder().build(); + @VisibleForTesting public StreamNode( Integer id, @@ -189,6 +192,14 @@ public class StreamNode { return id; } + public void setAttribute(Attribute attribute) { + this.attribute = attribute; + } + + public Attribute getAttribute() { + return attribute; + } + public int getParallelism() { return parallelism; } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index 7b0a6cf90b4..358c62d8610 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -22,6 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.api.common.attribute.Attribute; import org.apache.flink.api.common.cache.DistributedCache; import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.operators.ResourceSpec; @@ -187,9 +188,6 @@ public class StreamingJobGraphGenerator { private final Map<Integer, InputOutputFormatContainer> chainedInputOutputFormats; - // the ids of nodes whose output result partition type should be set to BLOCKING - private final Set<Integer> outputBlockingNodesID; - private final StreamGraphHasher defaultStreamGraphHasher; private final List<StreamGraphHasher> legacyStreamGraphHashers; @@ -230,7 +228,6 @@ public class StreamingJobGraphGenerator { this.chainedMinResources = new HashMap<>(); this.chainedPreferredResources = new HashMap<>(); this.chainedInputOutputFormats = new HashMap<>(); - this.outputBlockingNodesID = new HashSet<>(); this.physicalEdgesInOrder = new ArrayList<>(); this.serializationExecutor = Preconditions.checkNotNull(serializationExecutor); this.chainInfos = new HashMap<>(); @@ -682,10 +679,12 @@ public class StreamingJobGraphGenerator { List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>(); StreamNode currentNode = streamGraph.getStreamNode(currentNodeId); - - boolean isOutputOnlyAfterEndOfStream = currentNode.isOutputOnlyAfterEndOfStream(); - if (isOutputOnlyAfterEndOfStream) { - outputBlockingNodesID.add(currentNode.getId()); + Attribute currentNodeAttribute = currentNode.getAttribute(); + boolean isNoOutputUntilEndOfInput = + currentNode.isOutputOnlyAfterEndOfStream() + || currentNodeAttribute.isNoOutputUntilEndOfInput(); + if (isNoOutputUntilEndOfInput) { + currentNodeAttribute.setNoOutputUntilEndOfInput(true); } for (StreamEdge outEdge : currentNode.getOutEdges()) { @@ -697,9 +696,12 @@ public class StreamingJobGraphGenerator { } for (StreamEdge chainable : chainableOutputs) { - // Mark downstream nodes in the same chain as outputBlocking - if (isOutputOnlyAfterEndOfStream) { - outputBlockingNodesID.add(chainable.getTargetId()); + StreamNode targetNode = streamGraph.getStreamNode(chainable.getTargetId()); + Attribute targetNodeAttribute = targetNode.getAttribute(); + if (isNoOutputUntilEndOfInput) { + if (targetNodeAttribute != null) { + targetNodeAttribute.setNoOutputUntilEndOfInput(true); + } } transitiveOutEdges.addAll( createChain( @@ -708,8 +710,9 @@ public class StreamingJobGraphGenerator { chainInfo, chainEntryPoints)); // Mark upstream nodes in the same chain as outputBlocking - if (outputBlockingNodesID.contains(chainable.getTargetId())) { - outputBlockingNodesID.add(currentNodeId); + if (targetNodeAttribute != null + && targetNodeAttribute.isNoOutputUntilEndOfInput()) { + currentNodeAttribute.setNoOutputUntilEndOfInput(true); } } @@ -757,7 +760,7 @@ public class StreamingJobGraphGenerator { : new StreamConfig(new Configuration()); tryConvertPartitionerForDynamicGraph(chainableOutputs, nonChainableOutputs); - + config.setAttribute(currentNodeAttribute); setOperatorConfig(currentNodeId, config, chainInfo.getChainedSources()); setOperatorChainedOutputsConfig(config, chainableOutputs); @@ -1504,7 +1507,9 @@ public class StreamingJobGraphGenerator { } private ResultPartitionType determineUndefinedResultPartitionType(StreamEdge edge) { - if (outputBlockingNodesID.contains(edge.getSourceId())) { + Attribute sourceNodeAttribute = + streamGraph.getStreamNode(edge.getSourceId()).getAttribute(); + if (sourceNodeAttribute.isNoOutputUntilEndOfInput()) { edge.setBufferTimeout(ExecutionOptions.DISABLED_NETWORK_BUFFER_TIMEOUT); return ResultPartitionType.BLOCKING; } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/AbstractOneInputTransformationTranslator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/AbstractOneInputTransformationTranslator.java index 54cfda86339..3f5269c97a7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/AbstractOneInputTransformationTranslator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/AbstractOneInputTransformationTranslator.java @@ -69,7 +69,7 @@ abstract class AbstractOneInputTransformationTranslator<IN, OUT, OP extends Tran inputType, transformation.getOutputType(), transformation.getName()); - + streamGraph.setAttribute(transformationId, transformation.getAttribute()); if (stateKeySelector != null) { TypeSerializer<?> keySerializer = stateKeyType.createSerializer(executionConfig.getSerializerConfig()); diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/AbstractTwoInputTransformationTranslator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/AbstractTwoInputTransformationTranslator.java index 134c10ab806..653c1f43024 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/AbstractTwoInputTransformationTranslator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/AbstractTwoInputTransformationTranslator.java @@ -75,7 +75,7 @@ public abstract class AbstractTwoInputTransformationTranslator< secondInputTransformation.getOutputType(), transformation.getOutputType(), transformation.getName()); - + streamGraph.setAttribute(transformationId, transformation.getAttribute()); if (firstKeySelector != null || secondKeySelector != null) { checkState( keyTypeInfo != null, diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/MultiInputTransformationTranslator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/MultiInputTransformationTranslator.java index be2980f3ab6..c52ade1d638 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/MultiInputTransformationTranslator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/MultiInputTransformationTranslator.java @@ -96,7 +96,7 @@ public class MultiInputTransformationTranslator<OUT> transformation.getInputTypes(), transformation.getOutputType(), transformation.getName()); - + streamGraph.setAttribute(transformationId, transformation.getAttribute()); final int parallelism = transformation.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT ? transformation.getParallelism() diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/OneInputTransformationTranslator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/OneInputTransformationTranslator.java index 6d7ae8103f1..e17941d6551 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/OneInputTransformationTranslator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/OneInputTransformationTranslator.java @@ -67,7 +67,8 @@ public final class OneInputTransformationTranslator<IN, OUT> transformation.getStateKeyType(), context); - if (transformation.isOutputOnlyAfterEndOfStream()) { + if (transformation.isOutputOnlyAfterEndOfStream() + || transformation.getAttribute().isNoOutputUntilEndOfInput()) { maybeApplyBatchExecutionSettings(transformation, context); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorWithOperatorAttributesTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorWithOperatorAttributesTest.java index 94c9e2ec173..d710bbe2b13 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorWithOperatorAttributesTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorWithOperatorAttributesTest.java @@ -49,8 +49,9 @@ import static org.assertj.core.api.Assertions.assertThat; /** Tests for {@link StreamingJobGraphGenerator} with internal sorter. */ public class StreamingJobGraphGeneratorWithOperatorAttributesTest { + @Test - void testOutputOnlyAfterEndOfStream() { + void testOutputOnlyAfterEndOfStreamEnableChain() { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(new Configuration()); @@ -97,10 +98,46 @@ public class StreamingJobGraphGeneratorWithOperatorAttributesTest { assertThat(vertexMap.get("Source: source").isAnyOutputBlocking()).isFalse(); assertThat(vertexMap.get("transform -> Map").isAnyOutputBlocking()).isTrue(); assertThat(vertexMap.get("sink: Writer").isAnyOutputBlocking()).isFalse(); + } + + @Test + void testOutputOnlyAfterEndOfStreamDisableChain() { + final StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(new Configuration()); + + final DataStream<Integer> source = env.fromData(1, 2, 3).name("source"); + source.keyBy(x -> x) + .transform( + "transform", + Types.INT, + new StreamOperatorWithConfigurableOperatorAttributes<>( + x -> x, + new OperatorAttributesBuilder() + .setOutputOnlyAfterEndOfStream(true) + .build())) + .map(x -> x) + .sinkTo(new DiscardingSink<>()) + .disableChaining() + .name("sink"); + + final StreamGraph streamGraph = env.getStreamGraph(false); + Map<String, StreamNode> nodeMap = new HashMap<>(); + for (StreamNode node : streamGraph.getStreamNodes()) { + nodeMap.put(node.getOperatorName(), node); + } + assertThat(nodeMap).hasSize(4); + assertThat(nodeMap.get("Source: source").isOutputOnlyAfterEndOfStream()).isFalse(); + assertThat(nodeMap.get("transform").isOutputOnlyAfterEndOfStream()).isTrue(); + assertThat(nodeMap.get("Map").isOutputOnlyAfterEndOfStream()).isFalse(); + assertThat(nodeMap.get("sink: Writer").isOutputOnlyAfterEndOfStream()).isFalse(); + assertManagedMemoryWeightsSize(nodeMap.get("Source: source"), 0); + assertManagedMemoryWeightsSize(nodeMap.get("transform"), 1); + assertManagedMemoryWeightsSize(nodeMap.get("Map"), 0); + assertManagedMemoryWeightsSize(nodeMap.get("sink: Writer"), 0); env.disableOperatorChaining(); - jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph(false)); - vertexMap = new HashMap<>(); + JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph(false)); + Map<String, JobVertex> vertexMap = new HashMap<>(); for (JobVertex vertex : jobGraph.getVertices()) { vertexMap.put(vertex.getName(), vertex); } @@ -115,6 +152,56 @@ public class StreamingJobGraphGeneratorWithOperatorAttributesTest { assertThat(vertexMap.get("sink: Writer").isAnyOutputBlocking()).isFalse(); } + @Test + void testOutputOnlyAfterEndOfStreamPropagateToUpstreamWithinChain() { + final StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(new Configuration()); + + final DataStream<Integer> source = env.fromData(1, 2, 3).name("source"); + source.keyBy(x -> x) + .map(x -> x) + .transform( + "transform", + Types.INT, + new StreamOperatorWithConfigurableOperatorAttributes<>( + x -> x, + new OperatorAttributesBuilder() + .setOutputOnlyAfterEndOfStream(true) + .build())) + .sinkTo(new DiscardingSink<>()) + .disableChaining() + .name("sink"); + + final StreamGraph streamGraph = env.getStreamGraph(false); + Map<String, StreamNode> nodeMap = new HashMap<>(); + for (StreamNode node : streamGraph.getStreamNodes()) { + nodeMap.put(node.getOperatorName(), node); + } + assertThat(nodeMap).hasSize(4); + assertThat(nodeMap.get("Source: source").isOutputOnlyAfterEndOfStream()).isFalse(); + assertThat(nodeMap.get("transform").isOutputOnlyAfterEndOfStream()).isTrue(); + assertThat(nodeMap.get("Map").isOutputOnlyAfterEndOfStream()).isFalse(); + assertThat(nodeMap.get("sink: Writer").isOutputOnlyAfterEndOfStream()).isFalse(); + assertManagedMemoryWeightsSize(nodeMap.get("Source: source"), 0); + assertManagedMemoryWeightsSize(nodeMap.get("Map"), 0); + assertManagedMemoryWeightsSize(nodeMap.get("transform"), 0); + assertManagedMemoryWeightsSize(nodeMap.get("sink: Writer"), 0); + + JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph); + Map<String, JobVertex> vertexMap = new HashMap<>(); + for (JobVertex vertex : jobGraph.getVertices()) { + vertexMap.put(vertex.getName(), vertex); + } + assertThat(vertexMap).hasSize(3); + assertHasOutputPartitionType( + vertexMap.get("Source: source"), ResultPartitionType.PIPELINED_BOUNDED); + assertHasOutputPartitionType( + vertexMap.get("Map -> transform"), ResultPartitionType.BLOCKING); + assertThat(vertexMap.get("Source: source").isAnyOutputBlocking()).isFalse(); + assertThat(vertexMap.get("Map -> transform").isAnyOutputBlocking()).isTrue(); + assertThat(vertexMap.get("sink: Writer").isAnyOutputBlocking()).isFalse(); + } + @Test void testApplyBatchExecutionSettingsOnTwoInputOperator() { final StreamExecutionEnvironment env = @@ -146,10 +233,6 @@ public class StreamingJobGraphGeneratorWithOperatorAttributesTest { assertManagedMemoryWeightsSize(nodeMap.get("sink: Writer"), 0); } - private static void assertManagedMemoryWeightsSize(StreamNode node, int weightSize) { - assertThat(node.getManagedMemoryOperatorScopeUseCaseWeights()).hasSize(weightSize); - } - @Test void testOneInputOperatorWithInternalSorterSupported() { final StreamExecutionEnvironment env = @@ -361,6 +444,10 @@ public class StreamingJobGraphGeneratorWithOperatorAttributesTest { IN2 value, CoProcessFunction<IN1, IN2, OUT>.Context ctx, Collector<OUT> out) {} } + private void assertManagedMemoryWeightsSize(StreamNode node, int weightSize) { + assertThat(node.getManagedMemoryOperatorScopeUseCaseWeights()).hasSize(weightSize); + } + private void assertHasOutputPartitionType( JobVertex jobVertex, ResultPartitionType partitionType) { assertThat(jobVertex.getProducedDataSets().get(0).getResultType()).isEqualTo(partitionType);
