This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1aa1b8c6e6d4628b01ff5503f3a680152680caeb
Author: godfreyhe <[email protected]>
AuthorDate: Tue Nov 17 23:32:07 2020 +0800

    [FLINK-18545][table] Specify job name by `pipeline.name` for sql job
    
    (cherry picked from commit 88d0ee002c38f8a17895d4485703787a04f67c50)
---
 .../apache/flink/table/api/internal/TableEnvironmentImpl.java  | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 8ab6974..a6a3c51 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -23,6 +23,7 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.dag.Pipeline;
 import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.EnvironmentSettings;
@@ -673,7 +674,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
 	public TableResult executeInternal(List<ModifyOperation> operations) {
 		List<Transformation<?>> transformations = translate(operations);
 		List<String> sinkIdentifierNames = extractSinkIdentifierNames(operations);
-		String jobName = "insert-into_" + String.join(",", sinkIdentifierNames);
+		String jobName = getJobName("insert-into_" + String.join(",", sinkIdentifierNames));
 		Pipeline pipeline = execEnv.createPipeline(transformations, tableConfig, jobName);
 		try {
 			JobClient jobClient = execEnv.executeAsync(pipeline);
@@ -700,7 +701,8 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
 	public TableResult executeInternal(QueryOperation operation) {
 		SelectSinkOperation sinkOperation = new SelectSinkOperation(operation);
 		List<Transformation<?>> transformations = translate(Collections.singletonList(sinkOperation));
-		Pipeline pipeline = execEnv.createPipeline(transformations, tableConfig, "collect");
+		String jobName = getJobName("collect");
+		Pipeline pipeline = execEnv.createPipeline(transformations, tableConfig, jobName);
 		try {
 			JobClient jobClient = execEnv.executeAsync(pipeline);
 			SelectResultProvider resultProvider = sinkOperation.getSelectResultProvider();
@@ -1170,6 +1172,10 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
 		).collect(Collectors.toList());
 	}
 
+	private String getJobName(String defaultJobName) {
+		return tableConfig.getConfiguration().getString(PipelineOptions.NAME, defaultJobName);
+	}
+
 	/** Get catalog from catalogName or throw a ValidationException if the catalog not exists. */
 	private Catalog getCatalogOrThrowException(String catalogName) {
 		return getCatalog(catalogName)

Reply via email to