chaozwn commented on a change in pull request #15317:
URL: https://github.com/apache/flink/pull/15317#discussion_r598710297



##########
File path: 
flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/SqlCommandParserTest.java
##########
@@ -105,19 +149,31 @@ public void testCommands() throws Exception {
                                 "EXPLAIN ", // no query
                                 SqlExecutionException.class,
                                 "Encountered \"<EOF>\""),
-                        // explain plan for xx
+                        // explain xx
+                        TestItem.validSql(
+                                "EXPLAIN 
JSON_EXECUTION_PLAN,CHANGELOG_MODE,ESTIMATED_COST SELECT * FROM MyTable",
+                                SqlCommand.EXPLAIN,
+                                "EXPLAIN 
JSON_EXECUTION_PLAN,CHANGELOG_MODE,ESTIMATED_COST SELECT * FROM MyTable"),
+                        TestItem.validSql(
+                                "EXPLAIN 
JSON_EXECUTION_PLAN,CHANGELOG_MODE,ESTIMATED_COST INSERT INTO MySink(c) SELECT 
c FROM MyTable",
+                                SqlCommand.EXPLAIN,
+                                "EXPLAIN 
JSON_EXECUTION_PLAN,CHANGELOG_MODE,ESTIMATED_COST INSERT INTO MySink(c) SELECT 
c FROM MyTable"),
+                        TestItem.validSql(
+                                "EXPLAIN JSON_EXECUTION_PLAN SELECT * FROM 
MyTable",
+                                SqlCommand.EXPLAIN,
+                                "EXPLAIN JSON_EXECUTION_PLAN SELECT * FROM 
MyTable"),
                         TestItem.validSql(
-                                "EXPLAIN PLAN FOR SELECT a FROM MyTable",

Review comment:
       'PLAN FOR' was removed because the expected result does not contain the 'PLAN FOR' keywords.

##########
File path: 
flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/SqlCommandParserTest.java
##########
@@ -55,12 +55,56 @@ public void setup() {
         tableEnv = helper.getTableEnv();
     }
 
+    @Test
+    public void testExplainDetails() {
+        List<TestItem> testItems =
+                Arrays.asList(
+                        TestItem.validSql(
+                                "EXPLAIN 
JSON_EXECUTION_PLAN,CHANGELOG_MODE,ESTIMATED_COST SELECT * FROM MyTable",
+                                SqlCommand.EXPLAIN,
+                                "EXPLAIN 
JSON_EXECUTION_PLAN,CHANGELOG_MODE,ESTIMATED_COST SELECT * FROM MyTable"),
+                        TestItem.validSql(
+                                "EXPLAIN 
JSON_EXECUTION_PLAN,CHANGELOG_MODE,ESTIMATED_COST INSERT INTO MySink(c) SELECT 
c FROM MyTable",
+                                SqlCommand.EXPLAIN,
+                                "EXPLAIN 
JSON_EXECUTION_PLAN,CHANGELOG_MODE,ESTIMATED_COST INSERT INTO MySink(c) SELECT 
c FROM MyTable"),
+                        TestItem.validSql(
+                                "EXPLAIN JSON_EXECUTION_PLAN SELECT * FROM 
MyTable",
+                                SqlCommand.EXPLAIN,
+                                "EXPLAIN JSON_EXECUTION_PLAN SELECT * FROM 
MyTable"),
+                        TestItem.validSql(
+                                "EXPLAIN JSON_EXECUTION_PLAN INSERT INTO 
MySink(c) SELECT c FROM MyTable",
+                                SqlCommand.EXPLAIN,
+                                "EXPLAIN JSON_EXECUTION_PLAN INSERT INTO 
MySink(c) SELECT c FROM MyTable"),
+                        TestItem.validSql(
+                                "EXPLAIN SELECT * FROM MyTable",
+                                SqlCommand.EXPLAIN,
+                                "EXPLAIN SELECT * FROM MyTable"),
+                        TestItem.validSql(
+                                "EXPLAIN INSERT INTO MySink(c) SELECT c FROM 
MyTable",
+                                SqlCommand.EXPLAIN,
+                                "EXPLAIN INSERT INTO MySink(c) SELECT c FROM 
MyTable"),
+                        TestItem.invalidSql(
+                                "EXPLAIN INSERT INTO MySink(c) SELECT xxx FROM 
MyTable",
+                                SqlExecutionException.class,
+                                "Column 'xxx' not found in any table"),
+                        TestItem.invalidSql(
+                                "EXPLAIN 
JSON_EXECUTION_PLAN,JSON_EXECUTION_PLAN INSERT INTO MySink(c) SELECT xxx FROM 
MyTable",
+                                SqlExecutionException.class,
+                                "Duplicate EXPLAIN KEY: JSON_EXECUTION_PLAN"));
+
+        for (TestItem item : testItems) {
+            tableEnv.getConfig().setSqlDialect(item.sqlDialect);
+            runTestItem(item);
+        }
+    }
+
     @Test
     public void testCommands() throws Exception {
         List<TestItem> testItems =
                 Arrays.asList(
                         TestItem.validSql("QUIT;", 
SqlCommand.QUIT).cannotParseComment(),
-                        TestItem.validSql("eXiT;", 
SqlCommand.QUIT).cannotParseComment(),
+                        // TODO: 2021/3/18 修改变量
+                        TestItem.validSql("EXIT;", 
SqlCommand.QUIT).cannotParseComment(),

Review comment:
       Sorry, I will roll this back.

##########
File path: flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
##########
@@ -82,6 +82,8 @@
     "org.apache.calcite.sql.SqlDrop"
     "java.util.List"
     "java.util.ArrayList"
+    "org.apache.calcite.sql.SqlUtil"
+    "org.apache.flink.sql.parser.dql.SqlExplainDetails"

Review comment:
       I see.

##########
File path: flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
##########
@@ -449,6 +454,9 @@
     "PARTITIONED"
     "PARTITIONS"
     "VIRTUAL"
+    "ESTIMATED_COST"
+    "CHANGELOG_MODE"
+    "JSON_EXECUTION_PLAN"

Review comment:
       ok

##########
File path: 
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlExplainDetails.java
##########
@@ -0,0 +1,87 @@
+package org.apache.flink.sql.parser.dql;
+
+import org.apache.flink.sql.parser.ExtendedSqlNode;
+import org.apache.flink.sql.parser.error.SqlValidateException;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** EXPLAIN [ExplainDetail [,ExplainDetail]*]* STATEMENT sql call. */
+public class SqlExplainDetails extends SqlCall implements ExtendedSqlNode {
+
+    public static final SqlSpecialOperator OPERATOR =
+            new SqlSpecialOperator("EXPLAIN", SqlKind.EXPLAIN);
+
+    private SqlNode statement;
+    private List<String> explainDetails;
+
+    public SqlExplainDetails(SqlParserPos pos, SqlNode statement, List<String> 
explainDetails) {
+        super(pos);
+        this.statement = statement;
+        this.explainDetails = explainDetails;
+    }
+
+    public SqlNode getStatement() {
+        return statement;
+    }
+
+    public void setStatement(SqlNode statement) {
+        this.statement = statement;
+    }
+
+    public List<String> getExplainDetails() {
+        return explainDetails;
+    }
+
+    public void setExplainDetails(List<String> explainDetails) {
+        this.explainDetails = explainDetails;
+    }
+
+    @Override
+    public SqlOperator getOperator() {
+        return OPERATOR;
+    }
+
+    @Override
+    public List<SqlNode> getOperandList() {
+        return ImmutableNullableList.of(statement);
+    }
+
+    @Override
+    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+        writer.keyword("EXPLAIN");
+        String explainDetailString = String.join(", ", explainDetails);
+        if (!explainDetails.isEmpty()) {
+            writer.keyword(explainDetailString);
+        }
+        statement.unparse(writer, leftPrec, rightPrec);
+    }
+
+    @Override
+    public void validate() throws SqlValidateException {
+        List<String> repeatDetails =

Review comment:
       👌

##########
File path: 
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlExplainDetails.java
##########
@@ -0,0 +1,87 @@
+package org.apache.flink.sql.parser.dql;
+
+import org.apache.flink.sql.parser.ExtendedSqlNode;
+import org.apache.flink.sql.parser.error.SqlValidateException;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** EXPLAIN [ExplainDetail [,ExplainDetail]*]* STATEMENT sql call. */
+public class SqlExplainDetails extends SqlCall implements ExtendedSqlNode {
+
+    public static final SqlSpecialOperator OPERATOR =
+            new SqlSpecialOperator("EXPLAIN", SqlKind.EXPLAIN);
+
+    private SqlNode statement;
+    private List<String> explainDetails;
+
+    public SqlExplainDetails(SqlParserPos pos, SqlNode statement, List<String> 
explainDetails) {
+        super(pos);
+        this.statement = statement;
+        this.explainDetails = explainDetails;
+    }
+
+    public SqlNode getStatement() {
+        return statement;
+    }
+
+    public void setStatement(SqlNode statement) {
+        this.statement = statement;
+    }
+
+    public List<String> getExplainDetails() {
+        return explainDetails;
+    }
+
+    public void setExplainDetails(List<String> explainDetails) {
+        this.explainDetails = explainDetails;
+    }
+
+    @Override
+    public SqlOperator getOperator() {
+        return OPERATOR;
+    }
+
+    @Override
+    public List<SqlNode> getOperandList() {
+        return ImmutableNullableList.of(statement);
+    }
+
+    @Override
+    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+        writer.keyword("EXPLAIN");
+        String explainDetailString = String.join(", ", explainDetails);
+        if (!explainDetails.isEmpty()) {
+            writer.keyword(explainDetailString);
+        }
+        statement.unparse(writer, leftPrec, rightPrec);
+    }
+
+    @Override
+    public void validate() throws SqlValidateException {
+        List<String> repeatDetails =
+                explainDetails.stream()
+                        .collect(Collectors.toMap(e -> e, e -> 1, 
Integer::sum))
+                        .entrySet()
+                        .stream()
+                        .filter(entry -> entry.getValue() > 1)
+                        .map(Map.Entry::getKey)
+                        .collect(Collectors.toList());
+
+        if (repeatDetails.size() > 0) {
+            throw new SqlValidateException(
+                    this.getParserPosition(),
+                    String.format("Duplicate EXPLAIN KEY: %s", String.join(", 
", repeatDetails)));

Review comment:
       👌

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ExplainDetailOperation.java
##########
@@ -0,0 +1,40 @@
+package org.apache.flink.table.operations;
+
+import org.apache.flink.util.StringUtils;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** explain operation. */

Review comment:
       This class will be removed.

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
##########
@@ -1162,6 +1164,7 @@ public TableResult executeInternal(Operation operation) {
                 throw new TableException(exMsg, e);
             }
         } else if (operation instanceof ExplainOperation) {
+            // In fact, never get here

Review comment:
       I will remove ExplainDetailOperation.

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
##########
@@ -142,10 +142,14 @@ class FlinkPlannerImpl(
         return sqlNode
       }
       sqlNode match {
+        // this explain should be deprecated
         case explain: SqlExplain =>
           val validated = validator.validate(explain.getExplicandum)
           explain.setOperand(0, validated)
           explain
+        case explainDetails: SqlExplainDetails =>
+          validator.validate(explainDetails.getStatement)

Review comment:
       👌

##########
File path: 
flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
##########
@@ -1451,6 +1451,30 @@ SqlDrop SqlDropExtended(Span s, boolean replace) :
     }
 }
 
+/**
+* Parses a expain module statement.
+* EXPLAIN [ExplainDetail[, ExplainDetail]*] statement;
+*/
+SqlNode SqlExplainDetails() :
+{
+    SqlNode stmt;
+    List<String> explainDetails = new ArrayList<String>();
+}
+{
+    <EXPLAIN>
+    [
+        (<ESTIMATED_COST> | <CHANGELOG_MODE> | <JSON_EXECUTION_PLAN>) {
+           explainDetails.add(token.image.toUpperCase());
+        }
+        (<COMMA> (<ESTIMATED_COST> | <CHANGELOG_MODE> | <JSON_EXECUTION_PLAN>) 
{
+           explainDetails.add(token.image.toUpperCase());
+        })*
+    ]
+    stmt = SqlQueryOrDml() {
+        return new SqlExplainDetails(getPos(),stmt,explainDetails);

Review comment:
       In that case, do we need to support the 'PLAN FOR' syntax?

##########
File path: 
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlExplainDetails.java
##########
@@ -0,0 +1,87 @@
+package org.apache.flink.sql.parser.dql;
+
+import org.apache.flink.sql.parser.ExtendedSqlNode;
+import org.apache.flink.sql.parser.error.SqlValidateException;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** EXPLAIN [ExplainDetail [,ExplainDetail]*]* STATEMENT sql call. */
+public class SqlExplainDetails extends SqlCall implements ExtendedSqlNode {
+
+    public static final SqlSpecialOperator OPERATOR =
+            new SqlSpecialOperator("EXPLAIN", SqlKind.EXPLAIN);
+
+    private SqlNode statement;
+    private List<String> explainDetails;
+
+    public SqlExplainDetails(SqlParserPos pos, SqlNode statement, List<String> 
explainDetails) {
+        super(pos);
+        this.statement = statement;
+        this.explainDetails = explainDetails;
+    }
+
+    public SqlNode getStatement() {
+        return statement;
+    }
+
+    public void setStatement(SqlNode statement) {
+        this.statement = statement;
+    }
+
+    public List<String> getExplainDetails() {
+        return explainDetails;
+    }
+
+    public void setExplainDetails(List<String> explainDetails) {
+        this.explainDetails = explainDetails;
+    }
+
+    @Override
+    public SqlOperator getOperator() {
+        return OPERATOR;
+    }
+
+    @Override
+    public List<SqlNode> getOperandList() {
+        return ImmutableNullableList.of(statement);
+    }
+
+    @Override
+    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+        writer.keyword("EXPLAIN");
+        String explainDetailString = String.join(", ", explainDetails);
+        if (!explainDetails.isEmpty()) {
+            writer.keyword(explainDetailString);
+        }

Review comment:
       👌

##########
File path: flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
##########
@@ -116,6 +118,9 @@
     "WATERMARK"
     "WATERMARKS"
     "TIMESTAMP_LTZ"
+    "ESTIMATED_COST"
+    "CHANGELOG_MODE"
+    "JSON_EXECUTION_PLAN"

Review comment:
       I see.

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
##########
@@ -1171,6 +1174,38 @@ public TableResult executeInternal(Operation operation) {
                     .data(Collections.singletonList(Row.of(explanation)))
                     .setPrintStyle(TableResultImpl.PrintStyle.rawContent())
                     .build();
+        } else if (operation instanceof ExplainDetailOperation) {
+            ExplainDetailOperation explainOperation = (ExplainDetailOperation) 
operation;
+            ExplainDetail[] detailSet =
+                    explainOperation.getExplainDetails().stream()
+                            .map(
+                                    detail -> {
+                                        switch (detail) {
+                                            case "ESTIMATED_COST":
+                                                return 
ExplainDetail.ESTIMATED_COST;
+                                            case "CHANGELOG_MODE":
+                                                return 
ExplainDetail.CHANGELOG_MODE;
+                                            case "JSON_EXECUTION_PLAN":
+                                                return 
ExplainDetail.JSON_EXECUTION_PLAN;
+                                            default:
+                                                return null;

Review comment:
       👌

##########
File path: 
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlExplainDetails.java
##########
@@ -0,0 +1,87 @@
+package org.apache.flink.sql.parser.dql;
+
+import org.apache.flink.sql.parser.ExtendedSqlNode;
+import org.apache.flink.sql.parser.error.SqlValidateException;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** EXPLAIN [ExplainDetail [,ExplainDetail]*]* STATEMENT sql call. */
+public class SqlExplainDetails extends SqlCall implements ExtendedSqlNode {
+
+    public static final SqlSpecialOperator OPERATOR =
+            new SqlSpecialOperator("EXPLAIN", SqlKind.EXPLAIN);
+
+    private SqlNode statement;
+    private List<String> explainDetails;

Review comment:
       ok

##########
File path: 
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlExplainDetails.java
##########
@@ -0,0 +1,87 @@
+package org.apache.flink.sql.parser.dql;
+
+import org.apache.flink.sql.parser.ExtendedSqlNode;
+import org.apache.flink.sql.parser.error.SqlValidateException;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** EXPLAIN [ExplainDetail [,ExplainDetail]*]* STATEMENT sql call. */
+public class SqlExplainDetails extends SqlCall implements ExtendedSqlNode {
+
+    public static final SqlSpecialOperator OPERATOR =
+            new SqlSpecialOperator("EXPLAIN", SqlKind.EXPLAIN);
+
+    private SqlNode statement;
+    private List<String> explainDetails;
+
+    public SqlExplainDetails(SqlParserPos pos, SqlNode statement, List<String> 
explainDetails) {
+        super(pos);
+        this.statement = statement;
+        this.explainDetails = explainDetails;
+    }
+
+    public SqlNode getStatement() {
+        return statement;
+    }
+
+    public void setStatement(SqlNode statement) {

Review comment:
       ok

##########
File path: 
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlExplainDetails.java
##########
@@ -0,0 +1,87 @@
+package org.apache.flink.sql.parser.dql;
+
+import org.apache.flink.sql.parser.ExtendedSqlNode;
+import org.apache.flink.sql.parser.error.SqlValidateException;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** EXPLAIN [ExplainDetail [,ExplainDetail]*]* STATEMENT sql call. */
+public class SqlExplainDetails extends SqlCall implements ExtendedSqlNode {
+
+    public static final SqlSpecialOperator OPERATOR =
+            new SqlSpecialOperator("EXPLAIN", SqlKind.EXPLAIN);
+
+    private SqlNode statement;
+    private List<String> explainDetails;
+
+    public SqlExplainDetails(SqlParserPos pos, SqlNode statement, List<String> 
explainDetails) {
+        super(pos);
+        this.statement = statement;
+        this.explainDetails = explainDetails;
+    }
+
+    public SqlNode getStatement() {
+        return statement;
+    }
+
+    public void setStatement(SqlNode statement) {
+        this.statement = statement;
+    }
+
+    public List<String> getExplainDetails() {
+        return explainDetails;
+    }
+
+    public void setExplainDetails(List<String> explainDetails) {
+        this.explainDetails = explainDetails;
+    }
+
+    @Override
+    public SqlOperator getOperator() {
+        return OPERATOR;
+    }
+
+    @Override
+    public List<SqlNode> getOperandList() {
+        return ImmutableNullableList.of(statement);

Review comment:
       👌

##########
File path: 
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlExplainDetails.java
##########
@@ -0,0 +1,87 @@
+package org.apache.flink.sql.parser.dql;
+
+import org.apache.flink.sql.parser.ExtendedSqlNode;
+import org.apache.flink.sql.parser.error.SqlValidateException;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** EXPLAIN [ExplainDetail [,ExplainDetail]*]* STATEMENT sql call. */
+public class SqlExplainDetails extends SqlCall implements ExtendedSqlNode {
+
+    public static final SqlSpecialOperator OPERATOR =
+            new SqlSpecialOperator("EXPLAIN", SqlKind.EXPLAIN);
+
+    private SqlNode statement;
+    private List<String> explainDetails;
+
+    public SqlExplainDetails(SqlParserPos pos, SqlNode statement, List<String> 
explainDetails) {
+        super(pos);
+        this.statement = statement;
+        this.explainDetails = explainDetails;
+    }
+
+    public SqlNode getStatement() {
+        return statement;
+    }
+
+    public void setStatement(SqlNode statement) {
+        this.statement = statement;
+    }
+
+    public List<String> getExplainDetails() {
+        return explainDetails;
+    }
+
+    public void setExplainDetails(List<String> explainDetails) {
+        this.explainDetails = explainDetails;
+    }

Review comment:
       👌

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ExplainDetailOperation.java
##########
@@ -0,0 +1,40 @@
+package org.apache.flink.table.operations;
+
+import org.apache.flink.util.StringUtils;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** explain operation. */
+public class ExplainDetailOperation implements Operation {
+    private Operation child;
+    private List<String> explainDetails;
+
+    public ExplainDetailOperation(Operation child, List<String> 
explainDetails) {
+        this.child = child;
+        this.explainDetails = explainDetails;
+    }
+
+    public Operation getChild() {
+        return child;
+    }
+
+    public List<String> getExplainDetails() {
+        return explainDetails;
+    }
+
+    @Override
+    public String asSummaryString() {
+        String detailsString =

Review comment:
       👌

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
##########
@@ -1171,6 +1174,38 @@ public TableResult executeInternal(Operation operation) {
                     .data(Collections.singletonList(Row.of(explanation)))
                     .setPrintStyle(TableResultImpl.PrintStyle.rawContent())
                     .build();
+        } else if (operation instanceof ExplainDetailOperation) {
+            ExplainDetailOperation explainOperation = (ExplainDetailOperation) 
operation;
+            ExplainDetail[] detailSet =
+                    explainOperation.getExplainDetails().stream()
+                            .map(
+                                    detail -> {
+                                        switch (detail) {
+                                            case "ESTIMATED_COST":
+                                                return 
ExplainDetail.ESTIMATED_COST;
+                                            case "CHANGELOG_MODE":
+                                                return 
ExplainDetail.CHANGELOG_MODE;
+                                            case "JSON_EXECUTION_PLAN":
+                                                return 
ExplainDetail.JSON_EXECUTION_PLAN;
+                                            default:
+                                                return null;
+                                        }
+                                    })
+                            .filter(Objects::nonNull)
+                            .distinct()

Review comment:
       👌

##########
File path: 
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java
##########
@@ -177,6 +178,8 @@ private static SqlCommandCall parseBySqlParser(Parser 
sqlParser, String stmt) {
             cmd = SqlCommand.ALTER_FUNCTION;
         } else if (operation instanceof ExplainOperation) {
             cmd = SqlCommand.EXPLAIN;
+        } else if (operation instanceof ExplainDetailOperation) {

Review comment:
       OK, I will remove ExplainDetailOperation and change ExplainOperation.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to