This is an automated email from the ASF dual-hosted git repository.
guoyangze pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new dcb30f934a7 [FLINK-25015][Table SQL] Use SQL string as jobName for DQL jobs submitted by sql-gateway
dcb30f934a7 is described below
commit dcb30f934a7953e33d35460414c6ee57cb982ab3
Author: Xiangyu Feng <[email protected]>
AuthorDate: Sun Feb 18 14:56:26 2024 +0800
[FLINK-25015][Table SQL] Use SQL string as jobName for DQL jobs submitted by sql-gateway
---
.../table/api/internal/TableEnvironmentImpl.java | 24 +++++++++----
.../api/QueryOperationSqlSerializationTest.java | 39 ++++++++++++++++++++++
2 files changed, 57 insertions(+), 6 deletions(-)
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 6548c03c4b9..c4a58d9d351 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -1065,14 +1065,9 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
QueryOperation operation,
CollectModifyOperation sinkOperation,
List<Transformation<?>> transformations) {
- final String defaultJobName = "collect";
-
resourceManager.addJarConfiguration(tableConfig);
- // We pass only the configuration to avoid reconfiguration with the rootConfiguration
- Pipeline pipeline =
- execEnv.createPipeline(
- transformations, tableConfig.getConfiguration(), defaultJobName);
+ Pipeline pipeline = generatePipelineFromQueryOperation(operation, transformations);
try {
JobClient jobClient = execEnv.executeAsync(pipeline);
ResultProvider resultProvider = sinkOperation.getSelectResultProvider();
@@ -1185,6 +1180,23 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
}
}
+ /** generate execution {@link Pipeline} from {@link QueryOperation}. */
+ @VisibleForTesting
+ public Pipeline generatePipelineFromQueryOperation(
+ QueryOperation operation, List<Transformation<?>> transformations) {
+ String defaultJobName = "collect";
+
+ try {
+ defaultJobName = operation.asSerializableString();
+ } catch (Throwable e) {
+ // ignore error for unsupported operations and use 'collect' as default job name
+ }
+
+ // We pass only the configuration to avoid reconfiguration with the rootConfiguration
+ return execEnv.createPipeline(
+ transformations, tableConfig.getConfiguration(), defaultJobName);
+ }
+
/**
* extract sink identifier names from {@link ModifyOperation}s and deduplicate them with {@link
* #deduplicateSinkIdentifierNames(List)}.
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationSqlSerializationTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationSqlSerializationTest.java
index acd38d6adc8..1c9251608a3 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationSqlSerializationTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationSqlSerializationTest.java
@@ -18,6 +18,11 @@
package org.apache.flink.table.api;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.operations.CollectModifyOperation;
+import org.apache.flink.table.operations.QueryOperation;
import org.apache.flink.table.test.program.SqlTestStep;
import org.apache.flink.table.test.program.TableApiTestStep;
import org.apache.flink.table.test.program.TableTestProgram;
@@ -29,6 +34,7 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import java.util.Arrays;
+import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
@@ -78,6 +84,39 @@ public class QueryOperationSqlSerializationTest implements TableTestProgramRunner {
assertThat(table.getQueryOperation().asSerializableString()).isEqualTo(sqlStep.sql);
}
+ @ParameterizedTest
+ @MethodSource("supportedPrograms")
+ void testSqlAsJobNameForQueryOperation(TableTestProgram program) {
+ final TableEnvironmentImpl env = (TableEnvironmentImpl) setupEnv(program);
+
+ final TableApiTestStep tableApiStep =
+ (TableApiTestStep)
+ program.runSteps.stream()
+ .filter(s -> s instanceof TableApiTestStep)
+ .findFirst()
+ .get();
+
+ final SqlTestStep sqlStep =
+ (SqlTestStep)
+ program.runSteps.stream()
+ .filter(s -> s instanceof SqlTestStep)
+ .findFirst()
+ .get();
+
+ final Table table = tableApiStep.toTable(env);
+
+ QueryOperation queryOperation = table.getQueryOperation();
+ CollectModifyOperation sinkOperation = new CollectModifyOperation(queryOperation);
+ List<Transformation<?>> transformations =
+ env.getPlanner().translate(Collections.singletonList(sinkOperation));
+
+ StreamGraph streamGraph =
+ (StreamGraph)
+ env.generatePipelineFromQueryOperation(queryOperation, transformations);
+
+ assertThat(sqlStep.sql).isEqualTo(streamGraph.getJobName());
+ }
+
private static TableEnvironment setupEnv(TableTestProgram program) {
final TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
final Map<String, String> connectorOptions = new HashMap<>();