This is an automated email from the ASF dual-hosted git repository. fpaul pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 82b583ea0dcb61c10594de61c36b02ec7d70b498 Author: Arvid Heise <[email protected]> AuthorDate: Tue Jan 4 19:34:48 2022 +0100 [hotfix][streaming] Improve StreamGraphGenerator's performance by using an IdentityHashMap to track transformations. Already-transformed transformations are recorded in a separate map, and each transformation is looked up there before it is translated. If a transformation does not properly implement equals (and hashCode), the isTransformed check may fail and the transformation is copied multiple times. This is now hardened because the lookup compares object references instead of relying on equals. --- .../streaming/api/graph/StreamGraphGenerator.java | 3 +- .../api/graph/StreamGraphGeneratorTest.java | 50 ++++++++++++++++++++++ 2 files changed, 52 insertions(+), 1 deletion(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java index e527170..47c9def 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java @@ -93,6 +93,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.IdentityHashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -309,7 +310,7 @@ public class StreamGraphGenerator { shouldExecuteInBatchMode = shouldExecuteInBatchMode(); configureStreamGraph(streamGraph); - alreadyTransformed = new HashMap<>(); + alreadyTransformed = new IdentityHashMap<>(); for (Transformation<?> transformation : transformations) { transform(transformation); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java index 4a59ec4..3815af2 100644 --- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java @@ -76,12 +76,14 @@ import org.hamcrest.TypeSafeMatcher; import org.junit.Test; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; @@ -806,6 +808,54 @@ public class StreamGraphGeneratorTest extends TestLogger { env.getStreamGraph(); } + @Test + public void testTrackTransformationsByIdentity() { + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + final Transformation<?> noopTransformation = env.fromSequence(1, 2).getTransformation(); + + final StreamGraphGenerator generator = + new StreamGraphGenerator( + Arrays.asList( + noopTransformation, + new FailingTransformation(noopTransformation.hashCode())), + new ExecutionConfig(), + new CheckpointConfig()); + assertThatThrownBy(generator::generate) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("Unknown transformation: FailingTransformation"); + } + + private static class FailingTransformation extends Transformation<String> { + private final int hashCode; + + FailingTransformation(int hashCode) { + super("FailingTransformation", BasicTypeInfo.STRING_TYPE_INFO, 1); + this.hashCode = hashCode; + } + + @Override + public List<Transformation<?>> getTransitivePredecessors() { + return Collections.emptyList(); + } + + @Override + public List<Transformation<?>> getInputs() { + return Collections.emptyList(); + } + + // Overwrite equal to test transformation based on identity + @Override + public boolean equals(Object o) { 
+ return true; + } + + // Overwrite hashCode to test transformation based on identity + @Override + public int hashCode() { + return hashCode; + } + } + private static class OutputTypeConfigurableFunction<T> implements OutputTypeConfigurable<T>, Function { private TypeInformation<T> typeInformation;
