This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 3fa337023c5 [FLINK-38081][table] Support EXPLAIN before EXECUTE statement (#26785) 3fa337023c5 is described below commit 3fa337023c5f13e36fbd57aaba697af235ced3a0 Author: Ramin Gharib <ramingha...@gmail.com> AuthorDate: Mon Jul 14 09:40:35 2025 +0200 [FLINK-38081][table] Support EXPLAIN before EXECUTE statement (#26785) --- .../src/test/resources/sql_multi/statement_set.q | 33 +++++++++++++ .../src/test/resources/sql/insert.q | 55 +++++++++++++++++++++ .../src/test/resources/sql/statement_set.q | 56 ++++++++++++++++++++++ .../src/main/codegen/includes/parserImpls.ftl | 2 + .../flink/sql/parser/FlinkSqlParserImplTest.java | 27 +++++++++++ .../operations/SqlNodeToOperationConversion.java | 15 ++++++ .../operations/SqlDmlToOperationConverterTest.java | 31 ++++++++++++ 7 files changed, 219 insertions(+) diff --git a/flink-table/flink-sql-client/src/test/resources/sql_multi/statement_set.q b/flink-table/flink-sql-client/src/test/resources/sql_multi/statement_set.q index afee14890fa..0f223fb6172 100644 --- a/flink-table/flink-sql-client/src/test/resources/sql_multi/statement_set.q +++ b/flink-table/flink-sql-client/src/test/resources/sql_multi/statement_set.q @@ -101,6 +101,39 @@ Sink(table=[default_catalog.default_database.StreamingTable], fields=[EXPR$0, EX 1 row in set !ok +EXPLAIN EXECUTE STATEMENT SET BEGIN +INSERT INTO StreamingTable SELECT * FROM (VALUES (1, 'Hello World'), (2, 'Hi'), (2, 'Hi'), (3, 'Hello'), (3, 'World'), (4, 'ADD'), (5, 'LINE')); +INSERT INTO StreamingTable SELECT * FROM (VALUES (1, 'Hello World'), (2, 'Hi'), (2, 'Hi'), (3, 'Hello'), (3, 'World'), (4, 'ADD'), (5, 'LINE')); +END; +!output 
++----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [...] +| [...] ++----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [...] 
+| == Abstract Syntax Tree == +LogicalSink(table=[default_catalog.default_database.StreamingTable], fields=[EXPR$0, EXPR$1]) ++- LogicalProject(EXPR$0=[$0], EXPR$1=[$1]) + +- LogicalValues(tuples=[[{ 1, _UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' }, { 2, _UTF-16LE'Hi' }, { 3, _UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4, _UTF-16LE'ADD' }, { 5, _UTF-16LE'LINE' }]]) + +LogicalSink(table=[default_catalog.default_database.StreamingTable], fields=[EXPR$0, EXPR$1]) ++- LogicalProject(EXPR$0=[$0], EXPR$1=[$1]) + +- LogicalValues(tuples=[[{ 1, _UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' }, { 2, _UTF-16LE'Hi' }, { 3, _UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4, _UTF-16LE'ADD' }, { 5, _UTF-16LE'LINE' }]]) + +== Optimized Physical Plan == +Sink(table=[default_catalog.default_database.StreamingTable], fields=[EXPR$0, EXPR$1]) ++- Union(all=[true], union=[EXPR$0, EXPR$1]) + :- Values(type=[RecordType(INTEGER EXPR$0, VARCHAR(11) EXPR$1)], tuples=[[{ 1, _UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' }, { 2, _UTF-16LE'Hi' }, { 3, _UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4, _UTF-16LE'ADD' }, { 5, _UTF-16LE'LINE' }]]) + +- Values(type=[RecordType(INTEGER EXPR$0, VARCHAR(11) EXPR$1)], tuples=[[{ 1, _UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' }, { 2, _UTF-16LE'Hi' }, { 3, _UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4, _UTF-16LE'ADD' }, { 5, _UTF-16LE'LINE' }]]) + +== Optimized Execution Plan == +Sink(table=[default_catalog.default_database.StreamingTable], fields=[EXPR$0, EXPR$1]) ++- Union(all=[true], union=[EXPR$0, EXPR$1]) + :- Values(tuples=[[{ 1, _UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' }, { 2, _UTF-16LE'Hi' }, { 3, _UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4, _UTF-16LE'ADD' }, { 5, _UTF-16LE'LINE' }]])(reuse_id=[1]) + +- Reused(reference_id=[1]) + | 
++----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [...] +1 row in set +!ok + EXECUTE STATEMENT SET BEGIN INSERT INTO StreamingTable SELECT * FROM (VALUES (1, 'Hello World'), (2, 'Hi'), (2, 'Hi'), (3, 'Hello'), (3, 'World'), (4, 'ADD'), (5, 'LINE')); INSERT INTO StreamingTable2 SELECT * FROM (VALUES (1, 'Hello World'), (2, 'Hi'), (2, 'Hi'), (3, 'Hello'), (3, 'World'), (4, 'ADD'), (5, 'LINE')); diff --git a/flink-table/flink-sql-gateway/src/test/resources/sql/insert.q b/flink-table/flink-sql-gateway/src/test/resources/sql/insert.q index ddaff5e1f34..2c249991b94 100644 --- a/flink-table/flink-sql-gateway/src/test/resources/sql/insert.q +++ b/flink-table/flink-sql-gateway/src/test/resources/sql/insert.q @@ -87,6 +87,61 @@ SELECT * FROM StreamingTable; 7 rows in set !ok +# ========================================================================== +# test streaming execute insert +# ========================================================================== + +create table StreamingTable2 ( + id int, + str string +) with ( + 'connector' = 'filesystem', + 'path' = '$VAR_STREAMING_PATH2', + 'format' = 'csv' +); +!output ++--------+ +| result | ++--------+ +| OK | ++--------+ +1 row in set +!ok + +EXPLAIN EXECUTE INSERT INTO StreamingTable2 SELECT * FROM (VALUES (1, 'EXECUTE'), (2, 'INSERT'), (3, 'TEST')); +!output +== Abstract Syntax Tree == +LogicalSink(table=[default_catalog.default_database.StreamingTable2], fields=[EXPR$0, EXPR$1]) ++- LogicalProject(EXPR$0=[$0], EXPR$1=[$1]) + +- LogicalValues(tuples=[[{ 1, _UTF-16LE'EXECUTE' }, { 2, 
_UTF-16LE'INSERT' }, { 3, _UTF-16LE'TEST' }]]) + +== Optimized Physical Plan == +Sink(table=[default_catalog.default_database.StreamingTable2], fields=[EXPR$0, EXPR$1]) ++- Values(type=[RecordType(INTEGER EXPR$0, VARCHAR(7) EXPR$1)], tuples=[[{ 1, _UTF-16LE'EXECUTE' }, { 2, _UTF-16LE'INSERT' }, { 3, _UTF-16LE'TEST' }]]) + +== Optimized Execution Plan == +Sink(table=[default_catalog.default_database.StreamingTable2], fields=[EXPR$0, EXPR$1]) ++- Values(tuples=[[{ 1, _UTF-16LE'EXECUTE' }, { 2, _UTF-16LE'INSERT' }, { 3, _UTF-16LE'TEST' }]]) +!ok + +EXECUTE INSERT INTO StreamingTable2 SELECT * FROM (VALUES (1, 'EXECUTE'), (2, 'INSERT'), (3, 'TEST')); +!output +Job ID: +!info + +SELECT * FROM StreamingTable2; +!output ++----+----+---------+ +| op | id | str | ++----+----+---------+ +| +I | 1 | EXECUTE | +| +I | 2 | INSERT | +| +I | 3 | TEST | ++----+----+---------+ +3 rows in set +!ok + + # ========================================================================== # test streaming insert through compiled plan # ========================================================================== diff --git a/flink-table/flink-sql-gateway/src/test/resources/sql/statement_set.q b/flink-table/flink-sql-gateway/src/test/resources/sql/statement_set.q index 486fe3d43cc..ecd37cb388f 100644 --- a/flink-table/flink-sql-gateway/src/test/resources/sql/statement_set.q +++ b/flink-table/flink-sql-gateway/src/test/resources/sql/statement_set.q @@ -116,6 +116,33 @@ Sink(table=[default_catalog.default_database.StreamingTable], fields=[EXPR$0, EX +- Reused(reference_id=[1]) !ok +EXPLAIN EXECUTE STATEMENT SET BEGIN +INSERT INTO StreamingTable SELECT * FROM (VALUES (1, 'Hello World'), (2, 'Hi'), (2, 'Hi'), (3, 'Hello'), (3, 'World'), (4, 'ADD'), (5, 'LINE')); +INSERT INTO StreamingTable SELECT * FROM (VALUES (1, 'Hello World'), (2, 'Hi'), (2, 'Hi'), (3, 'Hello'), (3, 'World'), (4, 'ADD'), (5, 'LINE')); +END; +!output +== Abstract Syntax Tree == 
+LogicalSink(table=[default_catalog.default_database.StreamingTable], fields=[EXPR$0, EXPR$1]) ++- LogicalProject(EXPR$0=[$0], EXPR$1=[$1]) + +- LogicalValues(tuples=[[{ 1, _UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' }, { 2, _UTF-16LE'Hi' }, { 3, _UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4, _UTF-16LE'ADD' }, { 5, _UTF-16LE'LINE' }]]) + +LogicalSink(table=[default_catalog.default_database.StreamingTable], fields=[EXPR$0, EXPR$1]) ++- LogicalProject(EXPR$0=[$0], EXPR$1=[$1]) + +- LogicalValues(tuples=[[{ 1, _UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' }, { 2, _UTF-16LE'Hi' }, { 3, _UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4, _UTF-16LE'ADD' }, { 5, _UTF-16LE'LINE' }]]) + +== Optimized Physical Plan == +Sink(table=[default_catalog.default_database.StreamingTable], fields=[EXPR$0, EXPR$1]) ++- Union(all=[true], union=[EXPR$0, EXPR$1]) + :- Values(type=[RecordType(INTEGER EXPR$0, VARCHAR(11) EXPR$1)], tuples=[[{ 1, _UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' }, { 2, _UTF-16LE'Hi' }, { 3, _UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4, _UTF-16LE'ADD' }, { 5, _UTF-16LE'LINE' }]]) + +- Values(type=[RecordType(INTEGER EXPR$0, VARCHAR(11) EXPR$1)], tuples=[[{ 1, _UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' }, { 2, _UTF-16LE'Hi' }, { 3, _UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4, _UTF-16LE'ADD' }, { 5, _UTF-16LE'LINE' }]]) + +== Optimized Execution Plan == +Sink(table=[default_catalog.default_database.StreamingTable], fields=[EXPR$0, EXPR$1]) ++- Union(all=[true], union=[EXPR$0, EXPR$1]) + :- Values(tuples=[[{ 1, _UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' }, { 2, _UTF-16LE'Hi' }, { 3, _UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4, _UTF-16LE'ADD' }, { 5, _UTF-16LE'LINE' }]])(reuse_id=[1]) + +- Reused(reference_id=[1]) +!ok + EXECUTE STATEMENT SET BEGIN INSERT INTO StreamingTable SELECT * FROM (VALUES (1, 'Hello World'), (2, 'Hi'), (2, 'Hi'), (3, 'Hello'), (3, 'World'), (4, 'ADD'), (5, 'LINE')); INSERT INTO StreamingTable2 SELECT * FROM (VALUES (1, 
'Hello World'), (2, 'Hi'), (2, 'Hi'), (3, 'Hello'), (3, 'World'), (4, 'ADD'), (5, 'LINE')); @@ -140,6 +167,35 @@ SELECT * FROM StreamingTable; 7 rows in set !ok +EXPLAIN EXECUTE SELECT * FROM StreamingTable; +!output +== Abstract Syntax Tree == +LogicalProject(id=[$0], str=[$1]) ++- LogicalTableScan(table=[[default_catalog, default_database, StreamingTable]]) + +== Optimized Physical Plan == +TableSourceScan(table=[[default_catalog, default_database, StreamingTable]], fields=[id, str]) + +== Optimized Execution Plan == +TableSourceScan(table=[[default_catalog, default_database, StreamingTable]], fields=[id, str]) +!ok + +EXECUTE SELECT * FROM StreamingTable; +!output ++----+-------------+ +| id | str | ++----+-------------+ +| 1 | Hello World | +| 2 | Hi | +| 2 | Hi | +| 3 | Hello | +| 3 | World | +| 4 | ADD | +| 5 | LINE | ++----+-------------+ +7 rows in set +!ok + SELECT * FROM StreamingTable2; !output +----+----+-------------+ diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl index 450ce163d2d..24aa5497e6d 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl +++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl @@ -2854,6 +2854,8 @@ SqlNode SqlRichExplain() : | stmt = SqlStatementSet() | + stmt = SqlExecute() + | stmt = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY) | stmt = RichSqlInsert() diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java index ff38568b463..8033010326f 100644 --- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java +++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java @@ -2563,6 +2563,33 @@ class FlinkSqlParserImplTest extends 
SqlParserTest { + "END"); } + @Test + void testExplainExecuteStatementSet() { + sql("explain execute statement set begin insert into t1 select * from t2; insert into t2 select * from t3; end") + .ok( + "EXPLAIN EXECUTE STATEMENT SET BEGIN\n" + + "INSERT INTO `T1`\n" + + "SELECT *\n" + + "FROM `T2`\n" + + ";\n" + + "INSERT INTO `T2`\n" + + "SELECT *\n" + + "FROM `T3`\n" + + ";\n" + + "END"); + } + + @Test + void testExplainExecuteSelect() { + sql("explain execute select * from emps").ok("EXPLAIN EXECUTE SELECT *\nFROM `EMPS`"); + } + + @Test + void testExplainExecuteInsert() { + sql("explain execute insert into emps1 select * from emps2") + .ok("EXPLAIN EXECUTE INSERT INTO `EMPS1`\nSELECT *\nFROM `EMPS2`"); + } + @Test void testExplain() { String sql = "explain select * from emps"; diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java index ed2888ce61a..6c0b31cf10d 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java @@ -901,6 +901,21 @@ public class SqlNodeToOperationConversion { operation = convertSqlInsert((RichSqlInsert) sqlNode); } else if (sqlNode instanceof SqlStatementSet) { operation = convertSqlStatementSet((SqlStatementSet) sqlNode); + } else if (sqlNode instanceof SqlExecute) { + // EXPLAIN EXECUTE <stmt>: unwrap the SqlExecute node and convert its inner statement (STATEMENT SET, INSERT or query); any other inner statement kind is rejected with a ValidationException + SqlNode innerStatement = ((SqlExecute) sqlNode).getStatement(); + if (innerStatement instanceof SqlStatementSet) { + operation = convertSqlStatementSet((SqlStatementSet) innerStatement); + } else if (innerStatement instanceof RichSqlInsert) { + operation = convertSqlInsert((RichSqlInsert) 
innerStatement); + } else if (innerStatement.getKind().belongsTo(SqlKind.QUERY)) { + operation = convertSqlQuery(innerStatement); + } else { + throw new ValidationException( + String.format( + "EXPLAIN EXECUTE statement doesn't support %s", + innerStatement.getKind())); + } } else if (sqlNode.getKind().belongsTo(SqlKind.QUERY)) { operation = convertSqlQuery(sqlExplain.getStatement()); } else if ((sqlNode instanceof SqlCreateTableAs) diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDmlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDmlToOperationConverterTest.java index 057e3127ac1..afe84800fd3 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDmlToOperationConverterTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDmlToOperationConverterTest.java @@ -227,6 +227,19 @@ public class SqlDmlToOperationConverterTest extends SqlNodeToOperationConversion assertThat(operation).isInstanceOf(StatementSetOperation.class); } + @Test + public void testSqlRichExplainWithExecuteStatementSet() { + final String sql = + "EXPLAIN EXECUTE STATEMENT SET BEGIN " + + "INSERT INTO t1 SELECT a, b, c, d FROM t2 WHERE a > 1;" + + "INSERT INTO t1 SELECT a, b, c, d FROM t2 WHERE a > 2;" + + "END"; + FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT); + final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT); + Operation operation = parse(sql, planner, parser); + assertThat(operation).isInstanceOf(ExplainOperation.class); + } + @Test public void testSqlExecuteWithInsert() { final String sql = "execute insert into t1 select a, b, c, d from t2 where a > 1"; @@ -236,6 +249,15 @@ public class SqlDmlToOperationConverterTest extends SqlNodeToOperationConversion assertThat(operation).isInstanceOf(SinkModifyOperation.class); } + 
@Test + public void testSqlRichExplainWithExecuteInsert() { + final String sql = "EXPLAIN EXECUTE INSERT INTO t1 SELECT a, b, c, d FROM t2"; + FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT); + final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT); + Operation operation = parse(sql, planner, parser); + assertThat(operation).isInstanceOf(ExplainOperation.class); + } + @Test public void testSqlExecuteWithSelect() { final String sql = "execute select a, b, c, d from t2 where a > 1"; @@ -245,6 +267,15 @@ public class SqlDmlToOperationConverterTest extends SqlNodeToOperationConversion assertThat(operation).isInstanceOf(QueryOperation.class); } + @Test + public void testSqlRichExplainWithExecuteSelect() { + final String sql = "EXPLAIN EXECUTE SELECT a, b, c, d FROM t2"; + FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT); + final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT); + Operation operation = parse(sql, planner, parser); + assertThat(operation).isInstanceOf(ExplainOperation.class); + } + @Test public void testDelete() throws Exception { Map<String, String> options = new HashMap<>();