danny0405 commented on a change in pull request #12188:
URL: https://github.com/apache/flink/pull/12188#discussion_r426150335



##########
File path: 
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java
##########
@@ -34,29 +58,131 @@ private SqlCommandParser() {
                // private
        }
 
-       public static Optional<SqlCommandCall> parse(String stmt) {
+       public static Optional<SqlCommandCall> parse(Function<String, 
List<Operation>> sqlParserFunction, String stmt) {
                // normalize
                stmt = stmt.trim();
                // remove ';' at the end
                if (stmt.endsWith(";")) {
                        stmt = stmt.substring(0, stmt.length() - 1).trim();
                }
 
-               // parse
+               // parse statement via sql parser first
+               Optional<SqlCommandCall> callOpt = 
parseBySqlParser(sqlParserFunction, stmt);
+               if (callOpt.isPresent()) {
+                       return callOpt;
+               }
+
+               // parse statement via regex match
                for (SqlCommand cmd : SqlCommand.values()) {
-                       final Matcher matcher = cmd.pattern.matcher(stmt);
-                       if (matcher.matches()) {
-                               final String[] groups = new 
String[matcher.groupCount()];
-                               for (int i = 0; i < groups.length; i++) {
-                                       groups[i] = matcher.group(i + 1);
+                       if (cmd.hasRegexPattern()) {
+                               final Matcher matcher = 
cmd.pattern.matcher(stmt);
+                               if (matcher.matches()) {
+                                       final String[] groups = new 
String[matcher.groupCount()];
+                                       for (int i = 0; i < groups.length; i++) 
{
+                                               groups[i] = matcher.group(i + 
1);
+                                       }
+                                       return 
cmd.operandConverter.apply(groups)
+                                                       .map((operands) -> {
+                                                               String[] 
newOperands = operands;
+                                                               if (cmd == 
SqlCommand.EXPLAIN) {
+                                                                       // 
convert `explain xx` to `explain plan for xx`
+                                                                       
newOperands = new String[] { "EXPLAIN PLAN FOR " + operands[0] };
+                                                               }
+                                                               return new 
SqlCommandCall(cmd, newOperands);
+                                                       });
                                }
-                               return cmd.operandConverter.apply(groups)
-                                       .map((operands) -> new 
SqlCommandCall(cmd, operands));
                        }
                }
                return Optional.empty();
        }
 
+       private static Optional<SqlCommandCall> parseBySqlParser(
+                       Function<String, List<Operation>> sqlParserFunction, 
String stmt) {
+               List<Operation> operations;
+               try {
+                       operations = sqlParserFunction.apply(stmt);
+               } catch (SqlExecutionException e) {
+                       if (e.getCause() instanceof ValidationException) {
+                               // can be parsed via sql parser, but is not 
validated.
+                               // throw exception directly
+                               throw e;
+                       }
+                       return Optional.empty();
+               }
+               if (operations.size() != 1) {
+                       throw new SqlExecutionException("Only single statement 
is supported now.");
+               }
+
+               final SqlCommand cmd;
+               String[] operands = new String[0];
+               Operation operation = operations.get(0);
+               if (operation instanceof CatalogSinkModifyOperation) {
+                       boolean overwrite = ((CatalogSinkModifyOperation) 
operation).isOverwrite();
+                       cmd = overwrite ? SqlCommand.INSERT_OVERWRITE : 
SqlCommand.INSERT_INTO;
+                       operands = new String[] { stmt };
+               } else if (operation instanceof CreateTableOperation) {
+                       cmd = SqlCommand.CREATE_TABLE;
+                       operands = new String[] { stmt };
+               } else if (operation instanceof DropTableOperation) {
+                       cmd = SqlCommand.DROP_TABLE;
+                       operands = new String[] { stmt };
+               } else if (operation instanceof AlterTableOperation) {
+                       cmd = SqlCommand.ALTER_TABLE;
+                       operands = new String[] { stmt };
+               } else if (operation instanceof CreateViewOperation) {
+                       cmd = SqlCommand.CREATE_VIEW;
+                       CreateViewOperation op = (CreateViewOperation) 
operation;
+                       operands = new String[] { 
op.getViewIdentifier().asSerializableString(),
+                                       op.getCatalogView().getOriginalQuery() 
};
+               } else if (operation instanceof DropViewOperation) {
+                       cmd = SqlCommand.DROP_VIEW;
+                       operands = new String[] { ((DropViewOperation) 
operation).getViewIdentifier().asSerializableString() };
+               } else if (operation instanceof CreateDatabaseOperation) {
+                       cmd = SqlCommand.CREATE_DATABASE;
+                       operands = new String[] { stmt };
+               } else if (operation instanceof DropDatabaseOperation) {
+                       cmd = SqlCommand.DROP_DATABASE;
+                       operands = new String[] { stmt };
+               } else if (operation instanceof AlterDatabaseOperation) {
+                       cmd = SqlCommand.ALTER_DATABASE;
+                       operands = new String[] { stmt };
+               } else if (operation instanceof CreateCatalogOperation) {
+                       cmd = SqlCommand.CREATE_CATALOG;
+                       operands = new String[] { stmt };
+               } else if (operation instanceof UseCatalogOperation) {
+                       cmd = SqlCommand.USE_CATALOG;
+                       operands = new String[] { String.format("`%s`", 
((UseCatalogOperation) operation).getCatalogName()) };

Review comment:
       Let's see if we can make some refactoring here, the code now is a mess.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to