danny0405 commented on a change in pull request #12188:
URL: https://github.com/apache/flink/pull/12188#discussion_r426133545
##########
File path:
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java
##########
@@ -194,6 +283,11 @@ private SqlCommandParser() {
public final Pattern pattern;
public final Function<String[], Optional<String[]>>
operandConverter;
+ SqlCommand() {
+ this.pattern = null;
+ this.operandConverter = null;
+ }
+
SqlCommand(String matchingRegex, Function<String[],
Optional<String[]>> operandConverter) {
this.pattern = Pattern.compile(matchingRegex,
DEFAULT_PATTERN_FLAGS);
Review comment:
Mark both operands as nullable.
##########
File path:
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java
##########
@@ -207,6 +301,10 @@ public String toString() {
public boolean hasOperands() {
return operandConverter != NO_OPERANDS;
}
Review comment:
We should use equals instead of `!=` to compare the operands.
##########
File path:
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java
##########
@@ -34,29 +58,131 @@ private SqlCommandParser() {
// private
}
- public static Optional<SqlCommandCall> parse(String stmt) {
+ public static Optional<SqlCommandCall> parse(Function<String,
List<Operation>> sqlParserFunction, String stmt) {
// normalize
stmt = stmt.trim();
// remove ';' at the end
if (stmt.endsWith(";")) {
stmt = stmt.substring(0, stmt.length() - 1).trim();
}
- // parse
+ // parse statement via sql parser first
+ Optional<SqlCommandCall> callOpt =
parseBySqlParser(sqlParserFunction, stmt);
+ if (callOpt.isPresent()) {
+ return callOpt;
+ }
+
+ // parse statement via regex match
for (SqlCommand cmd : SqlCommand.values()) {
- final Matcher matcher = cmd.pattern.matcher(stmt);
- if (matcher.matches()) {
- final String[] groups = new
String[matcher.groupCount()];
- for (int i = 0; i < groups.length; i++) {
- groups[i] = matcher.group(i + 1);
+ if (cmd.hasRegexPattern()) {
+ final Matcher matcher =
cmd.pattern.matcher(stmt);
+ if (matcher.matches()) {
+ final String[] groups = new
String[matcher.groupCount()];
+ for (int i = 0; i < groups.length; i++)
{
+ groups[i] = matcher.group(i +
1);
+ }
+ return
cmd.operandConverter.apply(groups)
+ .map((operands) -> {
+ String[]
newOperands = operands;
+ if (cmd ==
SqlCommand.EXPLAIN) {
+ //
convert `explain xx` to `explain plan for xx`
+
newOperands = new String[] { "EXPLAIN PLAN FOR " + operands[0] };
+ }
+ return new
SqlCommandCall(cmd, newOperands);
+ });
}
- return cmd.operandConverter.apply(groups)
- .map((operands) -> new
SqlCommandCall(cmd, operands));
}
}
return Optional.empty();
}
+ private static Optional<SqlCommandCall> parseBySqlParser(
+ Function<String, List<Operation>> sqlParserFunction,
String stmt) {
+ List<Operation> operations;
+ try {
+ operations = sqlParserFunction.apply(stmt);
+ } catch (SqlExecutionException e) {
+ if (e.getCause() instanceof ValidationException) {
+ // can be parsed via sql parser, but is not
validated.
+ // throw exception directly
+ throw e;
+ }
+ return Optional.empty();
+ }
+ if (operations.size() != 1) {
+ throw new SqlExecutionException("Only single statement
is supported now.");
+ }
+
+ final SqlCommand cmd;
+ String[] operands = new String[0];
+ Operation operation = operations.get(0);
+ if (operation instanceof CatalogSinkModifyOperation) {
+ boolean overwrite = ((CatalogSinkModifyOperation)
operation).isOverwrite();
+ cmd = overwrite ? SqlCommand.INSERT_OVERWRITE :
SqlCommand.INSERT_INTO;
+ operands = new String[] { stmt };
+ } else if (operation instanceof CreateTableOperation) {
+ cmd = SqlCommand.CREATE_TABLE;
+ operands = new String[] { stmt };
+ } else if (operation instanceof DropTableOperation) {
+ cmd = SqlCommand.DROP_TABLE;
+ operands = new String[] { stmt };
+ } else if (operation instanceof AlterTableOperation) {
+ cmd = SqlCommand.ALTER_TABLE;
+ operands = new String[] { stmt };
+ } else if (operation instanceof CreateViewOperation) {
+ cmd = SqlCommand.CREATE_VIEW;
+ CreateViewOperation op = (CreateViewOperation)
operation;
+ operands = new String[] {
op.getViewIdentifier().asSerializableString(),
+ op.getCatalogView().getOriginalQuery()
};
+ } else if (operation instanceof DropViewOperation) {
+ cmd = SqlCommand.DROP_VIEW;
+ operands = new String[] { ((DropViewOperation)
operation).getViewIdentifier().asSerializableString() };
+ } else if (operation instanceof CreateDatabaseOperation) {
+ cmd = SqlCommand.CREATE_DATABASE;
+ operands = new String[] { stmt };
+ } else if (operation instanceof DropDatabaseOperation) {
+ cmd = SqlCommand.DROP_DATABASE;
+ operands = new String[] { stmt };
+ } else if (operation instanceof AlterDatabaseOperation) {
+ cmd = SqlCommand.ALTER_DATABASE;
+ operands = new String[] { stmt };
+ } else if (operation instanceof CreateCatalogOperation) {
+ cmd = SqlCommand.CREATE_CATALOG;
+ operands = new String[] { stmt };
+ } else if (operation instanceof UseCatalogOperation) {
+ cmd = SqlCommand.USE_CATALOG;
+ operands = new String[] { String.format("`%s`",
((UseCatalogOperation) operation).getCatalogName()) };
Review comment:
Move the whole if else block to a single class named
`OperationToSqlCommandConverter` ? We also need to add tests for the mapping.
##########
File path:
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java
##########
@@ -34,29 +58,131 @@ private SqlCommandParser() {
// private
}
- public static Optional<SqlCommandCall> parse(String stmt) {
+ public static Optional<SqlCommandCall> parse(Function<String,
List<Operation>> sqlParserFunction, String stmt) {
// normalize
stmt = stmt.trim();
Review comment:
How about we just split 2 methods `parseByExecutor` and `parseByCli`,
parse a function here seems weird, because the first operand of the
`sqlParserFunction ` is just the `stmt`.
##########
File path:
flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/utils/ParserUtils.java
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.cli.utils;
+
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.internal.TableEnvironmentInternal;
+import org.apache.flink.table.client.gateway.SqlExecutionException;
+import org.apache.flink.table.operations.Operation;
+
+import java.util.List;
+
+/**
+ * An utility class that provides abilities to parse sql statements.
+ */
+public class ParserUtils {
+
+ private static final TableEnvironment tableEnv =
TableEnvironment.create(EnvironmentSettings.newInstance().build());
+
+ static {
+ tableEnv.executeSql("create table MyTable (a int, b bigint, c
varchar(32)) " +
+ "with ('connector' = 'filesystem', 'path' =
'/non')");
+ tableEnv.executeSql("create table MyOtherTable (a int, b
bigint) " +
+ "with ('connector' = 'filesystem', 'path' =
'/non')");
Review comment:
It's a `ParserUtils` but why we register table there ?
##########
File path:
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java
##########
@@ -34,29 +58,131 @@ private SqlCommandParser() {
// private
}
- public static Optional<SqlCommandCall> parse(String stmt) {
+ public static Optional<SqlCommandCall> parse(Function<String,
List<Operation>> sqlParserFunction, String stmt) {
// normalize
stmt = stmt.trim();
// remove ';' at the end
if (stmt.endsWith(";")) {
stmt = stmt.substring(0, stmt.length() - 1).trim();
}
- // parse
+ // parse statement via sql parser first
+ Optional<SqlCommandCall> callOpt =
parseBySqlParser(sqlParserFunction, stmt);
+ if (callOpt.isPresent()) {
+ return callOpt;
+ }
+
+ // parse statement via regex match
for (SqlCommand cmd : SqlCommand.values()) {
- final Matcher matcher = cmd.pattern.matcher(stmt);
- if (matcher.matches()) {
- final String[] groups = new
String[matcher.groupCount()];
- for (int i = 0; i < groups.length; i++) {
- groups[i] = matcher.group(i + 1);
+ if (cmd.hasRegexPattern()) {
+ final Matcher matcher =
cmd.pattern.matcher(stmt);
+ if (matcher.matches()) {
+ final String[] groups = new
String[matcher.groupCount()];
+ for (int i = 0; i < groups.length; i++)
{
+ groups[i] = matcher.group(i +
1);
+ }
+ return
cmd.operandConverter.apply(groups)
+ .map((operands) -> {
+ String[]
newOperands = operands;
+ if (cmd ==
SqlCommand.EXPLAIN) {
+ //
convert `explain xx` to `explain plan for xx`
Review comment:
It is hacky to support a non-common syntax and replace it silently, i
think support `EXPLAIN PLAN FOR` is enough.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]