This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8524e2230f472882eeffd29b813e36ab0e43cebf
Author: ifndef-SleePy <[email protected]>
AuthorDate: Fri Jun 14 19:26:24 2019 +0800

    [FLINK-12686][datastream] Add 
StreamExecutionEnvironment.getStreamGraph(name)
---
 .../streaming/api/environment/LocalStreamEnvironment.java     |  3 +--
 .../streaming/api/environment/RemoteStreamEnvironment.java    |  6 ++----
 .../streaming/api/environment/StreamContextEnvironment.java   |  3 +--
 .../streaming/api/environment/StreamExecutionEnvironment.java | 11 +++++++++++
 .../streaming/api/environment/StreamPlanEnvironment.java      |  3 +--
 .../org/apache/flink/streaming/api/graph/StreamGraph.java     |  3 +--
 .../streaming/api/graph/StreamingJobGraphGeneratorTest.java   |  3 +--
 .../flink/table/client/gateway/local/ExecutionContext.java    |  5 +----
 .../apache/flink/streaming/util/TestStreamEnvironment.java    |  3 +--
 .../test/checkpointing/ResumeCheckpointManuallyITCase.java    |  3 +--
 .../org/apache/flink/test/checkpointing/SavepointITCase.java  |  3 +--
 11 files changed, 22 insertions(+), 24 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
index eca802f..5fce32d 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
@@ -86,8 +86,7 @@ public class LocalStreamEnvironment extends 
StreamExecutionEnvironment {
        @Override
        public JobExecutionResult execute(String jobName) throws Exception {
                // transform the streaming program into a JobGraph
-               StreamGraph streamGraph = getStreamGraph();
-               streamGraph.setJobName(jobName);
+               StreamGraph streamGraph = getStreamGraph(jobName);
 
                JobGraph jobGraph = streamGraph.getJobGraph();
                jobGraph.setAllowQueuedScheduling(true);
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index f9c32ee..324478e 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -223,8 +223,7 @@ public class RemoteStreamEnvironment extends 
StreamExecutionEnvironment {
                String jobName,
                SavepointRestoreSettings savepointRestoreSettings
        ) throws ProgramInvocationException {
-               StreamGraph streamGraph = 
streamExecutionEnvironment.getStreamGraph();
-               streamGraph.setJobName(jobName);
+               StreamGraph streamGraph = 
streamExecutionEnvironment.getStreamGraph(jobName);
                return executeRemotely(streamGraph,
                        streamExecutionEnvironment.getClass().getClassLoader(),
                        streamExecutionEnvironment.getConfig(),
@@ -304,8 +303,7 @@ public class RemoteStreamEnvironment extends 
StreamExecutionEnvironment {
 
        @Override
        public JobExecutionResult execute(String jobName) throws 
ProgramInvocationException {
-               StreamGraph streamGraph = getStreamGraph();
-               streamGraph.setJobName(jobName);
+               StreamGraph streamGraph = getStreamGraph(jobName);
                transformations.clear();
                return executeRemotely(streamGraph, jarFiles);
        }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
index c0216e5..f523117 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
@@ -50,8 +50,7 @@ public class StreamContextEnvironment extends 
StreamExecutionEnvironment {
        public JobExecutionResult execute(String jobName) throws Exception {
                Preconditions.checkNotNull(jobName, "Streaming Job name should 
not be null.");
 
-               StreamGraph streamGraph = this.getStreamGraph();
-               streamGraph.setJobName(jobName);
+               StreamGraph streamGraph = this.getStreamGraph(jobName);
 
                transformations.clear();
 
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 6c334a6..d4e0e66 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -1514,6 +1514,17 @@ public abstract class StreamExecutionEnvironment {
                return getStreamGraphGenerator().generate();
        }
 
+       /**
+        * Getter of the {@link 
org.apache.flink.streaming.api.graph.StreamGraph} of the streaming job.
+        *
+        * @param jobName Desired name of the job
+        * @return The streamgraph representing the transformations
+        */
+       @Internal
+       public StreamGraph getStreamGraph(String jobName) {
+               return getStreamGraphGenerator().setJobName(jobName).generate();
+       }
+
        private StreamGraphGenerator getStreamGraphGenerator() {
                if (transformations.size() <= 0) {
                        throw new IllegalStateException("No operators defined 
in streaming topology. Cannot execute.");
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
index 687176b..28a8a0a 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
@@ -56,8 +56,7 @@ public class StreamPlanEnvironment extends 
StreamExecutionEnvironment {
        @Override
        public JobExecutionResult execute(String jobName) throws Exception {
 
-               StreamGraph streamGraph = getStreamGraph();
-               streamGraph.setJobName(jobName);
+               StreamGraph streamGraph = getStreamGraph(jobName);
 
                transformations.clear();
 
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index c0fedd8..2d72276 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -37,7 +37,6 @@ import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.environment.CheckpointConfig;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
 import org.apache.flink.streaming.api.transformations.ShuffleMode;
 import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
@@ -80,7 +79,7 @@ public class StreamGraph extends StreamingPlan {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(StreamGraph.class);
 
-       private String jobName = StreamExecutionEnvironment.DEFAULT_JOB_NAME;
+       private String jobName;
 
        private final ExecutionConfig executionConfig;
        private final CheckpointConfig checkpointConfig;
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index 284db19..01b888e 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -104,8 +104,7 @@ public class StreamingJobGraphGeneratorTest extends 
TestLogger {
 
                // --------- the job graph ---------
 
-               StreamGraph streamGraph = env.getStreamGraph();
-               streamGraph.setJobName("test job");
+               StreamGraph streamGraph = env.getStreamGraph("test job");
                JobGraph jobGraph = streamGraph.getJobGraph();
                List<JobVertex> verticesSorted = 
jobGraph.getVerticesSortedTopologicallyFromSources();
 
diff --git 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index ead3b18..344505f 100644
--- 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -38,7 +38,6 @@ import 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.table.api.BatchQueryConfig;
 import org.apache.flink.table.api.QueryConfig;
 import org.apache.flink.table.api.StreamQueryConfig;
@@ -376,9 +375,7 @@ public class ExecutionContext<T> {
 
                private FlinkPlan createPlan(String name, Configuration 
flinkConfig) {
                        if (streamExecEnv != null) {
-                               final StreamGraph graph = 
streamExecEnv.getStreamGraph();
-                               graph.setJobName(name);
-                               return graph;
+                               return streamExecEnv.getStreamGraph(name);
                        } else {
                                final int parallelism = 
execEnv.getParallelism();
                                final Plan unoptimizedPlan = 
execEnv.createProgramPlan();
diff --git 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
index 7a44314..79365c8 100644
--- 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
+++ 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
@@ -66,8 +66,7 @@ public class TestStreamEnvironment extends 
StreamExecutionEnvironment {
 
        @Override
        public JobExecutionResult execute(String jobName) throws Exception {
-               final StreamGraph streamGraph = getStreamGraph();
-               streamGraph.setJobName(jobName);
+               final StreamGraph streamGraph = getStreamGraph(jobName);
                final JobGraph jobGraph = streamGraph.getJobGraph();
 
                for (Path jarFile : jarFiles) {
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
index 4a08b7c..2d5fa9f 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
@@ -362,8 +362,7 @@ public class ResumeCheckpointManuallyITCase extends 
TestLogger {
                        .reduce((value1, value2) -> Tuple2.of(value1.f0, 
value1.f1 + value2.f1))
                        .filter(value -> value.f0.startsWith("Tuple 0"));
 
-               StreamGraph streamGraph = env.getStreamGraph();
-               streamGraph.setJobName("Test");
+               StreamGraph streamGraph = env.getStreamGraph("Test");
 
                JobGraph jobGraph = streamGraph.getJobGraph();
 
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 5b77f03..41cce68 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -659,8 +659,7 @@ public class SavepointITCase extends TestLogger {
 
                iteration.closeWith(iterationBody);
 
-               StreamGraph streamGraph = env.getStreamGraph();
-               streamGraph.setJobName("Test");
+               StreamGraph streamGraph = env.getStreamGraph("Test");
 
                JobGraph jobGraph = streamGraph.getJobGraph();
 

Reply via email to