This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2c8e9d42cea40fd01be5cbdfdb791a4ecb5cdd07
Author: godfreyhe <[email protected]>
AuthorDate: Tue Nov 17 22:50:50 2020 +0800

    [FLINK-18545][configuration] Introduce `pipeline.name` to allow users to specify job name by configuration
    
    (cherry picked from commit 78d6ef6f41b24977c531cfcb430cf184edc6f5ed)
---
 .../generated/pipeline_configuration.html          |  6 ++++
 .../flink/configuration/PipelineOptions.java       | 10 +++++++
 .../environment/StreamExecutionEnvironment.java    | 15 +++++++---
 .../api/StreamExecutionEnvironmentTest.java        | 33 ++++++++++++++++++++++
 4 files changed, 60 insertions(+), 4 deletions(-)

diff --git a/docs/_includes/generated/pipeline_configuration.html b/docs/_includes/generated/pipeline_configuration.html
index f3fc216..a7f970c 100644
--- a/docs/_includes/generated/pipeline_configuration.html
+++ b/docs/_includes/generated/pipeline_configuration.html
@@ -87,6 +87,12 @@
             <td>The program-wide maximum parallelism used for operators which haven't specified a maximum parallelism. The maximum parallelism specifies the upper limit for dynamic scaling and the number of key groups used for partitioned state.</td>
         </tr>
         <tr>
+            <td><h5>pipeline.name</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>The job name used for printing and logging.</td>
+        </tr>
+        <tr>
             <td><h5>pipeline.object-reuse</h5></td>
             <td style="word-wrap: break-word;">false</td>
             <td>Boolean</td>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java
index 3c405b3..a687cba 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java
@@ -36,6 +36,16 @@ import static org.apache.flink.configuration.description.TextElement.text;
  */
 @PublicEvolving
 public class PipelineOptions {
+
+       /**
+        * The job name used for printing and logging.
+        */
+       public static final ConfigOption<String> NAME =
+               key("pipeline.name")
+                       .stringType()
+                       .noDefaultValue()
+                       .withDescription("The job name used for printing and logging.");
+
        /**
         * A list of jar files that contain the user-defined function (UDF) classes and all classes used from within the UDFs.
         */
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 9effe65..f458c27 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -839,6 +839,9 @@ public class StreamExecutionEnvironment {
                configuration.getOptional(ExecutionOptions.USE_BATCH_STATE_BACKEND).ifPresent(
                        sortInputs -> this.getConfiguration().set(ExecutionOptions.USE_BATCH_STATE_BACKEND, sortInputs)
                );
+               configuration.getOptional(PipelineOptions.NAME).ifPresent(
+                       jobName -> this.getConfiguration().set(PipelineOptions.NAME, jobName)
+               );
                config.configure(configuration, classLoader);
                checkpointCfg.configure(configuration);
        }
@@ -1798,7 +1801,7 @@ public class StreamExecutionEnvironment {
         * @throws Exception which occurs during job execution.
         */
        public JobExecutionResult execute() throws Exception {
-               return execute(DEFAULT_JOB_NAME);
+               return execute(getJobName());
        }
 
        /**
@@ -1891,7 +1894,7 @@ public class StreamExecutionEnvironment {
         */
        @PublicEvolving
        public final JobClient executeAsync() throws Exception {
-               return executeAsync(DEFAULT_JOB_NAME);
+               return executeAsync(getJobName());
        }
 
        /**
@@ -1958,7 +1961,7 @@ public class StreamExecutionEnvironment {
         */
        @Internal
        public StreamGraph getStreamGraph() {
-               return getStreamGraph(DEFAULT_JOB_NAME);
+               return getStreamGraph(getJobName());
        }
 
        /**
@@ -2018,7 +2021,7 @@ public class StreamExecutionEnvironment {
         * @return The execution plan of the program, as a JSON String.
         */
        public String getExecutionPlan() {
-               return getStreamGraph(DEFAULT_JOB_NAME, false).getStreamingPlanAsJSON();
+               return getStreamGraph(getJobName(), false).getStreamingPlanAsJSON();
        }
 
        /**
@@ -2350,4 +2353,8 @@ public class StreamExecutionEnvironment {
                }
                return (T) resolvedTypeInfo;
        }
+
+       private String getJobName() {
+               return configuration.getString(PipelineOptions.NAME, DEFAULT_JOB_NAME);
+       }
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
index d0bc4b0..e8d5add 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
@@ -23,6 +23,8 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
@@ -270,6 +272,37 @@ public class StreamExecutionEnvironmentTest {
        }
 
        @Test
+       public void testDefaultJobName() {
+               StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+               testJobName(StreamExecutionEnvironment.DEFAULT_JOB_NAME, env);
+       }
+
+       @Test
+       public void testUserDefinedJobName() {
+               String jobName = "MyTestJob";
+               Configuration config = new Configuration();
+               config.set(PipelineOptions.NAME, jobName);
+               StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+               testJobName(jobName, env);
+       }
+
+       @Test
+       public void testUserDefinedJobNameWithConfigure() {
+               String jobName = "MyTestJob";
+               Configuration config = new Configuration();
+               config.set(PipelineOptions.NAME, jobName);
+               StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+               env.configure(config, this.getClass().getClassLoader());
+               testJobName(jobName, env);
+       }
+
+       private void testJobName(String expectedJobName, StreamExecutionEnvironment env) {
+               env.fromElements(1, 2, 3).print();
+               StreamGraph streamGraph = env.getStreamGraph();
+               assertEquals(expectedJobName, streamGraph.getJobName());
+       }
+
+       @Test
        public void testAddSourceWithUserDefinedTypeInfo() {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                DataStreamSource<Row> source1 = env.addSource(new 
RowSourceFunction(), Types.ROW(Types.STRING));

Reply via email to