This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.17 by this push:
     new be53133546d [FLINK-32219][sql-client] Fix SqlClient hangs when executing EXECUTE PLAN statement (#22725)
be53133546d is described below

commit be53133546d054e282c0a24bfe722d0d276f9a8f
Author: Jane Chan <[email protected]>
AuthorDate: Thu Jun 8 10:11:17 2023 +0800

    [FLINK-32219][sql-client] Fix SqlClient hangs when executing EXECUTE PLAN statement (#22725)
    
    (cherry picked from commit 83ba6b5348cbffb26e8d1d5ce6e8d6bb1994e3bc)
---
 .../service/operation/OperationExecutor.java       | 16 +++++
 .../gateway/AbstractSqlGatewayStatementITCase.java |  8 +++
 .../src/test/resources/sql/insert.q                | 83 ++++++++++++++++++++++
 .../table/api/internal/TableEnvironmentImpl.java   | 36 +++++-----
 4 files changed, 126 insertions(+), 17 deletions(-)

diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
index 22c4ffcdf32..1be89881aec 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
@@ -67,6 +67,7 @@ import org.apache.flink.table.gateway.service.result.ResultFetcher;
 import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
 import org.apache.flink.table.module.ModuleManager;
 import org.apache.flink.table.operations.BeginStatementSetOperation;
+import org.apache.flink.table.operations.CompileAndExecutePlanOperation;
 import org.apache.flink.table.operations.DeleteFromFilterOperation;
 import org.apache.flink.table.operations.EndStatementSetOperation;
 import org.apache.flink.table.operations.LoadModuleOperation;
@@ -77,6 +78,7 @@ import org.apache.flink.table.operations.StatementSetOperation;
 import org.apache.flink.table.operations.UnloadModuleOperation;
 import org.apache.flink.table.operations.UseOperation;
 import org.apache.flink.table.operations.command.AddJarOperation;
+import org.apache.flink.table.operations.command.ExecutePlanOperation;
 import org.apache.flink.table.operations.command.RemoveJarOperation;
 import org.apache.flink.table.operations.command.ResetOperation;
 import org.apache.flink.table.operations.command.SetOperation;
@@ -423,6 +425,9 @@ public class OperationExecutor {
         } else if (op instanceof ModifyOperation) {
             return callModifyOperations(
                     tableEnv, handle, Collections.singletonList((ModifyOperation) op));
+        } else if (op instanceof CompileAndExecutePlanOperation
+                || op instanceof ExecutePlanOperation) {
+            return callExecuteOperation(tableEnv, handle, op);
         } else if (op instanceof StatementSetOperation) {
             return callModifyOperations(
                     tableEnv, handle, ((StatementSetOperation) op).getOperations());
@@ -512,6 +517,17 @@ public class OperationExecutor {
             return ResultFetcher.fromTableResult(handle, result, false);
         }
 
+        return fetchJobId(result, handle);
+    }
+
+    private ResultFetcher callExecuteOperation(
+            TableEnvironmentInternal tableEnv,
+            OperationHandle handle,
+            Operation executePlanOperation) {
+        return fetchJobId(tableEnv.executeInternal(executePlanOperation), handle);
+    }
+
+    private ResultFetcher fetchJobId(TableResultInternal result, OperationHandle handle) {
         JobID jobID =
                 result.getJobClient()
                         .orElseThrow(
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/AbstractSqlGatewayStatementITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/AbstractSqlGatewayStatementITCase.java
index 0ba90189172..94813359752 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/AbstractSqlGatewayStatementITCase.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/AbstractSqlGatewayStatementITCase.java
@@ -116,6 +116,9 @@ public abstract class AbstractSqlGatewayStatementITCase extends AbstractTestBase
         replaceVars.put(
                 "$VAR_STREAMING_PATH2",
                 Files.createDirectory(temporaryFolder.resolve("streaming2")).toFile().getPath());
+        replaceVars.put(
+                "$VAR_STREAMING_PATH3",
+                Files.createDirectory(temporaryFolder.resolve("streaming3")).toFile().getPath());
         replaceVars.put(
                 "$VAR_BATCH_PATH",
                 Files.createDirectory(temporaryFolder.resolve("batch")).toFile().getPath());
@@ -124,6 +127,11 @@ public abstract class AbstractSqlGatewayStatementITCase extends AbstractTestBase
                 Files.createDirectory(temporaryFolder.resolve("batch_ctas")).toFile().getPath());
         replaceVars.put(
                 "$VAR_REST_PORT", MINI_CLUSTER.getClientConfiguration().get(PORT).toString());
+        replaceVars.put(
+                "$VAR_STREAMING_PLAN_PATH",
+                Files.createDirectory(temporaryFolder.resolve("streaming_compiled_plan"))
+                        .toFile()
+                        .getPath());
     }
 
     @TestTemplate
diff --git a/flink-table/flink-sql-gateway/src/test/resources/sql/insert.q b/flink-table/flink-sql-gateway/src/test/resources/sql/insert.q
index 323f24b81f1..1314e4d31bd 100644
--- a/flink-table/flink-sql-gateway/src/test/resources/sql/insert.q
+++ b/flink-table/flink-sql-gateway/src/test/resources/sql/insert.q
@@ -87,6 +87,89 @@ SELECT * FROM StreamingTable;
 7 rows in set
 !ok
 
+# ==========================================================================
+# test streaming insert through compiled plan
+# ==========================================================================
+
+create table StreamingTableForPlan (
+  id int,
+  str string
+) with (
+  'connector' = 'filesystem',
+  'path' = '$VAR_STREAMING_PATH3',
+  'format' = 'csv'
+);
+!output
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
+
+COMPILE AND EXECUTE PLAN '$VAR_STREAMING_PLAN_PATH/plan1.json' FOR INSERT INTO StreamingTableForPlan SELECT * FROM (VALUES (1, 'Hello'));
+!output
+Job ID:
+!info
+
+RESET '$internal.pipeline.job-id';
+!output
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
+
+SELECT * FROM StreamingTableForPlan;
+!output
++----+----+-------+
+| op | id |   str |
++----+----+-------+
+| +I |  1 | Hello |
++----+----+-------+
+1 row in set
+!ok
+
+
+COMPILE PLAN '$VAR_STREAMING_PLAN_PATH/plan2.json' FOR INSERT INTO StreamingTableForPlan SELECT * FROM (VALUES (1, 'Hello'));
+!output
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
+
+EXECUTE PLAN '$VAR_STREAMING_PLAN_PATH/plan2.json';
+!output
+Job ID:
+!info
+
+RESET '$internal.pipeline.job-id';
+!output
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
+
+SELECT * FROM StreamingTableForPlan;
+!output
++----+----+-------+
+| op | id |   str |
++----+----+-------+
+| +I |  1 | Hello |
+| +I |  1 | Hello |
++----+----+-------+
+2 rows in set
+!ok
+
 # ==========================================================================
 # test batch insert
 # ==========================================================================
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 76a52cd7d13..9b532abfa97 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -880,16 +880,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
 
         List<Transformation<?>> transformations = translate(mapOperations);
         List<String> sinkIdentifierNames = extractSinkIdentifierNames(mapOperations);
-        TableResultInternal result = executeInternal(transformations, sinkIdentifierNames);
-        if (tableConfig.get(TABLE_DML_SYNC)) {
-            try {
-                result.await();
-            } catch (InterruptedException | ExecutionException e) {
-                result.getJobClient().ifPresent(JobClient::cancel);
-                throw new TableException("Fail to wait execution finish.", e);
-            }
-        }
-        return result;
+        return executeInternal(transformations, sinkIdentifierNames);
     }
 
     private TableResultInternal executeInternal(
@@ -927,13 +918,24 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
                 affectedRowCounts[i] = -1L;
             }
 
-            return TableResultImpl.builder()
-                    .jobClient(jobClient)
-                    .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
-                    .schema(ResolvedSchema.of(columns))
-                    .resultProvider(
-                            new InsertResultProvider(affectedRowCounts).setJobClient(jobClient))
-                    .build();
+            TableResultInternal result =
+                    TableResultImpl.builder()
+                            .jobClient(jobClient)
+                            .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
+                            .schema(ResolvedSchema.of(columns))
+                            .resultProvider(
+                                    new InsertResultProvider(affectedRowCounts)
+                                            .setJobClient(jobClient))
+                            .build();
+            if (tableConfig.get(TABLE_DML_SYNC)) {
+                try {
+                    result.await();
+                } catch (InterruptedException | ExecutionException e) {
+                    result.getJobClient().ifPresent(JobClient::cancel);
+                    throw new TableException("Fail to wait execution finish.", e);
+                }
+            }
+            return result;
         } catch (Exception e) {
             throw new TableException("Failed to execute sql", e);
         }

Reply via email to