fsk119 commented on a change in pull request #15317:
URL: https://github.com/apache/flink/pull/15317#discussion_r598635002



##########
File path: flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java
##########
@@ -286,15 +274,10 @@ private static SqlCommandCall parseBySqlParser(Parser sqlParser, String stmt) {
 
         DESCRIBE,
 
-        // supports both `explain xx` and `explain plan for xx` now
-        // TODO should keep `explain xx` ?
+        // supports both `EXPLAIN JSON_EXECUTION_PLAN, CHANGELOG_MODE, ESTIMATED_COST` parsed via

Review comment:
       remove this comment 

##########
File path: flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
##########
@@ -449,6 +454,9 @@
     "PARTITIONED"
     "PARTITIONS"
     "VIRTUAL"
+    "ESTIMATED_COST"
+    "CHANGELOG_MODE"
+    "JSON_EXECUTION_PLAN"

Review comment:
       Why add these?

##########
File path: flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlExplainDetails.java
##########
@@ -0,0 +1,87 @@
+package org.apache.flink.sql.parser.dql;
+
+import org.apache.flink.sql.parser.ExtendedSqlNode;
+import org.apache.flink.sql.parser.error.SqlValidateException;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** EXPLAIN [ExplainDetail [,ExplainDetail]*]* STATEMENT sql call. */
+public class SqlExplainDetails extends SqlCall implements ExtendedSqlNode {
+
+    public static final SqlSpecialOperator OPERATOR =
+            new SqlSpecialOperator("EXPLAIN", SqlKind.EXPLAIN);
+
+    private SqlNode statement;
+    private List<String> explainDetails;
+
+    public SqlExplainDetails(SqlParserPos pos, SqlNode statement, List<String> explainDetails) {
+        super(pos);
+        this.statement = statement;
+        this.explainDetails = explainDetails;
+    }
+
+    public SqlNode getStatement() {
+        return statement;
+    }
+
+    public void setStatement(SqlNode statement) {
+        this.statement = statement;
+    }
+
+    public List<String> getExplainDetails() {
+        return explainDetails;
+    }
+
+    public void setExplainDetails(List<String> explainDetails) {
+        this.explainDetails = explainDetails;
+    }
+
+    @Override
+    public SqlOperator getOperator() {
+        return OPERATOR;
+    }
+
+    @Override
+    public List<SqlNode> getOperandList() {
+        return ImmutableNullableList.of(statement);
+    }
+
+    @Override
+    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+        writer.keyword("EXPLAIN");
+        String explainDetailString = String.join(", ", explainDetails);
+        if (!explainDetails.isEmpty()) {
+            writer.keyword(explainDetailString);
+        }
+        statement.unparse(writer, leftPrec, rightPrec);
+    }
+
+    @Override
+    public void validate() throws SqlValidateException {
+        List<String> repeatDetails =

Review comment:
       Rename to `duplicateDetails`

##########
File path: flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
##########
@@ -116,6 +118,9 @@
     "WATERMARK"
     "WATERMARKS"
     "TIMESTAMP_LTZ"
+    "ESTIMATED_COST"
+    "CHANGELOG_MODE"
+    "JSON_EXECUTION_PLAN"

Review comment:
       Please keep the import classes in alphabetical order if a new class is added.

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ExplainOperation.java
##########
@@ -24,6 +24,7 @@
  * Operation to describe an EXPLAIN statement. NOTES: currently, only default behavior(EXPLAIN PLAN
  * FOR xx) is supported.
  */
+@Deprecated

Review comment:
       Also add `@deprecated use {@link ExplainDetailsOperation} instead` in the Javadoc.
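   
   For example, a sketch of the Javadoc (class body elided):
   
   ```java
   /**
    * Operation to describe an EXPLAIN statement.
    *
    * @deprecated use {@link ExplainDetailsOperation} instead
    */
   @Deprecated
   public class ExplainOperation implements Operation {
       // existing members unchanged
   }
   ```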

##########
File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala
##########
@@ -294,6 +294,27 @@ class TableEnvironmentITCase(tableEnvName: String) {
     assertFirstValues(sinkPath)
   }
 
+  @Test
+  def testExecuteExplainSelect(): Unit = {

Review comment:
       Please refer to `TableEnvironmentITCase#testSqlUpdateAndToDataStream`. Godfrey has already offered a better way to check the result.

##########
File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
##########
@@ -818,6 +819,22 @@ abstract class TableEnvImpl(
           .data(JCollections.singletonList(Row.of(explanation)))
           .setPrintStyle(PrintStyle.rawContent())
           .build
+      case explainOperation: ExplainDetailOperation =>
+        val details: mutable.Buffer[ExplainDetail] = explainOperation.getExplainDetails.map {
+          case "ESTIMATED_COST" => ExplainDetail.ESTIMATED_COST
+          case "CHANGELOG_MODE" => ExplainDetail.CHANGELOG_MODE
+          case "JSON_EXECUTION_PLAN" => ExplainDetail.JSON_EXECUTION_PLAN
+          case _ => null

Review comment:
       Throw an exception.

##########
File path: flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlExplainDetails.java
##########
@@ -0,0 +1,87 @@
+package org.apache.flink.sql.parser.dql;
+
+import org.apache.flink.sql.parser.ExtendedSqlNode;
+import org.apache.flink.sql.parser.error.SqlValidateException;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** EXPLAIN [ExplainDetail [,ExplainDetail]*]* STATEMENT sql call. */
+public class SqlExplainDetails extends SqlCall implements ExtendedSqlNode {
+
+    public static final SqlSpecialOperator OPERATOR =
+            new SqlSpecialOperator("EXPLAIN", SqlKind.EXPLAIN);
+
+    private SqlNode statement;
+    private List<String> explainDetails;

Review comment:
       Add final

##########
File path: flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlExplainDetails.java
##########
@@ -0,0 +1,87 @@
+package org.apache.flink.sql.parser.dql;
+
+import org.apache.flink.sql.parser.ExtendedSqlNode;
+import org.apache.flink.sql.parser.error.SqlValidateException;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** EXPLAIN [ExplainDetail [,ExplainDetail]*]* STATEMENT sql call. */
+public class SqlExplainDetails extends SqlCall implements ExtendedSqlNode {
+
+    public static final SqlSpecialOperator OPERATOR =
+            new SqlSpecialOperator("EXPLAIN", SqlKind.EXPLAIN);
+
+    private SqlNode statement;
+    private List<String> explainDetails;
+
+    public SqlExplainDetails(SqlParserPos pos, SqlNode statement, List<String> explainDetails) {
+        super(pos);
+        this.statement = statement;
+        this.explainDetails = explainDetails;
+    }
+
+    public SqlNode getStatement() {
+        return statement;
+    }
+
+    public void setStatement(SqlNode statement) {

Review comment:
       Why add a set method?
   
   According to the `Principle of Minimal Exposure`, we should only add the methods that are needed.

##########
File path: flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlExplainDetails.java
##########
@@ -0,0 +1,87 @@
+package org.apache.flink.sql.parser.dql;
+
+import org.apache.flink.sql.parser.ExtendedSqlNode;
+import org.apache.flink.sql.parser.error.SqlValidateException;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** EXPLAIN [ExplainDetail [,ExplainDetail]*]* STATEMENT sql call. */
+public class SqlExplainDetails extends SqlCall implements ExtendedSqlNode {
+
+    public static final SqlSpecialOperator OPERATOR =
+            new SqlSpecialOperator("EXPLAIN", SqlKind.EXPLAIN);
+
+    private SqlNode statement;
+    private List<String> explainDetails;
+
+    public SqlExplainDetails(SqlParserPos pos, SqlNode statement, List<String> explainDetails) {
+        super(pos);
+        this.statement = statement;
+        this.explainDetails = explainDetails;
+    }
+
+    public SqlNode getStatement() {
+        return statement;
+    }
+
+    public void setStatement(SqlNode statement) {
+        this.statement = statement;
+    }
+
+    public List<String> getExplainDetails() {
+        return explainDetails;
+    }
+
+    public void setExplainDetails(List<String> explainDetails) {
+        this.explainDetails = explainDetails;
+    }
+
+    @Override
+    public SqlOperator getOperator() {
+        return OPERATOR;
+    }
+
+    @Override
+    public List<SqlNode> getOperandList() {
+        return ImmutableNullableList.of(statement);

Review comment:
       The statement must not be nullable, right? Use `Collections.singletonList` instead?
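   
   Together with the earlier `Add final` and set-method comments, the class could be reshaped roughly as follows (a sketch, not the final patch; `ExtendedSqlNode`/`validate` and `unparse` are elided):
   
   ```java
   import org.apache.calcite.sql.SqlCall;
   import org.apache.calcite.sql.SqlKind;
   import org.apache.calcite.sql.SqlNode;
   import org.apache.calcite.sql.SqlOperator;
   import org.apache.calcite.sql.SqlSpecialOperator;
   import org.apache.calcite.sql.parser.SqlParserPos;
   
   import java.util.Collections;
   import java.util.List;
   
   /** EXPLAIN [ExplainDetail [, ExplainDetail]*] STATEMENT sql call (sketch). */
   public class SqlExplainDetails extends SqlCall {
   
       public static final SqlSpecialOperator OPERATOR =
               new SqlSpecialOperator("EXPLAIN", SqlKind.EXPLAIN);
   
       // final fields with getters only (Principle of Minimal Exposure)
       private final SqlNode statement;
       private final List<String> explainDetails;
   
       public SqlExplainDetails(SqlParserPos pos, SqlNode statement, List<String> explainDetails) {
           super(pos);
           this.statement = statement;
           this.explainDetails = explainDetails;
       }
   
       public SqlNode getStatement() {
           return statement;
       }
   
       public List<String> getExplainDetails() {
           return explainDetails;
       }
   
       @Override
       public SqlOperator getOperator() {
           return OPERATOR;
       }
   
       @Override
       public List<SqlNode> getOperandList() {
           // the parser always supplies a statement, so no nullable list is needed
           return Collections.singletonList(statement);
       }
   }
   ```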

##########
File path: flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
##########
@@ -1451,6 +1451,30 @@ SqlDrop SqlDropExtended(Span s, boolean replace) :
     }
 }
 
+/**
+* Parses an EXPLAIN statement.
+* EXPLAIN [ExplainDetail[, ExplainDetail]*] statement;
+*/
+SqlNode SqlExplainDetails() :
+{
+    SqlNode stmt;
+    List<String> explainDetails = new ArrayList<String>();
+}
+{
+    <EXPLAIN>
+    [
+        (<ESTIMATED_COST> | <CHANGELOG_MODE> | <JSON_EXECUTION_PLAN>) {
+           explainDetails.add(token.image.toUpperCase());
+        }
+        (<COMMA> (<ESTIMATED_COST> | <CHANGELOG_MODE> | <JSON_EXECUTION_PLAN>) {
+           explainDetails.add(token.image.toUpperCase());
+        })*
+    ]
+    stmt = SqlQueryOrDml() {
+        return new SqlExplainDetails(getPos(),stmt,explainDetails);

Review comment:
       Add whitespace after the comma.
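   
   i.e.:
   
   ```java
   return new SqlExplainDetails(getPos(), stmt, explainDetails);
   ```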

##########
File path: flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/SqlCommandParserTest.java
##########
@@ -55,12 +55,56 @@ public void setup() {
         tableEnv = helper.getTableEnv();
     }
 
+    @Test
+    public void testExplainDetails() {
+        List<TestItem> testItems =
+                Arrays.asList(
+                        TestItem.validSql(
+                                "EXPLAIN JSON_EXECUTION_PLAN,CHANGELOG_MODE,ESTIMATED_COST SELECT * FROM MyTable",
+                                SqlCommand.EXPLAIN,
+                                "EXPLAIN JSON_EXECUTION_PLAN,CHANGELOG_MODE,ESTIMATED_COST SELECT * FROM MyTable"),
+                        TestItem.validSql(
+                                "EXPLAIN JSON_EXECUTION_PLAN,CHANGELOG_MODE,ESTIMATED_COST INSERT INTO MySink(c) SELECT c FROM MyTable",
+                                SqlCommand.EXPLAIN,
+                                "EXPLAIN JSON_EXECUTION_PLAN,CHANGELOG_MODE,ESTIMATED_COST INSERT INTO MySink(c) SELECT c FROM MyTable"),
+                        TestItem.validSql(
+                                "EXPLAIN JSON_EXECUTION_PLAN SELECT * FROM MyTable",
+                                SqlCommand.EXPLAIN,
+                                "EXPLAIN JSON_EXECUTION_PLAN SELECT * FROM MyTable"),
+                        TestItem.validSql(
+                                "EXPLAIN JSON_EXECUTION_PLAN INSERT INTO MySink(c) SELECT c FROM MyTable",
+                                SqlCommand.EXPLAIN,
+                                "EXPLAIN JSON_EXECUTION_PLAN INSERT INTO MySink(c) SELECT c FROM MyTable"),
+                        TestItem.validSql(
+                                "EXPLAIN SELECT * FROM MyTable",
+                                SqlCommand.EXPLAIN,
+                                "EXPLAIN SELECT * FROM MyTable"),
+                        TestItem.validSql(
+                                "EXPLAIN INSERT INTO MySink(c) SELECT c FROM MyTable",
+                                SqlCommand.EXPLAIN,
+                                "EXPLAIN INSERT INTO MySink(c) SELECT c FROM MyTable"),
+                        TestItem.invalidSql(
+                                "EXPLAIN INSERT INTO MySink(c) SELECT xxx FROM MyTable",
+                                SqlExecutionException.class,
+                                "Column 'xxx' not found in any table"),
+                        TestItem.invalidSql(
+                                "EXPLAIN JSON_EXECUTION_PLAN,JSON_EXECUTION_PLAN INSERT INTO MySink(c) SELECT xxx FROM MyTable",
+                                SqlExecutionException.class,
+                                "Duplicate EXPLAIN KEY: JSON_EXECUTION_PLAN"));
+
+        for (TestItem item : testItems) {
+            tableEnv.getConfig().setSqlDialect(item.sqlDialect);
+            runTestItem(item);
+        }
+    }
+
     @Test
     public void testCommands() throws Exception {
         List<TestItem> testItems =
                 Arrays.asList(
                         TestItem.validSql("QUIT;", 
SqlCommand.QUIT).cannotParseComment(),
-                        TestItem.validSql("eXiT;", 
SqlCommand.QUIT).cannotParseComment(),
+                        // TODO: 2021/3/18 修改变量
+                        TestItem.validSql("EXIT;", 
SqlCommand.QUIT).cannotParseComment(),

Review comment:
       Why modify here?

##########
File path: flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
##########
@@ -82,6 +82,8 @@
     "org.apache.calcite.sql.SqlDrop"
     "java.util.List"
     "java.util.ArrayList"
+    "org.apache.calcite.sql.SqlUtil"
+    "org.apache.flink.sql.parser.dql.SqlExplainDetails"

Review comment:
       Please keep the import classes in alphabetical order if a new class is added.

##########
File path: flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/SqlCommandParserTest.java
##########
@@ -105,19 +149,31 @@ public void testCommands() throws Exception {
                                 "EXPLAIN ", // no query
                                 SqlExecutionException.class,
                                 "Encountered \"<EOF>\""),
-                        // explain plan for xx
+                        // explain xx
+                        TestItem.validSql(
+                                "EXPLAIN JSON_EXECUTION_PLAN,CHANGELOG_MODE,ESTIMATED_COST SELECT * FROM MyTable",
+                                SqlCommand.EXPLAIN,
+                                "EXPLAIN JSON_EXECUTION_PLAN,CHANGELOG_MODE,ESTIMATED_COST SELECT * FROM MyTable"),
+                        TestItem.validSql(
+                                "EXPLAIN JSON_EXECUTION_PLAN,CHANGELOG_MODE,ESTIMATED_COST INSERT INTO MySink(c) SELECT c FROM MyTable",
+                                SqlCommand.EXPLAIN,
+                                "EXPLAIN JSON_EXECUTION_PLAN,CHANGELOG_MODE,ESTIMATED_COST INSERT INTO MySink(c) SELECT c FROM MyTable"),
+                        TestItem.validSql(
+                                "EXPLAIN JSON_EXECUTION_PLAN SELECT * FROM MyTable",
+                                SqlCommand.EXPLAIN,
+                                "EXPLAIN JSON_EXECUTION_PLAN SELECT * FROM MyTable"),
                         TestItem.validSql(
-                                "EXPLAIN PLAN FOR SELECT a FROM MyTable",

Review comment:
       Please don't remove the original test. Just add the new test directly.

##########
File path: flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlExplainDetails.java
##########
@@ -0,0 +1,87 @@
+package org.apache.flink.sql.parser.dql;
+
+import org.apache.flink.sql.parser.ExtendedSqlNode;
+import org.apache.flink.sql.parser.error.SqlValidateException;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** EXPLAIN [ExplainDetail [,ExplainDetail]*]* STATEMENT sql call. */
+public class SqlExplainDetails extends SqlCall implements ExtendedSqlNode {
+
+    public static final SqlSpecialOperator OPERATOR =
+            new SqlSpecialOperator("EXPLAIN", SqlKind.EXPLAIN);
+
+    private SqlNode statement;
+    private List<String> explainDetails;
+
+    public SqlExplainDetails(SqlParserPos pos, SqlNode statement, List<String> explainDetails) {
+        super(pos);
+        this.statement = statement;
+        this.explainDetails = explainDetails;
+    }
+
+    public SqlNode getStatement() {
+        return statement;
+    }
+
+    public void setStatement(SqlNode statement) {
+        this.statement = statement;
+    }
+
+    public List<String> getExplainDetails() {
+        return explainDetails;
+    }
+
+    public void setExplainDetails(List<String> explainDetails) {
+        this.explainDetails = explainDetails;
+    }
+
+    @Override
+    public SqlOperator getOperator() {
+        return OPERATOR;
+    }
+
+    @Override
+    public List<SqlNode> getOperandList() {
+        return ImmutableNullableList.of(statement);
+    }
+
+    @Override
+    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+        writer.keyword("EXPLAIN");
+        String explainDetailString = String.join(", ", explainDetails);
+        if (!explainDetails.isEmpty()) {
+            writer.keyword(explainDetailString);
+        }

Review comment:
       Move the definition of the string into the if-block.
   
   We should `minimize the scope of local variables`.
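   
   Applied here, the method would read roughly:
   
   ```java
   @Override
   public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
       writer.keyword("EXPLAIN");
       if (!explainDetails.isEmpty()) {
           // declared only where it is used, minimizing its scope
           String explainDetailString = String.join(", ", explainDetails);
           writer.keyword(explainDetailString);
       }
       statement.unparse(writer, leftPrec, rightPrec);
   }
   ```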

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
##########
@@ -1171,6 +1174,38 @@ public TableResult executeInternal(Operation operation) {
                     .data(Collections.singletonList(Row.of(explanation)))
                     .setPrintStyle(TableResultImpl.PrintStyle.rawContent())
                     .build();
+        } else if (operation instanceof ExplainDetailOperation) {
+            ExplainDetailOperation explainOperation = (ExplainDetailOperation) operation;
+            ExplainDetail[] detailSet =
+                    explainOperation.getExplainDetails().stream()
+                            .map(
+                                    detail -> {
+                                        switch (detail) {
+                                            case "ESTIMATED_COST":
+                                                return ExplainDetail.ESTIMATED_COST;
+                                            case "CHANGELOG_MODE":
+                                                return ExplainDetail.CHANGELOG_MODE;
+                                            case "JSON_EXECUTION_PLAN":
+                                                return ExplainDetail.JSON_EXECUTION_PLAN;
+                                            default:
+                                                return null;
+                                        }
+                                    })
+                            .filter(Objects::nonNull)
+                            .distinct()

Review comment:
       Why do we need `distinct`? The validation phase already guarantees that the details are distinct.

##########
File path: flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlExplainDetails.java
##########
@@ -0,0 +1,87 @@
+package org.apache.flink.sql.parser.dql;
+
+import org.apache.flink.sql.parser.ExtendedSqlNode;
+import org.apache.flink.sql.parser.error.SqlValidateException;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** EXPLAIN [ExplainDetail [,ExplainDetail]*]* STATEMENT sql call. */
+public class SqlExplainDetails extends SqlCall implements ExtendedSqlNode {
+
+    public static final SqlSpecialOperator OPERATOR =
+            new SqlSpecialOperator("EXPLAIN", SqlKind.EXPLAIN);
+
+    private SqlNode statement;
+    private List<String> explainDetails;
+
+    public SqlExplainDetails(SqlParserPos pos, SqlNode statement, List<String> explainDetails) {
+        super(pos);
+        this.statement = statement;
+        this.explainDetails = explainDetails;
+    }
+
+    public SqlNode getStatement() {
+        return statement;
+    }
+
+    public void setStatement(SqlNode statement) {
+        this.statement = statement;
+    }
+
+    public List<String> getExplainDetails() {
+        return explainDetails;
+    }
+
+    public void setExplainDetails(List<String> explainDetails) {
+        this.explainDetails = explainDetails;
+    }
+
+    @Override
+    public SqlOperator getOperator() {
+        return OPERATOR;
+    }
+
+    @Override
+    public List<SqlNode> getOperandList() {
+        return ImmutableNullableList.of(statement);
+    }
+
+    @Override
+    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+        writer.keyword("EXPLAIN");
+        String explainDetailString = String.join(", ", explainDetails);
+        if (!explainDetails.isEmpty()) {
+            writer.keyword(explainDetailString);
+        }
+        statement.unparse(writer, leftPrec, rightPrec);
+    }
+
+    @Override
+    public void validate() throws SqlValidateException {
+        List<String> repeatDetails =
+                explainDetails.stream()
+                        .collect(Collectors.toMap(e -> e, e -> 1, Integer::sum))
+                        .entrySet()
+                        .stream()
+                        .filter(entry -> entry.getValue() > 1)
+                        .map(Map.Entry::getKey)
+                        .collect(Collectors.toList());
+
+        if (repeatDetails.size() > 0) {
+            throw new SqlValidateException(
+                    this.getParserPosition(),
+                    String.format("Duplicate EXPLAIN KEY: %s", String.join(", ", repeatDetails)));

Review comment:
       "Duplicate EXPLAIN DETAILS: "?

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
##########
@@ -1171,6 +1174,38 @@ public TableResult executeInternal(Operation operation) {
                     .data(Collections.singletonList(Row.of(explanation)))
                     .setPrintStyle(TableResultImpl.PrintStyle.rawContent())
                     .build();
+        } else if (operation instanceof ExplainDetailOperation) {
+            ExplainDetailOperation explainOperation = (ExplainDetailOperation) operation;
+            ExplainDetail[] detailSet =
+                    explainOperation.getExplainDetails().stream()
+                            .map(
+                                    detail -> {
+                                        switch (detail) {
+                                            case "ESTIMATED_COST":
+                                                return ExplainDetail.ESTIMATED_COST;
+                                            case "CHANGELOG_MODE":
+                                                return ExplainDetail.CHANGELOG_MODE;
+                                            case "JSON_EXECUTION_PLAN":
+                                                return ExplainDetail.JSON_EXECUTION_PLAN;
+                                            default:
+                                                return null;

Review comment:
       Throw a TableException if it meets unrecognized details.
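   
   A minimal sketch of that mapping (the helper class and its name are hypothetical; `ExplainDetail` and `TableException` are the existing Flink classes):
   
   ```java
   import org.apache.flink.table.api.ExplainDetail;
   import org.apache.flink.table.api.TableException;
   
   /** Hypothetical helper: fail fast on unknown details instead of returning null. */
   final class ExplainDetailMapper {
   
       static ExplainDetail toExplainDetail(String detail) {
           switch (detail) {
               case "ESTIMATED_COST":
                   return ExplainDetail.ESTIMATED_COST;
               case "CHANGELOG_MODE":
                   return ExplainDetail.CHANGELOG_MODE;
               case "JSON_EXECUTION_PLAN":
                   return ExplainDetail.JSON_EXECUTION_PLAN;
               default:
                   throw new TableException(
                           String.format("Unrecognized EXPLAIN detail: %s", detail));
           }
       }
   }
   ```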

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ExplainDetailOperation.java
##########
@@ -0,0 +1,40 @@
+package org.apache.flink.table.operations;
+
+import org.apache.flink.util.StringUtils;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** explain operation. */
+public class ExplainDetailOperation implements Operation {
+    private Operation child;
+    private List<String> explainDetails;
+
+    public ExplainDetailOperation(Operation child, List<String> explainDetails) {
+        this.child = child;
+        this.explainDetails = explainDetails;
+    }
+
+    public Operation getChild() {
+        return child;
+    }
+
+    public List<String> getExplainDetails() {
+        return explainDetails;
+    }
+
+    @Override
+    public String asSummaryString() {
+        String detailsString =

Review comment:
       Why use `detailString` as the variable name?
   
   The name of a variable should represent its concept rather than its implementation, because the implementation may change but the concept is stable.

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
##########
@@ -1162,6 +1164,7 @@ public TableResult executeInternal(Operation operation) {
                 throw new TableException(exMsg, e);
             }
         } else if (operation instanceof ExplainOperation) {
+            // In fact, never get here

Review comment:
       It is reachable if the input is an instance of `ExplainOperation`.
   
   Remove this comment.

##########
File path: flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlExplainDetails.java
##########
@@ -0,0 +1,87 @@
+package org.apache.flink.sql.parser.dql;
+
+import org.apache.flink.sql.parser.ExtendedSqlNode;
+import org.apache.flink.sql.parser.error.SqlValidateException;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** EXPLAIN [ExplainDetail [,ExplainDetail]*]* STATEMENT sql call. */
+public class SqlExplainDetails extends SqlCall implements ExtendedSqlNode {
+
+    public static final SqlSpecialOperator OPERATOR =
+            new SqlSpecialOperator("EXPLAIN", SqlKind.EXPLAIN);
+
+    private SqlNode statement;
+    private List<String> explainDetails;
+
+    public SqlExplainDetails(SqlParserPos pos, SqlNode statement, List<String> explainDetails) {
+        super(pos);
+        this.statement = statement;
+        this.explainDetails = explainDetails;
+    }
+
+    public SqlNode getStatement() {
+        return statement;
+    }
+
+    public void setStatement(SqlNode statement) {
+        this.statement = statement;
+    }
+
+    public List<String> getExplainDetails() {
+        return explainDetails;
+    }
+
+    public void setExplainDetails(List<String> explainDetails) {
+        this.explainDetails = explainDetails;
+    }

Review comment:
       Why add this?

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
##########
@@ -142,10 +142,14 @@ class FlinkPlannerImpl(
         return sqlNode
       }
       sqlNode match {
+        // this explain should be deprecated
         case explain: SqlExplain =>
           val validated = validator.validate(explain.getExplicandum)
           explain.setOperand(0, validated)
           explain
+        case explainDetails: SqlExplainDetails =>
+          validator.validate(explainDetails.getStatement)

Review comment:
       Why don't we need `setOperand`?

##########
File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
##########
@@ -112,9 +113,9 @@ abstract class TableEnvImpl(
     tableLookup,
     new SqlExpressionResolver {
      override def resolveExpression(sqlExpression: String, inputSchema: TableSchema)
-        : ResolvedExpression = {
-            throw new UnsupportedOperationException(
-              "SQL expression parsing is only supported in the Blink planner.")
+      : ResolvedExpression = {
+        throw new UnsupportedOperationException(
+          "SQL expression parsing is only supported in the Blink planner.")

Review comment:
       ditto

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ExplainDetailOperation.java
##########
@@ -0,0 +1,40 @@
+package org.apache.flink.table.operations;
+
+import org.apache.flink.util.StringUtils;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** explain operation. */

Review comment:
       We have `ExplainOperation`. Please use a meaningful description.

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
##########
@@ -142,10 +142,14 @@ class FlinkPlannerImpl(
         return sqlNode
       }
       sqlNode match {
+        // this explain should be deprecated

Review comment:
       Explain will not be deprecated... It is the `ExplainOperation` that will be deprecated.

##########
File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
##########
@@ -50,26 +50,27 @@ import org.apache.flink.types.Row
 import org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema
 import org.apache.calcite.sql.parser.SqlParser
 import org.apache.calcite.tools.FrameworkConfig
-
 import _root_.java.lang.{Iterable => JIterable, Long => JLong}
 import _root_.java.util.function.{Function => JFunction, Supplier => JSupplier}
 import _root_.java.util.{Optional, Collections => JCollections, HashMap => JHashMap, List => JList, Map => JMap}
 import java.util
 
+
 import _root_.scala.collection.JavaConversions._
 import _root_.scala.collection.JavaConverters._
 import _root_.scala.util.Try
+import scala.collection.mutable
 
 /**
-  * The abstract base class for the implementation of batch TableEnvironment.
-  *
-  * @param config The configuration of the TableEnvironment
-  */
+ * The abstract base class for the implementation of batch TableEnvironment.
+ *
+ * @param config The configuration of the TableEnvironment
+ */
 abstract class TableEnvImpl(
-    val config: TableConfig,
-    private val catalogManager: CatalogManager,
-    private val moduleManager: ModuleManager,
-    private val userClassLoader: ClassLoader)
+                             val config: TableConfig,
+                             private val catalogManager: CatalogManager,
+                             private val moduleManager: ModuleManager,
+                             private val userClassLoader: ClassLoader)

Review comment:
       rollback

##########
File path: flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java
##########
@@ -177,6 +178,8 @@ private static SqlCommandCall parseBySqlParser(Parser sqlParser, String stmt) {
             cmd = SqlCommand.ALTER_FUNCTION;
         } else if (operation instanceof ExplainOperation) {
             cmd = SqlCommand.EXPLAIN;
+        } else if (operation instanceof ExplainDetailOperation) {

Review comment:
       Don't introduce a new SqlCommand. I think `SqlCommand.EXPLAIN` is enough.
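   
   e.g., fold it into the existing branch (fragment):
   
   ```java
   } else if (operation instanceof ExplainOperation
           || operation instanceof ExplainDetailOperation) {
       cmd = SqlCommand.EXPLAIN;
   ```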

##########
File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala
##########
@@ -529,10 +550,10 @@ class TableEnvironmentITCase(tableEnvName: String) {
   }
 
   private def registerCsvTableSink(
-      tEnv: TableEnvironment,
-      fieldNames: Array[String],
-      fieldTypes: Array[TypeInformation[_]],
-      tableName: String): String = {
+                                    tEnv: TableEnvironment,
+                                    fieldNames: Array[String],
+                                    fieldTypes: Array[TypeInformation[_]],
+                                    tableName: String): String = {

Review comment:
       Roll back




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

