This is an automated email from the ASF dual-hosted git repository.
shengkai pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.17 by this push:
new be53133546d [FLINK-32219][sql-client] Fix SqlClient hangs when executing EXECUTE PLAN statement (#22725)
be53133546d is described below
commit be53133546d054e282c0a24bfe722d0d276f9a8f
Author: Jane Chan <[email protected]>
AuthorDate: Thu Jun 8 10:11:17 2023 +0800
[FLINK-32219][sql-client] Fix SqlClient hangs when executing EXECUTE PLAN statement (#22725)
(cherry picked from commit 83ba6b5348cbffb26e8d1d5ce6e8d6bb1994e3bc)
---
.../service/operation/OperationExecutor.java | 16 +++++
.../gateway/AbstractSqlGatewayStatementITCase.java | 8 +++
.../src/test/resources/sql/insert.q | 83 ++++++++++++++++++++++
.../table/api/internal/TableEnvironmentImpl.java | 36 +++++-----
4 files changed, 126 insertions(+), 17 deletions(-)
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
index 22c4ffcdf32..1be89881aec 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
@@ -67,6 +67,7 @@ import org.apache.flink.table.gateway.service.result.ResultFetcher;
import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
import org.apache.flink.table.module.ModuleManager;
import org.apache.flink.table.operations.BeginStatementSetOperation;
+import org.apache.flink.table.operations.CompileAndExecutePlanOperation;
import org.apache.flink.table.operations.DeleteFromFilterOperation;
import org.apache.flink.table.operations.EndStatementSetOperation;
import org.apache.flink.table.operations.LoadModuleOperation;
@@ -77,6 +78,7 @@ import org.apache.flink.table.operations.StatementSetOperation;
import org.apache.flink.table.operations.UnloadModuleOperation;
import org.apache.flink.table.operations.UseOperation;
import org.apache.flink.table.operations.command.AddJarOperation;
+import org.apache.flink.table.operations.command.ExecutePlanOperation;
import org.apache.flink.table.operations.command.RemoveJarOperation;
import org.apache.flink.table.operations.command.ResetOperation;
import org.apache.flink.table.operations.command.SetOperation;
@@ -423,6 +425,9 @@ public class OperationExecutor {
} else if (op instanceof ModifyOperation) {
return callModifyOperations(
tableEnv, handle,
Collections.singletonList((ModifyOperation) op));
+ } else if (op instanceof CompileAndExecutePlanOperation
+ || op instanceof ExecutePlanOperation) {
+ return callExecuteOperation(tableEnv, handle, op);
} else if (op instanceof StatementSetOperation) {
return callModifyOperations(
tableEnv, handle, ((StatementSetOperation)
op).getOperations());
@@ -512,6 +517,17 @@ public class OperationExecutor {
return ResultFetcher.fromTableResult(handle, result, false);
}
+ return fetchJobId(result, handle);
+ }
+
+ private ResultFetcher callExecuteOperation(
+ TableEnvironmentInternal tableEnv,
+ OperationHandle handle,
+ Operation executePlanOperation) {
+ return fetchJobId(tableEnv.executeInternal(executePlanOperation),
handle);
+ }
+
+ private ResultFetcher fetchJobId(TableResultInternal result,
OperationHandle handle) {
JobID jobID =
result.getJobClient()
.orElseThrow(
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/AbstractSqlGatewayStatementITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/AbstractSqlGatewayStatementITCase.java
index 0ba90189172..94813359752 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/AbstractSqlGatewayStatementITCase.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/AbstractSqlGatewayStatementITCase.java
@@ -116,6 +116,9 @@ public abstract class AbstractSqlGatewayStatementITCase extends AbstractTestBase
replaceVars.put(
"$VAR_STREAMING_PATH2",
Files.createDirectory(temporaryFolder.resolve("streaming2")).toFile().getPath());
+ replaceVars.put(
+ "$VAR_STREAMING_PATH3",
+
Files.createDirectory(temporaryFolder.resolve("streaming3")).toFile().getPath());
replaceVars.put(
"$VAR_BATCH_PATH",
Files.createDirectory(temporaryFolder.resolve("batch")).toFile().getPath());
@@ -124,6 +127,11 @@ public abstract class AbstractSqlGatewayStatementITCase extends AbstractTestBase
Files.createDirectory(temporaryFolder.resolve("batch_ctas")).toFile().getPath());
replaceVars.put(
"$VAR_REST_PORT",
MINI_CLUSTER.getClientConfiguration().get(PORT).toString());
+ replaceVars.put(
+ "$VAR_STREAMING_PLAN_PATH",
+
Files.createDirectory(temporaryFolder.resolve("streaming_compiled_plan"))
+ .toFile()
+ .getPath());
}
@TestTemplate
diff --git a/flink-table/flink-sql-gateway/src/test/resources/sql/insert.q b/flink-table/flink-sql-gateway/src/test/resources/sql/insert.q
index 323f24b81f1..1314e4d31bd 100644
--- a/flink-table/flink-sql-gateway/src/test/resources/sql/insert.q
+++ b/flink-table/flink-sql-gateway/src/test/resources/sql/insert.q
@@ -87,6 +87,89 @@ SELECT * FROM StreamingTable;
7 rows in set
!ok
+# ==========================================================================
+# test streaming insert through compiled plan
+# ==========================================================================
+
+create table StreamingTableForPlan (
+ id int,
+ str string
+) with (
+ 'connector' = 'filesystem',
+ 'path' = '$VAR_STREAMING_PATH3',
+ 'format' = 'csv'
+);
+!output
++--------+
+| result |
++--------+
+| OK |
++--------+
+1 row in set
+!ok
+
+COMPILE AND EXECUTE PLAN '$VAR_STREAMING_PLAN_PATH/plan1.json' FOR INSERT INTO
StreamingTableForPlan SELECT * FROM (VALUES (1, 'Hello'));
+!output
+Job ID:
+!info
+
+RESET '$internal.pipeline.job-id';
+!output
++--------+
+| result |
++--------+
+| OK |
++--------+
+1 row in set
+!ok
+
+SELECT * FROM StreamingTableForPlan;
+!output
++----+----+-------+
+| op | id | str |
++----+----+-------+
+| +I | 1 | Hello |
++----+----+-------+
+1 row in set
+!ok
+
+
+COMPILE PLAN '$VAR_STREAMING_PLAN_PATH/plan2.json' FOR INSERT INTO
StreamingTableForPlan SELECT * FROM (VALUES (1, 'Hello'));
+!output
++--------+
+| result |
++--------+
+| OK |
++--------+
+1 row in set
+!ok
+
+EXECUTE PLAN '$VAR_STREAMING_PLAN_PATH/plan2.json';
+!output
+Job ID:
+!info
+
+RESET '$internal.pipeline.job-id';
+!output
++--------+
+| result |
++--------+
+| OK |
++--------+
+1 row in set
+!ok
+
+SELECT * FROM StreamingTableForPlan;
+!output
++----+----+-------+
+| op | id | str |
++----+----+-------+
+| +I | 1 | Hello |
+| +I | 1 | Hello |
++----+----+-------+
+2 rows in set
+!ok
+
# ==========================================================================
# test batch insert
# ==========================================================================
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 76a52cd7d13..9b532abfa97 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -880,16 +880,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
List<Transformation<?>> transformations = translate(mapOperations);
List<String> sinkIdentifierNames =
extractSinkIdentifierNames(mapOperations);
- TableResultInternal result = executeInternal(transformations,
sinkIdentifierNames);
- if (tableConfig.get(TABLE_DML_SYNC)) {
- try {
- result.await();
- } catch (InterruptedException | ExecutionException e) {
- result.getJobClient().ifPresent(JobClient::cancel);
- throw new TableException("Fail to wait execution finish.", e);
- }
- }
- return result;
+ return executeInternal(transformations, sinkIdentifierNames);
}
private TableResultInternal executeInternal(
@@ -927,13 +918,24 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
affectedRowCounts[i] = -1L;
}
- return TableResultImpl.builder()
- .jobClient(jobClient)
- .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
- .schema(ResolvedSchema.of(columns))
- .resultProvider(
- new
InsertResultProvider(affectedRowCounts).setJobClient(jobClient))
- .build();
+ TableResultInternal result =
+ TableResultImpl.builder()
+ .jobClient(jobClient)
+ .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
+ .schema(ResolvedSchema.of(columns))
+ .resultProvider(
+ new InsertResultProvider(affectedRowCounts)
+ .setJobClient(jobClient))
+ .build();
+ if (tableConfig.get(TABLE_DML_SYNC)) {
+ try {
+ result.await();
+ } catch (InterruptedException | ExecutionException e) {
+ result.getJobClient().ifPresent(JobClient::cancel);
+ throw new TableException("Fail to wait execution finish.",
e);
+ }
+ }
+ return result;
} catch (Exception e) {
throw new TableException("Failed to execute sql", e);
}