This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7ae1a5bb9b23ad3cee623e535e39ac30c38d354d
Author: godfreyhe <[email protected]>
AuthorDate: Tue Nov 17 23:32:07 2020 +0800

    [FLINK-18545][table] Specify job name by `pipeline.name` for sql job
---
 .../apache/flink/table/api/internal/TableEnvironmentImpl.java | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index d4f43db..10a9a8c 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.dag.Pipeline;
 import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.EnvironmentSettings;
@@ -699,7 +700,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
 	public TableResult executeInternal(List<ModifyOperation> operations) {
 		List<Transformation<?>> transformations = translate(operations);
 		List<String> sinkIdentifierNames = extractSinkIdentifierNames(operations);
-		String jobName = "insert-into_" + String.join(",", sinkIdentifierNames);
+		String jobName = getJobName("insert-into_" + String.join(",", sinkIdentifierNames));
 		Pipeline pipeline = execEnv.createPipeline(transformations, tableConfig, jobName);
 		try {
 			JobClient jobClient = execEnv.executeAsync(pipeline);
@@ -731,7 +732,8 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
 			operation
 		);
 		List<Transformation<?>> transformations = translate(Collections.singletonList(sinkOperation));
-		Pipeline pipeline = execEnv.createPipeline(transformations, tableConfig, "collect");
+		String jobName = getJobName("collect");
+		Pipeline pipeline = execEnv.createPipeline(transformations, tableConfig, jobName);
 		try {
 			JobClient jobClient = execEnv.executeAsync(pipeline);
 			tableSink.setJobClient(jobClient);
@@ -1173,6 +1175,10 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
 		).collect(Collectors.toList());
 	}
 
+	private String getJobName(String defaultJobName) {
+		return tableConfig.getConfiguration().getString(PipelineOptions.NAME, defaultJobName);
+	}
+
 	/** Get catalog from catalogName or throw a ValidationException if the catalog not exists. */
 	private Catalog getCatalogOrThrowException(String catalogName) {
 		return getCatalog(catalogName)
