This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit a9e1738a2e131426ea8fa3051a2f4cba841730eb Author: ifndef-SleePy <[email protected]> AuthorDate: Mon Jan 23 01:11:38 2023 +0800 [FLINK-30755][runtime] Support SupportsConcurrentExecutionAttempts property of Transformation --- .../api/transformations/LegacySinkTransformation.java | 5 +++++ .../streaming/api/transformations/PhysicalTransformation.java | 11 +++++++++++ .../translators/AbstractOneInputTransformationTranslator.java | 8 ++++++++ .../translators/AbstractTwoInputTransformationTranslator.java | 8 ++++++++ .../translators/LegacySinkTransformationTranslator.java | 3 +++ .../translators/LegacySourceTransformationTranslator.java | 3 +++ .../translators/MultiInputTransformationTranslator.java | 3 +++ .../runtime/translators/SourceTransformationTranslator.java | 4 ++++ 8 files changed, 45 insertions(+) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/LegacySinkTransformation.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/LegacySinkTransformation.java index 8977c7075df..99e0124ece1 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/LegacySinkTransformation.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/LegacySinkTransformation.java @@ -128,4 +128,9 @@ public class LegacySinkTransformation<T> extends PhysicalTransformation<T> { public final void setChainingStrategy(ChainingStrategy strategy) { operatorFactory.setChainingStrategy(strategy); } + + @Override + public boolean isSupportsConcurrentExecutionAttempts() { + return false; + } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/PhysicalTransformation.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/PhysicalTransformation.java index 3c2749f4ef4..31b689a77fd 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/PhysicalTransformation.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/PhysicalTransformation.java @@ -33,6 +33,8 @@ import org.apache.flink.streaming.api.operators.ChainingStrategy; @Internal public abstract class PhysicalTransformation<T> extends Transformation<T> { + private boolean supportsConcurrentExecutionAttempts = true; + /** * Creates a new {@code Transformation} with the given name, output type and parallelism. * @@ -47,4 +49,13 @@ public abstract class PhysicalTransformation<T> extends Transformation<T> { /** Sets the chaining strategy of this {@code Transformation}. */ public abstract void setChainingStrategy(ChainingStrategy strategy); + + public boolean isSupportsConcurrentExecutionAttempts() { + return supportsConcurrentExecutionAttempts; + } + + public void setSupportsConcurrentExecutionAttempts( + boolean supportsConcurrentExecutionAttempts) { + this.supportsConcurrentExecutionAttempts = supportsConcurrentExecutionAttempts; + } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/AbstractOneInputTransformationTranslator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/AbstractOneInputTransformationTranslator.java index 03d7e1ab8ce..6ac7ce2c37a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/AbstractOneInputTransformationTranslator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/AbstractOneInputTransformationTranslator.java @@ -26,6 +26,7 @@ import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.graph.SimpleTransformationTranslator; import org.apache.flink.streaming.api.graph.StreamGraph; import org.apache.flink.streaming.api.operators.StreamOperatorFactory; +import org.apache.flink.streaming.api.transformations.PhysicalTransformation; import javax.annotation.Nullable; @@ -91,6 +92,13 @@ abstract class AbstractOneInputTransformationTranslator<IN, OUT, OP extends Tran streamGraph.addEdge(inputId, transformationId, 0); } + if (transformation instanceof PhysicalTransformation) { + streamGraph.setSupportsConcurrentExecutionAttempts( + transformationId, + ((PhysicalTransformation<OUT>) transformation) + .isSupportsConcurrentExecutionAttempts()); + } + return Collections.singleton(transformationId); } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/AbstractTwoInputTransformationTranslator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/AbstractTwoInputTransformationTranslator.java index 03684b89529..c62a3cbed8e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/AbstractTwoInputTransformationTranslator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/AbstractTwoInputTransformationTranslator.java @@ -27,6 +27,7 @@ import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.graph.SimpleTransformationTranslator; import org.apache.flink.streaming.api.graph.StreamGraph; import org.apache.flink.streaming.api.operators.StreamOperatorFactory; +import org.apache.flink.streaming.api.transformations.PhysicalTransformation; import javax.annotation.Nullable; @@ -100,6 +101,13 @@ public abstract class AbstractTwoInputTransformationTranslator< streamGraph.addEdge(inputId, transformationId, 2); } + if (transformation instanceof PhysicalTransformation) { + streamGraph.setSupportsConcurrentExecutionAttempts( + transformationId, + ((PhysicalTransformation<OUT>) transformation) + .isSupportsConcurrentExecutionAttempts()); + } + return Collections.singleton(transformationId); } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/LegacySinkTransformationTranslator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/LegacySinkTransformationTranslator.java index 378a7171566..9a571f469dd 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/LegacySinkTransformationTranslator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/LegacySinkTransformationTranslator.java @@ -95,6 +95,9 @@ public class LegacySinkTransformationTranslator<IN> streamGraph.setParallelism(transformationId, parallelism); streamGraph.setMaxParallelism(transformationId, transformation.getMaxParallelism()); + streamGraph.setSupportsConcurrentExecutionAttempts( + transformationId, transformation.isSupportsConcurrentExecutionAttempts()); + for (Integer inputId : context.getStreamNodeIds(input)) { streamGraph.addEdge(inputId, transformationId, 0); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/LegacySourceTransformationTranslator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/LegacySourceTransformationTranslator.java index 8ab5a2a8ee3..c92ba158082 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/LegacySourceTransformationTranslator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/LegacySourceTransformationTranslator.java @@ -86,6 +86,9 @@ public class LegacySourceTransformationTranslator<OUT> streamGraph.setParallelism(transformationId, parallelism); streamGraph.setMaxParallelism(transformationId, transformation.getMaxParallelism()); + streamGraph.setSupportsConcurrentExecutionAttempts( + transformationId, transformation.isSupportsConcurrentExecutionAttempts()); + return Collections.singleton(transformationId); } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/MultiInputTransformationTranslator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/MultiInputTransformationTranslator.java index d5af8bd8562..e381b47d822 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/MultiInputTransformationTranslator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/MultiInputTransformationTranslator.java @@ -133,6 +133,9 @@ public class MultiInputTransformationTranslator<OUT> } } + streamGraph.setSupportsConcurrentExecutionAttempts( + transformationId, transformation.isSupportsConcurrentExecutionAttempts()); + return Collections.singleton(transformationId); } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SourceTransformationTranslator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SourceTransformationTranslator.java index af251755a77..57ce5baedb0 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SourceTransformationTranslator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SourceTransformationTranslator.java @@ -95,6 +95,10 @@ public class SourceTransformationTranslator<OUT, SplitT extends SourceSplit, Enu streamGraph.setParallelism(transformationId, parallelism); streamGraph.setMaxParallelism(transformationId, transformation.getMaxParallelism()); + + streamGraph.setSupportsConcurrentExecutionAttempts( + transformationId, transformation.isSupportsConcurrentExecutionAttempts()); + return Collections.singleton(transformationId); } }
