wuchong commented on a change in pull request #15265:
URL: https://github.com/apache/flink/pull/15265#discussion_r600144592



##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/parse/ParseStrategyParser.java
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.parse;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.operations.Operation;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/** Parser that uses {@link StatementParseStrategy} to parse statement to 
{@link Operation}. */
+public class ParseStrategyParser {

Review comment:
       What do you think about renaming it to `ExtendedParser` and also moving `CalciteParser` into the same package?
   
   ```suggestion
   /** {@link ExtendedParser} is used for parsing some special commands which can't be supported by {@link CalciteParser}, e.g. {@code SET key=value} where the key and value identifiers contain special characters. Moving some parsing here also helps avoid introducing new reserved keywords. */
   public class ParseStrategyParser {
   ```

##########
File path: 
flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/delegation/ParserImplTest.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.delegation;
+
+import org.apache.flink.table.api.SqlParserException;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.FunctionCatalog;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.delegation.Parser;
+import org.apache.flink.table.module.ModuleManager;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.command.ClearOperation;
+import org.apache.flink.table.operations.command.HelpOperation;
+import org.apache.flink.table.operations.command.QuitOperation;
+import org.apache.flink.table.operations.command.ResetOperation;
+import org.apache.flink.table.operations.command.SetOperation;
+import org.apache.flink.table.operations.command.SourceOperation;
+import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
+import org.apache.flink.table.planner.catalog.CatalogManagerCalciteSchema;
+import org.apache.flink.table.utils.CatalogManagerMocks;
+
+import org.hamcrest.Matcher;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.Supplier;
+
+import static org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/** Test for {@link ParserImpl}. */
+public class ParserImplTest {
+
+    @Rule public ExpectedException thrown = ExpectedException.none();
+
+    private final boolean isStreamingMode = false;
+    private final TableConfig tableConfig = new TableConfig();
+    private final Catalog catalog = new GenericInMemoryCatalog("MockCatalog", 
"default");
+    private final CatalogManager catalogManager =
+            
CatalogManagerMocks.preparedCatalogManager().defaultCatalog("builtin", 
catalog).build();
+    private final ModuleManager moduleManager = new ModuleManager();
+    private final FunctionCatalog functionCatalog =
+            new FunctionCatalog(tableConfig, catalogManager, moduleManager);
+    private final PlannerContext plannerContext =
+            new PlannerContext(
+                    tableConfig,
+                    functionCatalog,
+                    catalogManager,
+                    asRootSchema(new 
CatalogManagerCalciteSchema(catalogManager, isStreamingMode)),
+                    new ArrayList<>());
+
+    private final Supplier<FlinkPlannerImpl> plannerSupplier =
+            () ->
+                    plannerContext.createFlinkPlanner(
+                            catalogManager.getCurrentCatalog(),
+                            catalogManager.getCurrentDatabase());
+
+    private final Parser parser =
+            new ParserImpl(
+                    catalogManager,
+                    plannerSupplier,
+                    () -> plannerSupplier.get().parser(),
+                    t ->
+                            plannerContext.createSqlExprToRexConverter(
+                                    
plannerContext.getTypeFactory().buildRelNodeRowType(t)));
+
+    @Test
+    public void testClearCommand() {
+        assertSimpleCommand("ClEaR", instanceOf(ClearOperation.class));
+    }
+
+    @Test
+    public void testHelpCommand() {
+        assertSimpleCommand("hELp", instanceOf(HelpOperation.class));
+    }
+
+    @Test
+    public void testQuitCommand() {
+        assertSimpleCommand("qUIt", instanceOf(QuitOperation.class));
+        assertSimpleCommand("Exit", instanceOf(QuitOperation.class));
+    }
+
+    @Test
+    public void testResetCommand() {
+        assertSimpleCommand("REsEt", instanceOf(ResetOperation.class));
+    }
+
+    @Test
+    public void testSetOperation() {
+        assertSetCommand("   SEt       ");
+        assertSetCommand("SET execution.runtime-type= batch", 
"execution.runtime-type", "batch");
+        assertSetCommand(
+                "SET pipeline.jars = /path/to/test-_-jar.jar",
+                "pipeline.jars",
+                "/path/to/test-_-jar.jar");

Review comment:
       Please also add a test for `SET pipeline.name = ' '` (a quoted value containing only whitespace).
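
   For example, using the existing helper (whether the surrounding quotes are kept in the parsed value is an assumption to verify):
   
   ```java
   assertSetCommand("SET pipeline.name = ' '", "pipeline.name", "' '");
   ```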

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/SourceOperation.java
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.command;
+
+import org.apache.flink.table.operations.Operation;
+
+/** Operation that represent SOURCE command. */
+public class SourceOperation implements Operation {

Review comment:
       Can we remove the SOURCE command? It seems there are no tests for this command, we have never documented it, and no user has used it.

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/parse/ParseStrategyParser.java
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.parse;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.operations.Operation;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/** Parser that uses {@link StatementParseStrategy} to parse statement to 
{@link Operation}. */
+public class ParseStrategyParser {
+
+    public static final ParseStrategyParser INSTANCE = new 
ParseStrategyParser();
+
+    private static final List<StatementParseStrategy> REGEX_STRATEGIES =

Review comment:
       `REGEX_STRATEGIES` ==> `PARSE_STRATEGIES` 
   
   
   

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/parse/StatementParseStrategy.java
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.parse;
+
+import org.apache.flink.table.operations.Operation;
+
+import java.util.regex.Pattern;
+
+/** Strategy to parse statement to {@link Operation}. */
+public abstract class StatementParseStrategy {

Review comment:
       1. What do you think about renaming it to `ExtendedParseStrategy`?
   2. The implementation is already coupled to regex; it would be better to split it into an interface and a regex-based abstract class `AbstractRegexParseStrategy`.
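
   Roughly something like the following (just a sketch of the split; method and class names are only suggestions):
   
   ```java
   // imports: org.apache.flink.table.operations.Operation, java.util.regex.Pattern
   
   // ExtendedParseStrategy.java
   /** Strategy to parse a statement into an {@link Operation}. */
   public interface ExtendedParseStrategy {
   
       /** Whether this strategy can handle the given statement. */
       boolean match(String statement);
   
       /** Converts the matched statement into an {@link Operation}. */
       Operation convert(String statement);
   }
   
   // AbstractRegexParseStrategy.java
   /** Base class for strategies that recognize statements via a regular expression. */
   public abstract class AbstractRegexParseStrategy implements ExtendedParseStrategy {
   
       protected final Pattern pattern;
   
       protected AbstractRegexParseStrategy(Pattern pattern) {
           this.pattern = pattern;
       }
   
       @Override
       public boolean match(String statement) {
           return pattern.matcher(statement.trim()).matches();
       }
   }
   ```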

##########
File path: 
flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/delegation/ParserImplTest.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.delegation;
+
+import org.apache.flink.table.api.SqlParserException;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.FunctionCatalog;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.delegation.Parser;
+import org.apache.flink.table.module.ModuleManager;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.command.ClearOperation;
+import org.apache.flink.table.operations.command.HelpOperation;
+import org.apache.flink.table.operations.command.QuitOperation;
+import org.apache.flink.table.operations.command.ResetOperation;
+import org.apache.flink.table.operations.command.SetOperation;
+import org.apache.flink.table.operations.command.SourceOperation;
+import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
+import org.apache.flink.table.planner.catalog.CatalogManagerCalciteSchema;
+import org.apache.flink.table.utils.CatalogManagerMocks;
+
+import org.hamcrest.Matcher;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.Supplier;
+
+import static org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/** Test for {@link ParserImpl}. */
+public class ParserImplTest {
+
+    @Rule public ExpectedException thrown = ExpectedException.none();
+
+    private final boolean isStreamingMode = false;
+    private final TableConfig tableConfig = new TableConfig();
+    private final Catalog catalog = new GenericInMemoryCatalog("MockCatalog", 
"default");
+    private final CatalogManager catalogManager =
+            
CatalogManagerMocks.preparedCatalogManager().defaultCatalog("builtin", 
catalog).build();
+    private final ModuleManager moduleManager = new ModuleManager();
+    private final FunctionCatalog functionCatalog =
+            new FunctionCatalog(tableConfig, catalogManager, moduleManager);
+    private final PlannerContext plannerContext =
+            new PlannerContext(
+                    tableConfig,
+                    functionCatalog,
+                    catalogManager,
+                    asRootSchema(new 
CatalogManagerCalciteSchema(catalogManager, isStreamingMode)),
+                    new ArrayList<>());
+
+    private final Supplier<FlinkPlannerImpl> plannerSupplier =
+            () ->
+                    plannerContext.createFlinkPlanner(
+                            catalogManager.getCurrentCatalog(),
+                            catalogManager.getCurrentDatabase());
+
+    private final Parser parser =
+            new ParserImpl(
+                    catalogManager,
+                    plannerSupplier,
+                    () -> plannerSupplier.get().parser(),
+                    t ->
+                            plannerContext.createSqlExprToRexConverter(
+                                    
plannerContext.getTypeFactory().buildRelNodeRowType(t)));
+
+    @Test
+    public void testClearCommand() {
+        assertSimpleCommand("ClEaR", instanceOf(ClearOperation.class));
+    }
+
+    @Test
+    public void testHelpCommand() {
+        assertSimpleCommand("hELp", instanceOf(HelpOperation.class));
+    }
+
+    @Test
+    public void testQuitCommand() {
+        assertSimpleCommand("qUIt", instanceOf(QuitOperation.class));
+        assertSimpleCommand("Exit", instanceOf(QuitOperation.class));

Review comment:
       It would be better to refactor these tests into a `Parameterized` test that uses a `TestSpec`; see `org.apache.flink.table.types.DataTypesTest`. The `TestSpec` can simply compare the parsed `Operation` with the expected `Operation` using `asSummaryString`.
   
   That would make it easier to add tests for new commands in the future.
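
   For example, roughly along these lines (untested sketch; the existing fields and parser setup in `ParserImplTest` stay as they are, and the no-arg constructors of the command operations are assumed):
   
   ```java
   // additional imports: org.junit.runner.RunWith, org.junit.runners.Parameterized
   
   @RunWith(Parameterized.class)
   public class ParserImplTest {
   
       // ... existing catalogManager / plannerContext / parser fields unchanged ...
   
       @Parameterized.Parameters(name = "{0}")
       public static List<TestSpec> commands() {
           return Arrays.asList(
                   TestSpec.of("ClEaR", new ClearOperation()),
                   TestSpec.of("hELp", new HelpOperation()),
                   TestSpec.of("qUIt", new QuitOperation()),
                   TestSpec.of("Exit", new QuitOperation()),
                   TestSpec.of("REsEt", new ResetOperation()));
       }
   
       @Parameterized.Parameter public TestSpec spec;
   
       @Test
       public void testCommandParsing() {
           List<Operation> operations = parser.parse(spec.statement);
           assertEquals(1, operations.size());
           assertEquals(spec.expected.asSummaryString(), operations.get(0).asSummaryString());
       }
   
       private static class TestSpec {
           final String statement;
           final Operation expected;
   
           private TestSpec(String statement, Operation expected) {
               this.statement = statement;
               this.expected = expected;
           }
   
           static TestSpec of(String statement, Operation expected) {
               return new TestSpec(statement, expected);
           }
   
           @Override
           public String toString() {
               return statement;
           }
       }
   }
   ```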

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/ParserImpl.java
##########
@@ -70,6 +76,12 @@ public ParserImpl(
     public List<Operation> parse(String statement) {
         CalciteParser parser = calciteParserSupplier.get();
         FlinkPlannerImpl planner = validatorSupplier.get();
+
+        // use ParseStrategyParser to parse command first
+        if (ParseStrategyParser.INSTANCE.matches(statement)) {
+            return 
Collections.singletonList(ParseStrategyParser.INSTANCE.convert(statement));

Review comment:
       I think `ParseStrategyParser` doesn't need to expose two methods; otherwise callers will need to pattern-match twice. It can expose just one method:
   
   
   ```java
       public Optional<Operation> parse(String statement) {
           for (ExtendedParserStrategy strategy : REGEX_STRATEGIES) {
               if (strategy.match(statement)) {
                   return Optional.of(strategy.convert(statement));
               }
           }
           return Optional.empty();
       }
   ```
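   
   Then the call site in `ParserImpl#parse` becomes a single lookup, roughly:
   
   ```java
   Optional<Operation> command = ParseStrategyParser.INSTANCE.parse(statement);
   if (command.isPresent()) {
       return Collections.singletonList(command.get());
   }
   ```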
   
   
   
   
    

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/ParserImpl.java
##########
@@ -101,6 +113,29 @@ public ResolvedExpression parseSqlExpression(String 
sqlExpression, TableSchema i
                 sqlExpressionExpanded);
     }
 
+    public String[] getCompletionHints(String statement, int cursor) {
+        List<String> candidates =
+                new ArrayList<>(
+                        Arrays.asList(
+                                
ParseStrategyParser.INSTANCE.getCompletionHints(

Review comment:
       Could you add a final member variable `extendedParser` instead of 
referencing `ParseStrategyParser.INSTANCE` so many times?

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/ParserImpl.java
##########
@@ -101,6 +113,29 @@ public ResolvedExpression parseSqlExpression(String 
sqlExpression, TableSchema i
                 sqlExpressionExpanded);
     }
 
+    public String[] getCompletionHints(String statement, int cursor) {
+        List<String> candidates =
+                new ArrayList<>(
+                        Arrays.asList(
+                                
ParseStrategyParser.INSTANCE.getCompletionHints(
+                                        statement, cursor)));
+
+        // fall back to sql advisor

Review comment:
       This is not a fallback; please correct the comment.
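
   Something like `// also collect suggestions from the SQL advisor` would describe it more accurately.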

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/ParserImpl.java
##########
@@ -70,6 +76,12 @@ public ParserImpl(
     public List<Operation> parse(String statement) {
         CalciteParser parser = calciteParserSupplier.get();
         FlinkPlannerImpl planner = validatorSupplier.get();
+
+        // use ParseStrategyParser to parse command first
+        if (ParseStrategyParser.INSTANCE.matches(statement)) {

Review comment:
       Move this check to the beginning of the method.

##########
File path: 
flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/delegation/ParserImplTest.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.delegation;
+
+import org.apache.flink.table.api.SqlParserException;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.FunctionCatalog;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.delegation.Parser;
+import org.apache.flink.table.module.ModuleManager;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.command.ClearOperation;
+import org.apache.flink.table.operations.command.HelpOperation;
+import org.apache.flink.table.operations.command.QuitOperation;
+import org.apache.flink.table.operations.command.ResetOperation;
+import org.apache.flink.table.operations.command.SetOperation;
+import org.apache.flink.table.operations.command.SourceOperation;
+import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
+import org.apache.flink.table.planner.catalog.CatalogManagerCalciteSchema;
+import org.apache.flink.table.utils.CatalogManagerMocks;
+
+import org.hamcrest.Matcher;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.Supplier;
+
+import static org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/** Test for {@link ParserImpl}. */
+public class ParserImplTest {
+
+    @Rule public ExpectedException thrown = ExpectedException.none();
+
+    private final boolean isStreamingMode = false;
+    private final TableConfig tableConfig = new TableConfig();
+    private final Catalog catalog = new GenericInMemoryCatalog("MockCatalog", 
"default");
+    private final CatalogManager catalogManager =
+            
CatalogManagerMocks.preparedCatalogManager().defaultCatalog("builtin", 
catalog).build();
+    private final ModuleManager moduleManager = new ModuleManager();
+    private final FunctionCatalog functionCatalog =
+            new FunctionCatalog(tableConfig, catalogManager, moduleManager);
+    private final PlannerContext plannerContext =
+            new PlannerContext(
+                    tableConfig,
+                    functionCatalog,
+                    catalogManager,
+                    asRootSchema(new 
CatalogManagerCalciteSchema(catalogManager, isStreamingMode)),
+                    new ArrayList<>());
+
+    private final Supplier<FlinkPlannerImpl> plannerSupplier =
+            () ->
+                    plannerContext.createFlinkPlanner(
+                            catalogManager.getCurrentCatalog(),
+                            catalogManager.getCurrentDatabase());
+
+    private final Parser parser =
+            new ParserImpl(
+                    catalogManager,
+                    plannerSupplier,
+                    () -> plannerSupplier.get().parser(),
+                    t ->
+                            plannerContext.createSqlExprToRexConverter(
+                                    
plannerContext.getTypeFactory().buildRelNodeRowType(t)));
+
+    @Test
+    public void testClearCommand() {
+        assertSimpleCommand("ClEaR", instanceOf(ClearOperation.class));
+    }
+
+    @Test
+    public void testHelpCommand() {
+        assertSimpleCommand("hELp", instanceOf(HelpOperation.class));
+    }
+
+    @Test
+    public void testQuitCommand() {
+        assertSimpleCommand("qUIt", instanceOf(QuitOperation.class));
+        assertSimpleCommand("Exit", instanceOf(QuitOperation.class));
+    }
+
+    @Test
+    public void testResetCommand() {
+        assertSimpleCommand("REsEt", instanceOf(ResetOperation.class));
+    }
+
+    @Test
+    public void testSetOperation() {
+        assertSetCommand("   SEt       ");
+        assertSetCommand("SET execution.runtime-type= batch", 
"execution.runtime-type", "batch");
+        assertSetCommand(
+                "SET pipeline.jars = /path/to/test-_-jar.jar",
+                "pipeline.jars",
+                "/path/to/test-_-jar.jar");
+
+        assertFailedSetCommand("SET execution.runtime-type=");
+    }
+
+    @Test
+    public void testGetCompletionHints() {
+        String[] hints = parser.getCompletionHints("SE", 2);
+        System.out.println(hints[0]);

Review comment:
       Assert on the expected hints instead of printing them.
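
   For example (sketch; the exact hint set depends on what the SQL advisor returns for `SE`, but at minimum the extended parser is expected to propose `SET`):
   
   ```java
   // requires a static import of org.hamcrest.CoreMatchers.hasItem
   String[] hints = parser.getCompletionHints("SE", 2);
   assertThat(Arrays.asList(hints), hasItem("SET"));
   ```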

##########
File path: 
flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/delegation/ParserImplTest.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.delegation;
+
+import org.apache.flink.table.api.SqlParserException;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.FunctionCatalog;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.delegation.Parser;
+import org.apache.flink.table.module.ModuleManager;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.command.ClearOperation;
+import org.apache.flink.table.operations.command.HelpOperation;
+import org.apache.flink.table.operations.command.QuitOperation;
+import org.apache.flink.table.operations.command.ResetOperation;
+import org.apache.flink.table.operations.command.SetOperation;
+import org.apache.flink.table.operations.command.SourceOperation;
+import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
+import org.apache.flink.table.planner.catalog.CatalogManagerCalciteSchema;
+import org.apache.flink.table.utils.CatalogManagerMocks;
+
+import org.hamcrest.Matcher;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.Supplier;
+
+import static org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/** Test for {@link ParserImpl}. */
+public class ParserImplTest {
+
+    @Rule public ExpectedException thrown = ExpectedException.none();
+
+    private final boolean isStreamingMode = false;
+    private final TableConfig tableConfig = new TableConfig();
+    private final Catalog catalog = new GenericInMemoryCatalog("MockCatalog", 
"default");
+    private final CatalogManager catalogManager =
+            
CatalogManagerMocks.preparedCatalogManager().defaultCatalog("builtin", 
catalog).build();
+    private final ModuleManager moduleManager = new ModuleManager();
+    private final FunctionCatalog functionCatalog =
+            new FunctionCatalog(tableConfig, catalogManager, moduleManager);
+    private final PlannerContext plannerContext =
+            new PlannerContext(
+                    tableConfig,
+                    functionCatalog,
+                    catalogManager,
+                    asRootSchema(new 
CatalogManagerCalciteSchema(catalogManager, isStreamingMode)),
+                    new ArrayList<>());
+
+    private final Supplier<FlinkPlannerImpl> plannerSupplier =
+            () ->
+                    plannerContext.createFlinkPlanner(
+                            catalogManager.getCurrentCatalog(),
+                            catalogManager.getCurrentDatabase());
+
+    private final Parser parser =
+            new ParserImpl(
+                    catalogManager,
+                    plannerSupplier,
+                    () -> plannerSupplier.get().parser(),
+                    t ->
+                            plannerContext.createSqlExprToRexConverter(
+                                    
plannerContext.getTypeFactory().buildRelNodeRowType(t)));
+
+    @Test
+    public void testClearCommand() {
+        assertSimpleCommand("ClEaR", instanceOf(ClearOperation.class));
+    }
+
+    @Test
+    public void testHelpCommand() {
+        assertSimpleCommand("hELp", instanceOf(HelpOperation.class));
+    }
+
+    @Test
+    public void testQuitCommand() {
+        assertSimpleCommand("qUIt", instanceOf(QuitOperation.class));
+        assertSimpleCommand("Exit", instanceOf(QuitOperation.class));

Review comment:
       Besides, please also add tests for statements parsed by Calcite.
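
   For example, something along these lines (sketch; whether `SELECT 1` without a FROM clause is accepted depends on the dialect, so substitute any statement the Calcite parser handles):
   
   ```java
   @Test
   public void testSqlStatementParsedByCalcite() {
       List<Operation> operations = parser.parse("SELECT 1");
       assertEquals(1, operations.size());
   }
   ```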




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

