This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 197edbfba1da52c5c51a5b4df1947db793729110 Author: ifndef-SleePy <[email protected]> AuthorDate: Fri Jun 14 22:12:58 2019 +0800 [FLINK-12832][datastream] Make ScheduleMode configurable in StreamGraphGenerator --- .../flink/streaming/api/graph/StreamGraph.java | 11 ++++++++ .../streaming/api/graph/StreamGraphGenerator.java | 11 ++++++++ .../api/graph/StreamingJobGraphGenerator.java | 3 +- .../api/graph/StreamingJobGraphGeneratorTest.java | 32 ++++++++++++++++++++++ 4 files changed, 55 insertions(+), 2 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java index 7381cde..87e6d86 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java @@ -32,6 +32,7 @@ import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.typeutils.MissingTypeInfo; import org.apache.flink.optimizer.plan.StreamingPlan; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.ScheduleMode; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.streaming.api.TimeCharacteristic; @@ -88,6 +89,8 @@ public class StreamGraph extends StreamingPlan { private final ExecutionConfig executionConfig; private final CheckpointConfig checkpointConfig; + private ScheduleMode scheduleMode; + private boolean chaining; private Collection<Tuple2<String, DistributedCache.DistributedCacheEntry>> userArtifacts; @@ -157,6 +160,14 @@ public class StreamGraph extends StreamingPlan { return this.stateBackend; } + public ScheduleMode getScheduleMode() { + return scheduleMode; + } + + public void setScheduleMode(ScheduleMode scheduleMode) { + this.scheduleMode = scheduleMode; + } + public Collection<Tuple2<String, DistributedCache.DistributedCacheEntry>> getUserArtifacts() { return userArtifacts; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java index b5ab9c5..ead8e07 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.cache.DistributedCache; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.jobgraph.ScheduleMode; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.streaming.api.TimeCharacteristic; @@ -89,6 +90,8 @@ public class StreamGraphGenerator { public static final int DEFAULT_LOWER_BOUND_MAX_PARALLELISM = KeyGroupRangeAssignment.DEFAULT_LOWER_BOUND_MAX_PARALLELISM; + public static final ScheduleMode DEFAULT_SCHEDULE_MODE = ScheduleMode.EAGER; + public static final TimeCharacteristic DEFAULT_TIME_CHARACTERISTIC = TimeCharacteristic.ProcessingTime; public static final String DEFAULT_JOB_NAME = "Flink Streaming Job"; @@ -110,6 +113,8 @@ public class StreamGraphGenerator { private boolean isSlotSharingEnabled = true; + private ScheduleMode scheduleMode = DEFAULT_SCHEDULE_MODE; + private Collection<Tuple2<String, DistributedCache.DistributedCacheEntry>> userArtifacts; private TimeCharacteristic timeCharacteristic = DEFAULT_TIME_CHARACTERISTIC; @@ -152,6 +157,11 @@ public class StreamGraphGenerator { return this; } + public StreamGraphGenerator setScheduleMode(ScheduleMode scheduleMode) { + this.scheduleMode = scheduleMode; + return this; + } + public StreamGraphGenerator setUserArtifacts(Collection<Tuple2<String, DistributedCache.DistributedCacheEntry>> userArtifacts) { this.userArtifacts = userArtifacts; return this; @@ -176,6 +186,7 @@ public class StreamGraphGenerator { streamGraph = new StreamGraph(executionConfig, checkpointConfig); streamGraph.setStateBackend(stateBackend); streamGraph.setChaining(chaining); + streamGraph.setScheduleMode(scheduleMode); streamGraph.setUserArtifacts(userArtifacts); streamGraph.setTimeCharacteristic(timeCharacteristic); streamGraph.setJobName(jobName); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index dbedcdc..edf0314 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -37,7 +37,6 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; -import org.apache.flink.runtime.jobgraph.ScheduleMode; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; @@ -142,7 +141,7 @@ public class StreamingJobGraphGenerator { private JobGraph createJobGraph() { // make sure that all vertices start immediately - jobGraph.setScheduleMode(ScheduleMode.EAGER); + jobGraph.setScheduleMode(streamGraph.getScheduleMode()); // Generate deterministic hashes for the nodes in order to identify them across // submission iff they didn't change. diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java index f8bed23..6c5c050 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java @@ -35,6 +35,7 @@ import org.apache.flink.runtime.jobgraph.InputOutputFormatVertex; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.jobgraph.ScheduleMode; import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; @@ -58,6 +59,7 @@ import org.junit.Test; import java.lang.reflect.Method; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -502,4 +504,34 @@ public class StreamingJobGraphGeneratorTest extends TestLogger { assertNotNull(iterationSinkCoLocationGroup); assertEquals(iterationSourceCoLocationGroup, iterationSinkCoLocationGroup); } + + /** + * Test default schedule mode. + */ + @Test + public void testDefaultScheduleMode() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + // use eager schedule mode by default + StreamGraph streamGraph = new StreamGraphGenerator(Collections.emptyList(), + env.getConfig(), env.getCheckpointConfig()) + .generate(); + JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph); + assertEquals(ScheduleMode.EAGER, jobGraph.getScheduleMode()); + } + + /** + * Test schedule mode is configurable or not. + */ + @Test + public void testSetScheduleMode() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + StreamGraph streamGraph = new StreamGraphGenerator(Collections.emptyList(), + env.getConfig(), env.getCheckpointConfig()) + .setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES) + .generate(); + JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph); + assertEquals(ScheduleMode.LAZY_FROM_SOURCES, jobGraph.getScheduleMode()); + } }
