This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f79a09b  [FLINK-20444][runtime] Chain YieldingOperatorFactory to new sources.
f79a09b is described below

commit f79a09b8488190b549f4b7159f3e95818d57946a
Author: Arvid Heise <[email protected]>
AuthorDate: Tue Dec 1 14:04:07 2020 +0100

    [FLINK-20444][runtime] Chain YieldingOperatorFactory to new sources.
    
    For legacy sources, we had to disable chaining because of incompatible threading models.
    New sources work fine, however, and chaining to them would give some users massive performance improvements.
---
 .../api/graph/StreamingJobGraphGenerator.java      |  2 +-
 .../api/operators/SimpleOperatorFactory.java       |  5 +++++
 .../api/operators/StreamOperatorFactory.java       |  4 ++++
 .../api/graph/StreamingJobGraphGeneratorTest.java  | 22 +++++++++++++++++++++-
 4 files changed, 31 insertions(+), 2 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 33886eb..5430680 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -800,7 +800,7 @@ public class StreamingJobGraphGenerator {
                // yielding operators cannot be chained to legacy sources
                // unfortunately the information that vertices have been chained is not preserved at this point
                if (downStreamOperator instanceof YieldingOperatorFactory &&
-                               getHeadOperator(upStreamVertex, 
streamGraph).isStreamSource()) {
+                               getHeadOperator(upStreamVertex, 
streamGraph).isLegacySource()) {
                        return false;
                }
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SimpleOperatorFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SimpleOperatorFactory.java
index 1e3de6e..fddee59 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SimpleOperatorFactory.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SimpleOperatorFactory.java
@@ -96,6 +96,11 @@ public class SimpleOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OU
        }
 
        @Override
+       public boolean isLegacySource() {
+               return operator instanceof StreamSource;
+       }
+
+       @Override
        public boolean isOutputTypeConfigurable() {
                return operator instanceof OutputTypeConfigurable;
        }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorFactory.java
index d1ea80c..722d135 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorFactory.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorFactory.java
@@ -55,6 +55,10 @@ public interface StreamOperatorFactory<OUT> extends Serializable {
                return false;
        }
 
+       default boolean isLegacySource() {
+               return false;
+       }
+
        /**
         * If the stream operator need access to the output type information at 
{@link StreamGraph}
         * generation. This can be useful for cases where the output type is 
specified by the returns
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index 5239c84..584a891 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -789,7 +789,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
         * sources, see FLINK-16219.
         */
        @Test
-       public void testYieldingOperatorProperlyChained() {
+       public void testYieldingOperatorProperlyChainedOnLegacySources() {
                StreamExecutionEnvironment chainEnv = 
StreamExecutionEnvironment.createLocalEnvironment(1);
 
                chainEnv.fromElements(1)
@@ -809,6 +809,26 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
                assertEquals(5, vertices.get(1).getOperatorIDs().size());
        }
 
+       /**
+        * Tests that {@link org.apache.flink.streaming.api.operators.YieldingOperatorFactory} are chained to new sources,
+        * see FLINK-20444.
+        */
+       @Test
+       public void testYieldingOperatorProperlyChainedOnNewSources() {
+               StreamExecutionEnvironment chainEnv = 
StreamExecutionEnvironment.createLocalEnvironment(1);
+
+               chainEnv.fromSource(new NumberSequenceSource(0, 10), 
WatermarkStrategy.noWatermarks(), "input")
+                       .map((x) -> x)
+                       .transform("test", BasicTypeInfo.LONG_TYPE_INFO, new 
YieldingTestOperatorFactory<>())
+                       .addSink(new DiscardingSink<>());
+
+               final JobGraph jobGraph = 
chainEnv.getStreamGraph().getJobGraph();
+
+               final List<JobVertex> vertices = 
jobGraph.getVerticesSortedTopologicallyFromSources();
+               Assert.assertEquals(1, vertices.size());
+               assertEquals(4, vertices.get(0).getOperatorIDs().size());
+       }
+
        @Test(expected = UnsupportedOperationException.class)
        public void testNotSupportInputSelectableOperatorIfCheckpointing() {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();

Reply via email to