This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 8524e2230f472882eeffd29b813e36ab0e43cebf Author: ifndef-SleePy <[email protected]> AuthorDate: Fri Jun 14 19:26:24 2019 +0800 [FLINK-12686][datastream] Add StreamExecutionEnvironment.getStreamGraph(name) --- .../streaming/api/environment/LocalStreamEnvironment.java | 3 +-- .../streaming/api/environment/RemoteStreamEnvironment.java | 6 ++---- .../streaming/api/environment/StreamContextEnvironment.java | 3 +-- .../streaming/api/environment/StreamExecutionEnvironment.java | 11 +++++++++++ .../streaming/api/environment/StreamPlanEnvironment.java | 3 +-- .../org/apache/flink/streaming/api/graph/StreamGraph.java | 3 +-- .../streaming/api/graph/StreamingJobGraphGeneratorTest.java | 3 +-- .../flink/table/client/gateway/local/ExecutionContext.java | 5 +---- .../apache/flink/streaming/util/TestStreamEnvironment.java | 3 +-- .../test/checkpointing/ResumeCheckpointManuallyITCase.java | 3 +-- .../org/apache/flink/test/checkpointing/SavepointITCase.java | 3 +-- 11 files changed, 22 insertions(+), 24 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java index eca802f..5fce32d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java @@ -86,8 +86,7 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment { @Override public JobExecutionResult execute(String jobName) throws Exception { // transform the streaming program into a JobGraph - StreamGraph streamGraph = getStreamGraph(); - streamGraph.setJobName(jobName); + StreamGraph streamGraph = getStreamGraph(jobName); JobGraph jobGraph = streamGraph.getJobGraph(); jobGraph.setAllowQueuedScheduling(true); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java index f9c32ee..324478e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java @@ -223,8 +223,7 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment { String jobName, SavepointRestoreSettings savepointRestoreSettings ) throws ProgramInvocationException { - StreamGraph streamGraph = streamExecutionEnvironment.getStreamGraph(); - streamGraph.setJobName(jobName); + StreamGraph streamGraph = streamExecutionEnvironment.getStreamGraph(jobName); return executeRemotely(streamGraph, streamExecutionEnvironment.getClass().getClassLoader(), streamExecutionEnvironment.getConfig(), @@ -304,8 +303,7 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment { @Override public JobExecutionResult execute(String jobName) throws ProgramInvocationException { - StreamGraph streamGraph = getStreamGraph(); - streamGraph.setJobName(jobName); + StreamGraph streamGraph = getStreamGraph(jobName); transformations.clear(); return executeRemotely(streamGraph, jarFiles); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java index c0216e5..f523117 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java @@ -50,8 +50,7 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment { public JobExecutionResult execute(String jobName) throws Exception { Preconditions.checkNotNull(jobName, "Streaming Job name should not be null."); - StreamGraph streamGraph = this.getStreamGraph(); - streamGraph.setJobName(jobName); + StreamGraph streamGraph = this.getStreamGraph(jobName); transformations.clear(); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index 6c334a6..d4e0e66 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -1514,6 +1514,17 @@ public abstract class StreamExecutionEnvironment { return getStreamGraphGenerator().generate(); } + /** + * Getter of the {@link org.apache.flink.streaming.api.graph.StreamGraph} of the streaming job. + * + * @param jobName Desired name of the job + * @return The streamgraph representing the transformations + */ + @Internal + public StreamGraph getStreamGraph(String jobName) { + return getStreamGraphGenerator().setJobName(jobName).generate(); + } + private StreamGraphGenerator getStreamGraphGenerator() { if (transformations.size() <= 0) { throw new IllegalStateException("No operators defined in streaming topology. Cannot execute."); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java index 687176b..28a8a0a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java @@ -56,8 +56,7 @@ public class StreamPlanEnvironment extends StreamExecutionEnvironment { @Override public JobExecutionResult execute(String jobName) throws Exception { - StreamGraph streamGraph = getStreamGraph(); - streamGraph.setJobName(jobName); + StreamGraph streamGraph = getStreamGraph(jobName); transformations.clear(); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java index c0fedd8..2d72276 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java @@ -37,7 +37,6 @@ import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.collector.selector.OutputSelector; import org.apache.flink.streaming.api.environment.CheckpointConfig; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.operators.StreamOperatorFactory; import org.apache.flink.streaming.api.transformations.ShuffleMode; import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner; @@ -80,7 +79,7 @@ public class StreamGraph extends StreamingPlan { private static final Logger LOG = LoggerFactory.getLogger(StreamGraph.class); - private String jobName = StreamExecutionEnvironment.DEFAULT_JOB_NAME; + private String jobName; private final ExecutionConfig executionConfig; private final CheckpointConfig checkpointConfig; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java index 284db19..01b888e 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java @@ -104,8 +104,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger { // --------- the job graph --------- - StreamGraph streamGraph = env.getStreamGraph(); - streamGraph.setJobName("test job"); + StreamGraph streamGraph = env.getStreamGraph("test job"); JobGraph jobGraph = streamGraph.getJobGraph(); List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources(); diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java index ead3b18..344505f 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java @@ -38,7 +38,6 @@ import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.graph.StreamGraph; import org.apache.flink.table.api.BatchQueryConfig; import org.apache.flink.table.api.QueryConfig; import org.apache.flink.table.api.StreamQueryConfig; @@ -376,9 +375,7 @@ public class ExecutionContext<T> { private FlinkPlan createPlan(String name, Configuration flinkConfig) { if (streamExecEnv != null) { - final StreamGraph graph = streamExecEnv.getStreamGraph(); - graph.setJobName(name); - return graph; + return streamExecEnv.getStreamGraph(name); } else { final int parallelism = execEnv.getParallelism(); final Plan unoptimizedPlan = execEnv.createProgramPlan(); diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java index 7a44314..79365c8 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java @@ -66,8 +66,7 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment { @Override public JobExecutionResult execute(String jobName) throws Exception { - final StreamGraph streamGraph = getStreamGraph(); - streamGraph.setJobName(jobName); + final StreamGraph streamGraph = getStreamGraph(jobName); final JobGraph jobGraph = streamGraph.getJobGraph(); for (Path jarFile : jarFiles) { diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java index 4a08b7c..2d5fa9f 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java @@ -362,8 +362,7 @@ public class ResumeCheckpointManuallyITCase extends TestLogger { .reduce((value1, value2) -> Tuple2.of(value1.f0, value1.f1 + value2.f1)) .filter(value -> value.f0.startsWith("Tuple 0")); - StreamGraph streamGraph = env.getStreamGraph(); - streamGraph.setJobName("Test"); + StreamGraph streamGraph = env.getStreamGraph("Test"); JobGraph jobGraph = streamGraph.getJobGraph(); diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java index 5b77f03..41cce68 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java @@ -659,8 +659,7 @@ public class SavepointITCase extends TestLogger { iteration.closeWith(iterationBody); - StreamGraph streamGraph = env.getStreamGraph(); - streamGraph.setJobName("Test"); + StreamGraph streamGraph = env.getStreamGraph("Test"); JobGraph jobGraph = streamGraph.getJobGraph();
