fsk119 commented on a change in pull request #15317:
URL: https://github.com/apache/flink/pull/15317#discussion_r644533090
##########
File path: flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlRichExplain.java
##########
@@ -28,18 +28,21 @@
import java.util.Collections;
import java.util.List;
+import java.util.Set;
-/** EXPLAIN (PLAN FOR)* STATEMENT sql call. */
+/** EXPLAIN (PLAN FOR)* | ESTIMATED_COST, CHANGELOG_MODE, JSON_EXECUTION_PLAN STATEMENT sql call. */
Review comment:
What about `EXPLAIN with details statement sql call.`?
##########
File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
##########
@@ -210,6 +211,36 @@ class StreamTableEnvironmentTest extends TableTestBase {
jTEnv.fromDataStream(ds, $("rt").rowtime(), $("b"), $("c"), $("d"), $("e"), $("pt").proctime())
}
+ private def prepareSchemaExpressionParser:
+ (JStreamTableEnv, DataStream[JTuple5[JLong, JInt, String, JInt, JLong]]) = {
+
+ val jStreamExecEnv = mock(classOf[JStreamExecEnv])
+ when(jStreamExecEnv.getStreamTimeCharacteristic).thenReturn(TimeCharacteristic.EventTime)
+ val config = new TableConfig
+ val catalogManager = CatalogManagerMocks.createEmptyCatalogManager()
+ val moduleManager: ModuleManager = new ModuleManager
+ val executor: StreamExecutor = new StreamExecutor(jStreamExecEnv)
+ val functionCatalog = new FunctionCatalog(config, catalogManager, moduleManager)
+ val streamPlanner = new StreamPlanner(executor, config, functionCatalog, catalogManager)
+ val jTEnv = new JStreamTableEnvironmentImpl(
+ catalogManager,
+ moduleManager,
+ functionCatalog,
+ config,
+ jStreamExecEnv,
+ streamPlanner,
+ executor,
+ true,
+ Thread.currentThread().getContextClassLoader)
+
+ val sType = new TupleTypeInfo(Types.LONG, Types.INT, Types.STRING, Types.INT, Types.LONG)
+ .asInstanceOf[TupleTypeInfo[JTuple5[JLong, JInt, String, JInt, JLong]]]
+ val ds = mock(classOf[DataStream[JTuple5[JLong, JInt, String, JInt, JLong]]])
+ when(ds.getType).thenReturn(sType)
+
+ (jTEnv, ds)
+ }
+
Review comment:
Did you just move this method from the end of the file to here?
##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ExplainOperation.java
##########
@@ -37,10 +47,23 @@ public Operation getChild() {
@Override
public String asSummaryString() {
+ String operationName = "EXPLAIN";
+ if (CollectionUtils.isNotEmpty(explainDetails)) {
Review comment:
nit: it seems `explainDetails` is not nullable, so we can use `explainDetails.isEmpty()` to check instead.
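A minimal sketch of the suggested check (assuming `explainDetails` is always initialized to a non-null `Set<String>`):
```
String operationName = "EXPLAIN";
if (!explainDetails.isEmpty()) {
    // e.g. "EXPLAIN ESTIMATED_COST, CHANGELOG_MODE"
    operationName += " " + String.join(", ", explainDetails);
}
```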
##########
File path: flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
##########
@@ -1336,6 +1380,18 @@ public void testExplainUpsert() {
this.sql(sql).ok(expected);
}
+ @Test
+ public void testExplainPlanForWithExplainDetails() {
+ String sql = "explain plan for ^json_execution_plan^ upsert into emps1
values (1, 2)";
+ this.sql(sql).fails("(?s).*Encountered \"json_execution_plan\" at line
1, column 18.\n.*");
+ }
Review comment:
Why add `^` in the sql?
Please use `this.sql(sql).node(new ValidationMatcher(...));` to verify the errors.
Could you also add negative cases, e.g. EXPLAIN with an unknown keyword or with a DDL statement (see the sketch below)?
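For illustration, a negative case might look like the following sketch, mirroring the `fails(...)` pattern already used above (the test name, SQL, and expected-error regex are placeholders to be confirmed against the real parser output):
```
@Test
public void testExplainWithUnknownDetail() {
    // Hypothetical negative case: an unrecognized keyword between EXPLAIN and the
    // statement should fail to parse. The ^...^ markers locate the expected error.
    String sql = "explain ^unknown_detail^ select * from emps";
    // Placeholder pattern: replace with the actual parser error message.
    this.sql(sql).fails("(?s).*Encountered .*");
}
```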
##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ExplainOperation.java
##########
@@ -18,17 +18,27 @@
package org.apache.flink.table.operations;
+import org.apache.commons.collections.CollectionUtils;
+
import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
-/**
- * Operation to describe an EXPLAIN statement. NOTES: currently, only default behavior (EXPLAIN
- * [PLAN FOR] xx) is supported.
- */
+/** Operation to describe an EXPLAIN statement. */
public class ExplainOperation implements Operation {
private final Operation child;
+ private final Set<String> explainDetails;
+ // this construct used to adapt to the old API
public ExplainOperation(Operation child) {
this.child = child;
+ this.explainDetails = new HashSet<>();
+ }
Review comment:
It would be better to use:
```
this(child, new HashSet<>());
```
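That is, keep one primary constructor and delegate to it (a sketch of the resulting skeleton, not the final code):
```
public ExplainOperation(Operation child) {
    // Old-API entry point: no explain details requested.
    this(child, new HashSet<>());
}

public ExplainOperation(Operation child, Set<String> explainDetails) {
    this.child = child;
    this.explainDetails = explainDetails;
}
```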
##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
##########
@@ -1232,9 +1232,30 @@ public TableResult executeInternal(Operation operation) {
throw new TableException(exMsg, e);
}
} else if (operation instanceof ExplainOperation) {
+ ExplainOperation explainOperation = (ExplainOperation) operation;
+ ExplainDetail[] explainDetails =
+ explainOperation.getExplainDetails().stream()
+ .map(
+ detail -> {
+ switch (detail.toUpperCase()) {
+ case "ESTIMATED_COST":
+ return ExplainDetail.ESTIMATED_COST;
+ case "CHANGELOG_MODE":
+ return ExplainDetail.CHANGELOG_MODE;
+ case "JSON_EXECUTION_PLAN":
+ return ExplainDetail.JSON_EXECUTION_PLAN;
+ default:
+ throw new TableException(
+ String.format(
+ "Unsupported EXPLAIN DETAIL: %s",
+ detail));
Review comment:
I think this conversion logic should belong to `ExplainDetail`.
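For example, a static factory on the enum could own the mapping (a sketch; `fromString` is a hypothetical name, not an existing Flink API):
```
// Hypothetical helper inside the ExplainDetail enum (sketch).
public static ExplainDetail fromString(String detail) {
    try {
        // The enum constants already match ESTIMATED_COST, CHANGELOG_MODE, JSON_EXECUTION_PLAN.
        return ExplainDetail.valueOf(detail.toUpperCase());
    } catch (IllegalArgumentException e) {
        throw new TableException(String.format("Unsupported EXPLAIN DETAIL: %s", detail));
    }
}
```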
##########
File path: flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
##########
@@ -1634,16 +1634,52 @@ SqlEndStatementSet SqlEndStatementSet() :
SqlNode SqlRichExplain() :
{
SqlNode stmt;
+ Set<String> explainDetails = new HashSet<String>();
}
{
- <EXPLAIN> [ <PLAN> <FOR> ]
+ <EXPLAIN>
+ [
+ <PLAN> <FOR>
+ |
+ (
+ <ESTIMATED_COST>
+ |
+ <CHANGELOG_MODE>
+ |
+ <JSON_EXECUTION_PLAN>
+ ) {
+ if (explainDetails.contains(token.image.toUpperCase())) {
+ throw SqlUtil.newContextException(getPos(),
+ ParserResource.RESOURCE.explainDetailRepeat());
+ } else {
+ explainDetails.add(token.image.toUpperCase());
+ }
+ }
+ (
+ <COMMA>
+ (
+ <ESTIMATED_COST>
+ |
+ <CHANGELOG_MODE>
+ |
+ <JSON_EXECUTION_PLAN>
+ ) {
+ if (explainDetails.contains(token.image.toUpperCase())) {
+ throw SqlUtil.newContextException(getPos(),
+ ParserResource.RESOURCE.explainDetailRepeat());
+ } else {
+ explainDetails.add(token.image.toUpperCase());
+ }
+ }
Review comment:
What about introducing a function like the one below?
```
void ParseExplainDetail(Set<String> explainDetails) :
{
}
{
(
<ESTIMATED_COST>
|
<CHANGELOG_MODE>
|
<JSON_EXECUTION_PLAN>
)
{
if (explainDetails.contains(token.image.toUpperCase())) {
throw SqlUtil.newContextException(getPos(),
ParserResource.RESOURCE.explainDetailRepeat());
} else {
explainDetails.add(token.image.toUpperCase());
}
}
}
```
In this way we can reuse the code.
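With such a helper, the `SqlRichExplain` production could collapse to something like this (a sketch in the same template style; the statement part after the details is elided and stays unchanged):
```
<EXPLAIN>
[
    <PLAN> <FOR>
|
    ParseExplainDetail(explainDetails)
    ( <COMMA> ParseExplainDetail(explainDetails) )*
]
...
```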
##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ExplainOperation.java
##########
@@ -18,17 +18,27 @@
package org.apache.flink.table.operations;
+import org.apache.commons.collections.CollectionUtils;
+
import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
-/**
- * Operation to describe an EXPLAIN statement. NOTES: currently, only default behavior (EXPLAIN
- * [PLAN FOR] xx) is supported.
- */
+/** Operation to describe an EXPLAIN statement. */
public class ExplainOperation implements Operation {
private final Operation child;
+ private final Set<String> explainDetails;
+ // this construct used to adapt to the old API
Review comment:
I think we can remove this comment.
##########
File path: flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
##########
@@ -1634,16 +1634,52 @@ SqlEndStatementSet SqlEndStatementSet() :
SqlNode SqlRichExplain() :
{
SqlNode stmt;
+ Set<String> explainDetails = new HashSet<String>();
}
{
- <EXPLAIN> [ <PLAN> <FOR> ]
+ <EXPLAIN>
+ [
+ <PLAN> <FOR>
+ |
+ (
+ <ESTIMATED_COST>
+ |
+ <CHANGELOG_MODE>
+ |
+ <JSON_EXECUTION_PLAN>
+ ) {
+ if (explainDetails.contains(token.image.toUpperCase())) {
+ throw SqlUtil.newContextException(getPos(),
+ ParserResource.RESOURCE.explainDetailRepeat());
Review comment:
`ParserResource.RESOURCE.explainDetailRepeat` -> `ParserResource.RESOURCE.explainDetailIsDuplicate`
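The renamed entry would follow the existing pattern in `ParserResource` (a sketch modeled on the current zero-argument entries; the message wording is a suggestion):
```
@Resources.BaseMessage("Duplicate EXPLAIN DETAIL is not allowed.")
Resources.ExInst<ParseException> explainDetailIsDuplicate();
```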
##########
File path: flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
##########
@@ -99,12 +101,15 @@
keywords: [
"BYTES"
"CATALOGS"
+ "CHANGELOG_MODE"
Review comment:
Could you also add these keywords to the `nonReservedKeywords` list?
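That is, `Parser.tdd` would also gain entries like the following (a sketch; surrounding entries elided):
```
nonReservedKeywords: [
    ...
    "CHANGELOG_MODE"
    "ESTIMATED_COST"
    "JSON_EXECUTION_PLAN"
    ...
]
```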
##########
File path: flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
##########
@@ -1283,11 +1283,44 @@ public void testEnd() {
@Test
public void testExplain() {
+ String sql = "explain select * from emps";
+ String expected = "EXPLAIN SELECT *\n" + "FROM `EMPS`";
Review comment:
`"EXPLAIN SELECT *\n" + "FROM `EMPS`"` -> `EXPLAIN SELECT * FROM `EMPS``
##########
File path: flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientITCase.java
##########
@@ -302,13 +302,22 @@ private static String transformOutput(
out.append(sqlScript.comment).append(sqlScript.sql);
if (i < results.size()) {
Result result = results.get(i);
- out.append(result.content).append(result.highestTag.tag).append("\n");
+ String content = replaceStreamNodeId(result.content);
+ out.append(content).append(result.highestTag.tag).append("\n");
}
}
return out.toString();
}
+ /*
+ * Stream node {id} is ignored, because id keeps incrementing in test class while
+ * StreamExecutionEnvironment is up
+ */
+ private static String replaceStreamNodeId(String s) {
+ return s.replaceAll("\"id\" : \\d+", "\"id\" : ");
+ }
Review comment:
Rename to `removeStreamNodeId`, and give the input parameter a more meaningful name.
I think you can remove the comment or refine the wording; it's a little difficult to understand as written. A possible shape is sketched below.
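For illustration (the names and comment wording are suggestions, not the final code):
```
/**
 * Strips the generated stream node ids from the JSON plan, because the ids keep
 * incrementing across tests while the shared StreamExecutionEnvironment stays up.
 */
private static String removeStreamNodeId(String jsonPlan) {
    return jsonPlan.replaceAll("\"id\" : \\d+", "\"id\" : ");
}
```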
##########
File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
##########
@@ -1375,6 +1377,126 @@ class TableEnvironmentTest {
assertEquals(replaceStageId(expected), replaceStageId(actual))
}
+ @Test
+ def testExecuteSqlWithExplainDetailsSelect(): Unit = {
+ val createTableStmt =
+ """
+ |CREATE TABLE MyTable (
+ | a bigint,
+ | b int,
+ | c varchar
+ |) with (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin
+ val tableResult1 = tableEnv.executeSql(createTableStmt)
+ assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+ val tableResult2 = tableEnv
+ .executeSql("explain changelog_mode, estimated_cost, json_execution_plan " +
+ "select * from MyTable where a > 10")
+ assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult2.getResultKind)
+ val it = tableResult2.collect()
+ assertTrue(it.hasNext)
+ val row = it.next()
+ assertEquals(1, row.getArity)
+ val actual = replaceStreamNodeId(row.getField(0).toString.trim)
+ val expected = replaceStreamNodeId(TableTestUtil
+ .readFromResource("/explain/testExecuteSqlWithExplainDetailsSelect.out").trim)
+ assertEquals(replaceStageId(expected), replaceStageId(actual))
+ assertFalse(it.hasNext)
Review comment:
It seems this part is almost the same as the code below? Maybe extract a shared helper, as sketched below.
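A possible extraction, sketched in the test's own Scala style (the helper name is a suggestion):
```
// Hypothetical shared helper for the explain-with-details tests.
private def verifyExplainResult(sql: String, expectedResource: String): Unit = {
  val tableResult = tableEnv.executeSql(sql)
  assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult.getResultKind)
  val it = tableResult.collect()
  assertTrue(it.hasNext)
  val row = it.next()
  assertEquals(1, row.getArity)
  val actual = replaceStreamNodeId(row.getField(0).toString.trim)
  val expected = replaceStreamNodeId(TableTestUtil.readFromResource(expectedResource).trim)
  assertEquals(replaceStageId(expected), replaceStageId(actual))
  assertFalse(it.hasNext)
}
```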
##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
##########
@@ -649,16 +649,13 @@ private Operation convertShowViews(SqlShowViews sqlShowViews) {
private Operation convertRichExplain(SqlRichExplain sqlExplain) {
Operation operation;
SqlNode sqlNode = sqlExplain.getStatement();
+ Set<String> explainDetails = sqlExplain.getExplainDetails();
if (sqlNode instanceof RichSqlInsert) {
operation = convertSqlInsert((RichSqlInsert) sqlNode);
- } else if (sqlNode instanceof SqlSelect) {
- operation = convertSqlQuery(sqlExplain.getStatement());
} else {
- throw new ValidationException(
- String.format(
- "EXPLAIN statement doesn't support %s",
sqlNode.getKind().toString()));
Review comment:
Why modify these lines?