This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 3fa337023c5 [FLINK-38081][table] Support EXPLAIN before EXECUTE statement (#26785)
3fa337023c5 is described below

commit 3fa337023c5f13e36fbd57aaba697af235ced3a0
Author: Ramin Gharib <ramingha...@gmail.com>
AuthorDate: Mon Jul 14 09:40:35 2025 +0200

    [FLINK-38081][table] Support EXPLAIN before EXECUTE statement (#26785)
---
 .../src/test/resources/sql_multi/statement_set.q   | 33 +++++++++++++
 .../src/test/resources/sql/insert.q                | 55 +++++++++++++++++++++
 .../src/test/resources/sql/statement_set.q         | 56 ++++++++++++++++++++++
 .../src/main/codegen/includes/parserImpls.ftl      |  2 +
 .../flink/sql/parser/FlinkSqlParserImplTest.java   | 27 +++++++++++
 .../operations/SqlNodeToOperationConversion.java   | 15 ++++++
 .../operations/SqlDmlToOperationConverterTest.java | 31 ++++++++++++
 7 files changed, 219 insertions(+)

diff --git a/flink-table/flink-sql-client/src/test/resources/sql_multi/statement_set.q b/flink-table/flink-sql-client/src/test/resources/sql_multi/statement_set.q
index afee14890fa..0f223fb6172 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql_multi/statement_set.q
+++ b/flink-table/flink-sql-client/src/test/resources/sql_multi/statement_set.q
@@ -101,6 +101,39 @@ Sink(table=[default_catalog.default_database.StreamingTable], fields=[EXPR$0, EX
 1 row in set
 !ok
 
+EXPLAIN EXECUTE STATEMENT SET BEGIN
+INSERT INTO StreamingTable SELECT * FROM (VALUES (1, 'Hello World'), (2, 'Hi'), (2, 'Hi'), (3, 'Hello'), (3, 'World'), (4, 'ADD'), (5, 'LINE'));
+INSERT INTO StreamingTable SELECT * FROM (VALUES (1, 'Hello World'), (2, 'Hi'), (2, 'Hi'), (3, 'Hello'), (3, 'World'), (4, 'ADD'), (5, 'LINE'));
+END;
+!output
++-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
+|                                                                              
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
++-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
+| == Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.StreamingTable], 
fields=[EXPR$0, EXPR$1])
++- LogicalProject(EXPR$0=[$0], EXPR$1=[$1])
+   +- LogicalValues(tuples=[[{ 1, _UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' 
}, { 2, _UTF-16LE'Hi' }, { 3, _UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4, 
_UTF-16LE'ADD' }, { 5, _UTF-16LE'LINE' }]])
+
+LogicalSink(table=[default_catalog.default_database.StreamingTable], 
fields=[EXPR$0, EXPR$1])
++- LogicalProject(EXPR$0=[$0], EXPR$1=[$1])
+   +- LogicalValues(tuples=[[{ 1, _UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' 
}, { 2, _UTF-16LE'Hi' }, { 3, _UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4, 
_UTF-16LE'ADD' }, { 5, _UTF-16LE'LINE' }]])
+
+== Optimized Physical Plan ==
+Sink(table=[default_catalog.default_database.StreamingTable], fields=[EXPR$0, 
EXPR$1])
++- Union(all=[true], union=[EXPR$0, EXPR$1])
+   :- Values(type=[RecordType(INTEGER EXPR$0, VARCHAR(11) EXPR$1)], tuples=[[{ 
1, _UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' }, { 2, _UTF-16LE'Hi' }, { 3, 
_UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4, _UTF-16LE'ADD' }, { 5, 
_UTF-16LE'LINE' }]])
+   +- Values(type=[RecordType(INTEGER EXPR$0, VARCHAR(11) EXPR$1)], tuples=[[{ 
1, _UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' }, { 2, _UTF-16LE'Hi' }, { 3, 
_UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4, _UTF-16LE'ADD' }, { 5, 
_UTF-16LE'LINE' }]])
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.StreamingTable], fields=[EXPR$0, 
EXPR$1])
++- Union(all=[true], union=[EXPR$0, EXPR$1])
+   :- Values(tuples=[[{ 1, _UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' }, { 
2, _UTF-16LE'Hi' }, { 3, _UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4, 
_UTF-16LE'ADD' }, { 5, _UTF-16LE'LINE' }]])(reuse_id=[1])
+   +- Reused(reference_id=[1])
+ |
++-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
+1 row in set
+!ok
+
 EXECUTE STATEMENT SET BEGIN
 INSERT INTO StreamingTable SELECT * FROM (VALUES (1, 'Hello World'), (2, 
'Hi'), (2, 'Hi'), (3, 'Hello'), (3, 'World'), (4, 'ADD'), (5, 'LINE'));
 INSERT INTO StreamingTable2 SELECT * FROM (VALUES (1, 'Hello World'), (2, 
'Hi'), (2, 'Hi'), (3, 'Hello'), (3, 'World'), (4, 'ADD'), (5, 'LINE'));
diff --git a/flink-table/flink-sql-gateway/src/test/resources/sql/insert.q b/flink-table/flink-sql-gateway/src/test/resources/sql/insert.q
index ddaff5e1f34..2c249991b94 100644
--- a/flink-table/flink-sql-gateway/src/test/resources/sql/insert.q
+++ b/flink-table/flink-sql-gateway/src/test/resources/sql/insert.q
@@ -87,6 +87,61 @@ SELECT * FROM StreamingTable;
 7 rows in set
 !ok
 
+# ==========================================================================
+# test streaming execute insert
+# ==========================================================================
+
+create table StreamingTable2 (
+  id int,
+  str string
+) with (
+  'connector' = 'filesystem',
+  'path' = '$VAR_STREAMING_PATH2',
+  'format' = 'csv'
+);
+!output
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
+
+EXPLAIN EXECUTE INSERT INTO StreamingTable2 SELECT * FROM (VALUES (1, 'EXECUTE'), (2, 'INSERT'), (3, 'TEST'));
+!output
+== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.StreamingTable2], 
fields=[EXPR$0, EXPR$1])
++- LogicalProject(EXPR$0=[$0], EXPR$1=[$1])
+   +- LogicalValues(tuples=[[{ 1, _UTF-16LE'EXECUTE' }, { 2, _UTF-16LE'INSERT' 
}, { 3, _UTF-16LE'TEST' }]])
+
+== Optimized Physical Plan ==
+Sink(table=[default_catalog.default_database.StreamingTable2], fields=[EXPR$0, 
EXPR$1])
++- Values(type=[RecordType(INTEGER EXPR$0, VARCHAR(7) EXPR$1)], tuples=[[{ 1, 
_UTF-16LE'EXECUTE' }, { 2, _UTF-16LE'INSERT' }, { 3, _UTF-16LE'TEST' }]])
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.StreamingTable2], fields=[EXPR$0, 
EXPR$1])
++- Values(tuples=[[{ 1, _UTF-16LE'EXECUTE' }, { 2, _UTF-16LE'INSERT' }, { 3, 
_UTF-16LE'TEST' }]])
+!ok
+
+EXECUTE INSERT INTO StreamingTable2 SELECT * FROM (VALUES (1, 'EXECUTE'), (2, 'INSERT'), (3, 'TEST'));
+!output
+Job ID:
+!info
+
+SELECT * FROM StreamingTable2;
+!output
++----+----+---------+
+| op | id |     str |
++----+----+---------+
+| +I |  1 | EXECUTE |
+| +I |  2 |  INSERT |
+| +I |  3 |    TEST |
++----+----+---------+
+3 rows in set
+!ok
+
+
 # ==========================================================================
 # test streaming insert through compiled plan
 # ==========================================================================
diff --git a/flink-table/flink-sql-gateway/src/test/resources/sql/statement_set.q b/flink-table/flink-sql-gateway/src/test/resources/sql/statement_set.q
index 486fe3d43cc..ecd37cb388f 100644
--- a/flink-table/flink-sql-gateway/src/test/resources/sql/statement_set.q
+++ b/flink-table/flink-sql-gateway/src/test/resources/sql/statement_set.q
@@ -116,6 +116,33 @@ Sink(table=[default_catalog.default_database.StreamingTable], fields=[EXPR$0, EX
    +- Reused(reference_id=[1])
 !ok
 
+EXPLAIN EXECUTE STATEMENT SET BEGIN
+INSERT INTO StreamingTable SELECT * FROM (VALUES (1, 'Hello World'), (2, 'Hi'), (2, 'Hi'), (3, 'Hello'), (3, 'World'), (4, 'ADD'), (5, 'LINE'));
+INSERT INTO StreamingTable SELECT * FROM (VALUES (1, 'Hello World'), (2, 'Hi'), (2, 'Hi'), (3, 'Hello'), (3, 'World'), (4, 'ADD'), (5, 'LINE'));
+END;
+!output
+== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.StreamingTable], 
fields=[EXPR$0, EXPR$1])
++- LogicalProject(EXPR$0=[$0], EXPR$1=[$1])
+   +- LogicalValues(tuples=[[{ 1, _UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' 
}, { 2, _UTF-16LE'Hi' }, { 3, _UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4, 
_UTF-16LE'ADD' }, { 5, _UTF-16LE'LINE' }]])
+
+LogicalSink(table=[default_catalog.default_database.StreamingTable], 
fields=[EXPR$0, EXPR$1])
++- LogicalProject(EXPR$0=[$0], EXPR$1=[$1])
+   +- LogicalValues(tuples=[[{ 1, _UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' 
}, { 2, _UTF-16LE'Hi' }, { 3, _UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4, 
_UTF-16LE'ADD' }, { 5, _UTF-16LE'LINE' }]])
+
+== Optimized Physical Plan ==
+Sink(table=[default_catalog.default_database.StreamingTable], fields=[EXPR$0, 
EXPR$1])
++- Union(all=[true], union=[EXPR$0, EXPR$1])
+   :- Values(type=[RecordType(INTEGER EXPR$0, VARCHAR(11) EXPR$1)], tuples=[[{ 
1, _UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' }, { 2, _UTF-16LE'Hi' }, { 3, 
_UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4, _UTF-16LE'ADD' }, { 5, 
_UTF-16LE'LINE' }]])
+   +- Values(type=[RecordType(INTEGER EXPR$0, VARCHAR(11) EXPR$1)], tuples=[[{ 
1, _UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' }, { 2, _UTF-16LE'Hi' }, { 3, 
_UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4, _UTF-16LE'ADD' }, { 5, 
_UTF-16LE'LINE' }]])
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.StreamingTable], fields=[EXPR$0, 
EXPR$1])
++- Union(all=[true], union=[EXPR$0, EXPR$1])
+   :- Values(tuples=[[{ 1, _UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' }, { 
2, _UTF-16LE'Hi' }, { 3, _UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4, 
_UTF-16LE'ADD' }, { 5, _UTF-16LE'LINE' }]])(reuse_id=[1])
+   +- Reused(reference_id=[1])
+!ok
+
 EXECUTE STATEMENT SET BEGIN
 INSERT INTO StreamingTable SELECT * FROM (VALUES (1, 'Hello World'), (2, 
'Hi'), (2, 'Hi'), (3, 'Hello'), (3, 'World'), (4, 'ADD'), (5, 'LINE'));
 INSERT INTO StreamingTable2 SELECT * FROM (VALUES (1, 'Hello World'), (2, 
'Hi'), (2, 'Hi'), (3, 'Hello'), (3, 'World'), (4, 'ADD'), (5, 'LINE'));
@@ -140,6 +167,35 @@ SELECT * FROM StreamingTable;
 7 rows in set
 !ok
 
+EXPLAIN EXECUTE SELECT * FROM StreamingTable;
+!output
+== Abstract Syntax Tree ==
+LogicalProject(id=[$0], str=[$1])
++- LogicalTableScan(table=[[default_catalog, default_database, 
StreamingTable]])
+
+== Optimized Physical Plan ==
+TableSourceScan(table=[[default_catalog, default_database, StreamingTable]], 
fields=[id, str])
+
+== Optimized Execution Plan ==
+TableSourceScan(table=[[default_catalog, default_database, StreamingTable]], 
fields=[id, str])
+!ok
+
+EXECUTE SELECT * FROM StreamingTable;
+!output
++----+-------------+
+| id |         str |
++----+-------------+
+|  1 | Hello World |
+|  2 |          Hi |
+|  2 |          Hi |
+|  3 |       Hello |
+|  3 |       World |
+|  4 |         ADD |
+|  5 |        LINE |
++----+-------------+
+7 rows in set
+!ok
+
 SELECT * FROM StreamingTable2;
 !output
 +----+----+-------------+
diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index 450ce163d2d..24aa5497e6d 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -2854,6 +2854,8 @@ SqlNode SqlRichExplain() :
         |
         stmt = SqlStatementSet()
         |
+        stmt = SqlExecute()
+        |
         stmt = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY)
         |
         stmt = RichSqlInsert()
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index ff38568b463..8033010326f 100644
--- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -2563,6 +2563,33 @@ class FlinkSqlParserImplTest extends SqlParserTest {
                                 + "END");
     }
 
+    @Test
+    void testExplainExecuteStatementSet() {
+        sql("explain execute statement set begin insert into t1 select * from t2; insert into t2 select * from t3; end")
+                .ok(
+                        "EXPLAIN EXECUTE STATEMENT SET BEGIN\n"
+                                + "INSERT INTO `T1`\n"
+                                + "SELECT *\n"
+                                + "FROM `T2`\n"
+                                + ";\n"
+                                + "INSERT INTO `T2`\n"
+                                + "SELECT *\n"
+                                + "FROM `T3`\n"
+                                + ";\n"
+                                + "END");
+    }
+
+    @Test
+    void testExplainExecuteSelect() {
+        sql("explain execute select * from emps").ok("EXPLAIN EXECUTE SELECT *\nFROM `EMPS`");
+    }
+
+    @Test
+    void testExplainExecuteInsert() {
+        sql("explain execute insert into emps1 select * from emps2")
+                .ok("EXPLAIN EXECUTE INSERT INTO `EMPS1`\nSELECT *\nFROM `EMPS2`");
+    }
+
     @Test
     void testExplain() {
         String sql = "explain select * from emps";
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java
index ed2888ce61a..6c0b31cf10d 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java
@@ -901,6 +901,21 @@ public class SqlNodeToOperationConversion {
             operation = convertSqlInsert((RichSqlInsert) sqlNode);
         } else if (sqlNode instanceof SqlStatementSet) {
             operation = convertSqlStatementSet((SqlStatementSet) sqlNode);
+        } else if (sqlNode instanceof SqlExecute) {
+            // Handle EXPLAIN EXECUTE (statement set, insert or query) by converting the inner statement
+            SqlNode innerStatement = ((SqlExecute) sqlNode).getStatement();
+            if (innerStatement instanceof SqlStatementSet) {
+                operation = convertSqlStatementSet((SqlStatementSet) innerStatement);
+            } else if (innerStatement instanceof RichSqlInsert) {
+                operation = convertSqlInsert((RichSqlInsert) innerStatement);
+            } else if (innerStatement.getKind().belongsTo(SqlKind.QUERY)) {
+                operation = convertSqlQuery(innerStatement);
+            } else {
+                throw new ValidationException(
+                        String.format(
+                                "EXPLAIN EXECUTE statement doesn't support %s",
+                                innerStatement.getKind()));
+            }
         } else if (sqlNode.getKind().belongsTo(SqlKind.QUERY)) {
             operation = convertSqlQuery(sqlExplain.getStatement());
         } else if ((sqlNode instanceof SqlCreateTableAs)
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDmlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDmlToOperationConverterTest.java
index 057e3127ac1..afe84800fd3 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDmlToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDmlToOperationConverterTest.java
@@ -227,6 +227,19 @@ public class SqlDmlToOperationConverterTest extends SqlNodeToOperationConversion
         assertThat(operation).isInstanceOf(StatementSetOperation.class);
     }
 
+    @Test
+    public void testSqlRichExplainWithExecuteStatementSet() {
+        final String sql =
+                "EXPLAIN EXECUTE STATEMENT SET BEGIN "
+                        + "INSERT INTO t1 SELECT a, b, c, d FROM t2 WHERE a > 1;"
+                        + "INSERT INTO t1 SELECT a, b, c, d FROM t2 WHERE a > 2;"
+                        + "END";
+        FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
+        final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
+        Operation operation = parse(sql, planner, parser);
+        assertThat(operation).isInstanceOf(ExplainOperation.class);
+    }
+
     @Test
     public void testSqlExecuteWithInsert() {
         final String sql = "execute insert into t1 select a, b, c, d from t2 where a > 1";
@@ -236,6 +249,15 @@ public class SqlDmlToOperationConverterTest extends SqlNodeToOperationConversion
         assertThat(operation).isInstanceOf(SinkModifyOperation.class);
     }
 
+    @Test
+    public void testSqlRichExplainWithExecuteInsert() {
+        final String sql = "EXPLAIN EXECUTE INSERT INTO t1 SELECT a, b, c, d FROM t2";
+        FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
+        final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
+        Operation operation = parse(sql, planner, parser);
+        assertThat(operation).isInstanceOf(ExplainOperation.class);
+    }
+
     @Test
     public void testSqlExecuteWithSelect() {
         final String sql = "execute select a, b, c, d from t2 where a > 1";
@@ -245,6 +267,15 @@ public class SqlDmlToOperationConverterTest extends SqlNodeToOperationConversion
         assertThat(operation).isInstanceOf(QueryOperation.class);
     }
 
+    @Test
+    public void testSqlRichExplainWithExecuteSelect() {
+        final String sql = "EXPLAIN EXECUTE SELECT a, b, c, d FROM t2";
+        FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
+        final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
+        Operation operation = parse(sql, planner, parser);
+        assertThat(operation).isInstanceOf(ExplainOperation.class);
+    }
+
     @Test
     public void testDelete() throws Exception {
         Map<String, String> options = new HashMap<>();

Reply via email to