This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 197edbfba1da52c5c51a5b4df1947db793729110
Author: ifndef-SleePy <[email protected]>
AuthorDate: Fri Jun 14 22:12:58 2019 +0800

    [FLINK-12832][datastream] Make ScheduleMode configurable in StreamGraphGenerator
---
 .../flink/streaming/api/graph/StreamGraph.java     | 11 ++++++++
 .../streaming/api/graph/StreamGraphGenerator.java  | 11 ++++++++
 .../api/graph/StreamingJobGraphGenerator.java      |  3 +-
 .../api/graph/StreamingJobGraphGeneratorTest.java  | 32 ++++++++++++++++++++++
 4 files changed, 55 insertions(+), 2 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 7381cde..87e6d86 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -32,6 +32,7 @@ import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.MissingTypeInfo;
 import org.apache.flink.optimizer.plan.StreamingPlan;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.streaming.api.TimeCharacteristic;
@@ -88,6 +89,8 @@ public class StreamGraph extends StreamingPlan {
        private final ExecutionConfig executionConfig;
        private final CheckpointConfig checkpointConfig;
 
+       private ScheduleMode scheduleMode;
+
        private boolean chaining;
 
        private Collection<Tuple2<String, 
DistributedCache.DistributedCacheEntry>> userArtifacts;
@@ -157,6 +160,14 @@ public class StreamGraph extends StreamingPlan {
                return this.stateBackend;
        }
 
+       public ScheduleMode getScheduleMode() {
+               return scheduleMode;
+       }
+
+       public void setScheduleMode(ScheduleMode scheduleMode) {
+               this.scheduleMode = scheduleMode;
+       }
+
        public Collection<Tuple2<String, 
DistributedCache.DistributedCacheEntry>> getUserArtifacts() {
                return userArtifacts;
        }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index b5ab9c5..ead8e07 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.streaming.api.TimeCharacteristic;
@@ -89,6 +90,8 @@ public class StreamGraphGenerator {
 
        public static final int DEFAULT_LOWER_BOUND_MAX_PARALLELISM = 
KeyGroupRangeAssignment.DEFAULT_LOWER_BOUND_MAX_PARALLELISM;
 
+       public static final ScheduleMode DEFAULT_SCHEDULE_MODE = 
ScheduleMode.EAGER;
+
        public static final TimeCharacteristic DEFAULT_TIME_CHARACTERISTIC = 
TimeCharacteristic.ProcessingTime;
 
        public static final String DEFAULT_JOB_NAME = "Flink Streaming Job";
@@ -110,6 +113,8 @@ public class StreamGraphGenerator {
 
        private boolean isSlotSharingEnabled = true;
 
+       private ScheduleMode scheduleMode = DEFAULT_SCHEDULE_MODE;
+
        private Collection<Tuple2<String, 
DistributedCache.DistributedCacheEntry>> userArtifacts;
 
        private TimeCharacteristic timeCharacteristic = 
DEFAULT_TIME_CHARACTERISTIC;
@@ -152,6 +157,11 @@ public class StreamGraphGenerator {
                return this;
        }
 
+       public StreamGraphGenerator setScheduleMode(ScheduleMode scheduleMode) {
+               this.scheduleMode = scheduleMode;
+               return this;
+       }
+
        public StreamGraphGenerator setUserArtifacts(Collection<Tuple2<String, 
DistributedCache.DistributedCacheEntry>> userArtifacts) {
                this.userArtifacts = userArtifacts;
                return this;
@@ -176,6 +186,7 @@ public class StreamGraphGenerator {
                streamGraph = new StreamGraph(executionConfig, 
checkpointConfig);
                streamGraph.setStateBackend(stateBackend);
                streamGraph.setChaining(chaining);
+               streamGraph.setScheduleMode(scheduleMode);
                streamGraph.setUserArtifacts(userArtifacts);
                streamGraph.setTimeCharacteristic(timeCharacteristic);
                streamGraph.setJobName(jobName);
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index dbedcdc..edf0314 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -37,7 +37,6 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
@@ -142,7 +141,7 @@ public class StreamingJobGraphGenerator {
        private JobGraph createJobGraph() {
 
                // make sure that all vertices start immediately
-               jobGraph.setScheduleMode(ScheduleMode.EAGER);
+               jobGraph.setScheduleMode(streamGraph.getScheduleMode());
 
                // Generate deterministic hashes for the nodes in order to 
identify them across
                // submission iff they didn't change.
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index f8bed23..6c5c050 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -35,6 +35,7 @@ import 
org.apache.flink.runtime.jobgraph.InputOutputFormatVertex;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
@@ -58,6 +59,7 @@ import org.junit.Test;
 
 import java.lang.reflect.Method;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -502,4 +504,34 @@ public class StreamingJobGraphGeneratorTest extends 
TestLogger {
                assertNotNull(iterationSinkCoLocationGroup);
                assertEquals(iterationSourceCoLocationGroup, 
iterationSinkCoLocationGroup);
        }
+
+       /**
+        * Test default schedule mode.
+        */
+       @Test
+       public void testDefaultScheduleMode() {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+               // use eager schedule mode by default
+               StreamGraph streamGraph = new 
StreamGraphGenerator(Collections.emptyList(),
+                       env.getConfig(), env.getCheckpointConfig())
+                       .generate();
+               JobGraph jobGraph = 
StreamingJobGraphGenerator.createJobGraph(streamGraph);
+               assertEquals(ScheduleMode.EAGER, jobGraph.getScheduleMode());
+       }
+
+       /**
+        * Test schedule mode is configurable or not.
+        */
+       @Test
+       public void testSetScheduleMode() {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+               StreamGraph streamGraph = new 
StreamGraphGenerator(Collections.emptyList(),
+                       env.getConfig(), env.getCheckpointConfig())
+                       .setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES)
+                       .generate();
+               JobGraph jobGraph = 
StreamingJobGraphGenerator.createJobGraph(streamGraph);
+               assertEquals(ScheduleMode.LAZY_FROM_SOURCES, 
jobGraph.getScheduleMode());
+       }
 }

Reply via email to