This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 687425db [FLINK-13338][table] Make SQL dialect configurable
687425db is described below

commit 687425db49e332225bfdba631111325ca68bef53
Author: yuzhao.cyz <[email protected]>
AuthorDate: Wed Jul 24 12:27:19 2019 +0800

    [FLINK-13338][table] Make SQL dialect configurable
    
    This closes #9212.
---
 flink-python/pyflink/common/__init__.py            |   4 +-
 flink-python/pyflink/common/sql_dialect.py         |  69 +++++++++
 flink-python/pyflink/table/table_config.py         |  19 ++-
 .../org/apache/flink/table/api/SqlDialect.java     |  47 ++++++
 .../org/apache/flink/table/api/TableConfig.java    |  20 +++
 .../table/planner/delegation/PlannerContext.java   |  61 +++++---
 .../table/planner/delegation/PlannerBase.scala     |   6 +-
 .../table/sqlexec/SqlToOperationConverterTest.java | 170 +++++++++++++++++++++
 .../expressions/utils/ExpressionTestBase.scala     |   2 +-
 .../planner/match/PatternTranslatorTestBase.scala  |   2 +-
 .../batch/sql/PartitionableSinkITCase.scala        |  23 +--
 .../planner/runtime/utils/BatchTestBase.scala      |  10 +-
 .../planner/PlanningConfigurationBuilder.java      |  16 +-
 .../table/sqlexec/SqlToOperationConverterTest.java |  91 +++++++++--
 .../batch/sql/PartitionableSinkITCase.scala        |  21 +--
 15 files changed, 478 insertions(+), 83 deletions(-)

diff --git a/flink-python/pyflink/common/__init__.py 
b/flink-python/pyflink/common/__init__.py
index ca27df7..5cc9853 100644
--- a/flink-python/pyflink/common/__init__.py
+++ b/flink-python/pyflink/common/__init__.py
@@ -25,6 +25,7 @@ Important classes used by both Flink Streaming and Batch API:
 from pyflink.common.configuration import Configuration
 from pyflink.common.execution_config import ExecutionConfig
 from pyflink.common.execution_mode import ExecutionMode
+from pyflink.common.sql_dialect import SqlDialect
 from pyflink.common.input_dependency_constraint import 
InputDependencyConstraint
 from pyflink.common.restart_strategy import RestartStrategies, 
RestartStrategyConfiguration
 
@@ -34,5 +35,6 @@ __all__ = [
     'ExecutionMode',
     'InputDependencyConstraint',
     'RestartStrategies',
-    'RestartStrategyConfiguration'
+    'RestartStrategyConfiguration',
+    'SqlDialect'
 ]
diff --git a/flink-python/pyflink/common/sql_dialect.py 
b/flink-python/pyflink/common/sql_dialect.py
new file mode 100644
index 0000000..b78e1e5
--- /dev/null
+++ b/flink-python/pyflink/common/sql_dialect.py
@@ -0,0 +1,69 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from pyflink.java_gateway import get_gateway
+
+__all__ = ['SqlDialect']
+
+
+class SqlDialect(object):
+    """
+    Enumeration of valid SQL compatibility modes.
+
+    In most of the cases, the built-in compatibility mode should be 
sufficient. For some features,
+    e.g. the "INSERT INTO T PARTITION(a='xxx') ..." grammar, you may need to 
switch to the
+    Hive dialect if required.
+
+    We may introduce other SQL dialects in the future.
+
+    :data:`DEFAULT`:
+
+    Flink's default SQL behavior.
+
+    :data:`HIVE`:
+
+    SQL dialect that allows some Apache Hive specific grammar.
+
+    Note: We might never support all of the Hive grammar. See the 
documentation for
+    supported features.
+    """
+
+    DEFAULT = 0
+    HIVE = 1
+
+    @staticmethod
+    def _from_j_sql_dialect(j_sql_dialect):
+        gateway = get_gateway()
+        JSqlDialect = gateway.jvm.org.apache.flink.table.api.SqlDialect
+        if j_sql_dialect == JSqlDialect.DEFAULT:
+            return SqlDialect.DEFAULT
+        elif j_sql_dialect == JSqlDialect.HIVE:
+            return SqlDialect.HIVE
+        else:
+            raise Exception("Unsupported Java SQL dialect: %s" % j_sql_dialect)
+
+    @staticmethod
+    def _to_j_sql_dialect(sql_dialect):
+        gateway = get_gateway()
+        JSqlDialect = gateway.jvm.org.apache.flink.table.api.SqlDialect
+        if sql_dialect == SqlDialect.DEFAULT:
+            return JSqlDialect.DEFAULT
+        elif sql_dialect == SqlDialect.HIVE:
+            return JSqlDialect.HIVE
+        else:
+            raise TypeError("Unsupported SQL dialect: %s, supported SQL 
dialects are: "
+                            "SqlDialect.DEFAULT, SqlDialect.HIVE." % 
sql_dialect)
diff --git a/flink-python/pyflink/table/table_config.py 
b/flink-python/pyflink/table/table_config.py
index 3a84fca..42026d2 100644
--- a/flink-python/pyflink/table/table_config.py
+++ b/flink-python/pyflink/table/table_config.py
@@ -19,7 +19,7 @@ import sys
 
 from py4j.compat import long
 
-from pyflink.common import Configuration
+from pyflink.common import Configuration, SqlDialect
 from pyflink.java_gateway import get_gateway
 
 __all__ = ['TableConfig']
@@ -247,6 +247,23 @@ class TableConfig(object):
         """
         self._j_table_config.addConfiguration(configuration._j_configuration)
 
+    def get_sql_dialect(self):
+        """
+        Returns the current SQL dialect.
+
+        :rtype: SqlDialect
+        """
+        return 
SqlDialect._from_j_sql_dialect(self._j_table_config.getSqlDialect())
+
+    def set_sql_dialect(self, sql_dialect):
+        """
+        Sets the SQL dialect used to parse SQL queries. Flink's default SQL 
behavior is used unless another dialect is set.
+
+        :param sql_dialect: The given SQL dialect.
+        :type sql_dialect: SqlDialect
+        """
+        
self._j_table_config.setSqlDialect(SqlDialect._to_j_sql_dialect(sql_dialect))
+
     @staticmethod
     def get_default():
         """
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SqlDialect.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SqlDialect.java
new file mode 100644
index 0000000..c6e5eb2
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SqlDialect.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Enumeration of valid SQL compatibility modes.
+ *
+ * <p>In most of the cases, the built-in compatibility mode should be 
sufficient. For some features,
+ * e.g. the "INSERT INTO T PARTITION(a='xxx') ..." grammar, you may need to 
switch to the Hive dialect
+ * if required.
+ *
+ * <p>We may introduce other SQL dialects in the future.
+ */
+@PublicEvolving
+public enum SqlDialect {
+
+       /**
+        * Flink's default SQL behavior.
+        */
+       DEFAULT,
+
+       /**
+        * SQL dialect that allows some Apache Hive specific grammar.
+        *
+        * <p>Note: We might never support all of the Hive grammar. See the 
documentation for supported
+        * features.
+        */
+       HIVE
+}
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
index be7a3f6..7dfd46f 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
@@ -77,6 +77,12 @@ public class TableConfig {
        private Configuration configuration = new Configuration();
 
        /**
+        * The SQL dialect defines how to parse a SQL query. A different SQL 
dialect may support different
+        * SQL grammar.
+        */
+       private SqlDialect sqlDialect = SqlDialect.DEFAULT;
+
+       /**
         * Returns all key/value configuration.
         */
        public Configuration getConfiguration() {
@@ -94,6 +100,20 @@ public class TableConfig {
        }
 
        /**
+        * Returns the current SQL dialect.
+        */
+       public SqlDialect getSqlDialect() {
+               return this.sqlDialect;
+       }
+
+       /**
+        * Sets the SQL dialect used to parse SQL queries. Flink's default SQL 
behavior is used unless another dialect is set.
+        */
+       public void setSqlDialect(SqlDialect sqlDialect) {
+               this.sqlDialect = sqlDialect;
+       }
+
+       /**
         * Returns the zone id for timestamp with local time zone.
         */
        public ZoneId getLocalTimeZone() {
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java
index df085ed..df093e1 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java
@@ -21,7 +21,9 @@ package org.apache.flink.table.planner.delegation;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl;
 import org.apache.flink.sql.parser.validate.FlinkSqlConformance;
+import org.apache.flink.table.api.SqlDialect;
 import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.catalog.FunctionCatalog;
 import org.apache.flink.table.planner.calcite.CalciteConfig;
 import org.apache.flink.table.planner.calcite.CalciteConfig$;
@@ -41,6 +43,7 @@ import org.apache.flink.table.planner.utils.TableConfigUtils;
 
 import org.apache.calcite.config.Lex;
 import org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.calcite.plan.Context;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTraitDef;
@@ -72,8 +75,10 @@ public class PlannerContext {
        private final FlinkTypeFactory typeFactory = new 
FlinkTypeFactory(typeSystem);
        private final TableConfig tableConfig;
        private final FunctionCatalog functionCatalog;
-       private final FrameworkConfig frameworkConfig;
        private final RelOptCluster cluster;
+       private final Context context;
+       private final CalciteSchema rootSchema;
+       private final List<RelTraitDef> traitDefs;
 
        public PlannerContext(
                        TableConfig tableConfig,
@@ -82,7 +87,13 @@ public class PlannerContext {
                        List<RelTraitDef> traitDefs) {
                this.tableConfig = tableConfig;
                this.functionCatalog = functionCatalog;
-               this.frameworkConfig = createFrameworkConfig(rootSchema, 
traitDefs);
+               this.context = new FlinkContextImpl(tableConfig, 
functionCatalog);
+               this.rootSchema = rootSchema;
+               this.traitDefs = traitDefs;
+               // Make a framework config to initialize the RelOptCluster 
instance.
+               // Caution: only attributes that cannot be 
overwritten/configured
+               // by the user may be used here.
+               final FrameworkConfig frameworkConfig = createFrameworkConfig();
 
                RelOptPlanner planner = new 
VolcanoPlanner(frameworkConfig.getCostFactory(), frameworkConfig.getContext());
                planner.setExecutor(frameworkConfig.getExecutor());
@@ -92,19 +103,19 @@ public class PlannerContext {
                this.cluster = FlinkRelOptClusterFactory.create(planner, new 
RexBuilder(typeFactory));
        }
 
-       private FrameworkConfig createFrameworkConfig(CalciteSchema rootSchema, 
List<RelTraitDef> traitDefs) {
+       private FrameworkConfig createFrameworkConfig() {
                return Frameworks.newConfigBuilder()
-                               .defaultSchema(rootSchema.plus())
-                               .parserConfig(getSqlParserConfig())
-                               .costFactory(new FlinkCostFactory())
-                               .typeSystem(typeSystem)
-                               
.sqlToRelConverterConfig(getSqlToRelConverterConfig(getCalciteConfig(tableConfig)))
-                               
.operatorTable(getSqlOperatorTable(getCalciteConfig(tableConfig), 
functionCatalog))
-                               // set the executor to evaluate constant 
expressions
-                               .executor(new ExpressionReducer(tableConfig, 
false))
-                               .context(new FlinkContextImpl(tableConfig, 
functionCatalog))
-                               .traitDefs(traitDefs)
-                               .build();
+                       .defaultSchema(rootSchema.plus())
+                       .parserConfig(getSqlParserConfig())
+                       .costFactory(new FlinkCostFactory())
+                       .typeSystem(typeSystem)
+                       
.sqlToRelConverterConfig(getSqlToRelConverterConfig(getCalciteConfig(tableConfig)))
+                       
.operatorTable(getSqlOperatorTable(getCalciteConfig(tableConfig), 
functionCatalog))
+                       // set the executor to evaluate constant expressions
+                       .executor(new ExpressionReducer(tableConfig, false))
+                       .context(context)
+                       .traitDefs(traitDefs)
+                       .build();
        }
 
        /** Returns the {@link FlinkTypeFactory} that will be used. */
@@ -121,7 +132,7 @@ public class PlannerContext {
         */
        public FlinkRelBuilder createRelBuilder(String currentCatalog, String 
currentDatabase) {
                FlinkCalciteCatalogReader relOptSchema = 
createCatalogReader(false, currentCatalog, currentDatabase);
-               return new FlinkRelBuilder(frameworkConfig.getContext(), 
cluster, relOptSchema);
+               return new FlinkRelBuilder(this.context, cluster, relOptSchema);
        }
 
        /**
@@ -133,7 +144,7 @@ public class PlannerContext {
         */
        public FlinkPlannerImpl createFlinkPlanner(String currentCatalog, 
String currentDatabase) {
                return new FlinkPlannerImpl(
-                               frameworkConfig,
+                               createFrameworkConfig(),
                                isLenient -> createCatalogReader(isLenient, 
currentCatalog, currentDatabase),
                                typeFactory,
                                cluster);
@@ -143,7 +154,7 @@ public class PlannerContext {
                        boolean lenientCaseSensitivity,
                        String currentCatalog,
                        String currentDatabase) {
-               SqlParser.Config sqlParserConfig = 
frameworkConfig.getParserConfig();
+               SqlParser.Config sqlParserConfig = getSqlParserConfig();
                final boolean caseSensitive;
                if (lenientCaseSensitivity) {
                        caseSensitive = false;
@@ -155,7 +166,7 @@ public class PlannerContext {
                                .setCaseSensitive(caseSensitive)
                                .build();
 
-               SchemaPlus rootSchema = 
getRootSchema(frameworkConfig.getDefaultSchema());
+               SchemaPlus rootSchema = getRootSchema(this.rootSchema.plus());
                return new FlinkCalciteCatalogReader(
                                CalciteSchema.from(rootSchema),
                                asList(
@@ -188,12 +199,24 @@ public class PlannerContext {
                                () -> SqlParser
                                                .configBuilder()
                                                
.setParserFactory(FlinkSqlParserImpl.FACTORY)
-                                               
.setConformance(FlinkSqlConformance.DEFAULT)
+                                               
.setConformance(getSqlConformance())
                                                .setLex(Lex.JAVA)
                                                .setIdentifierMaxLength(256)
                                                .build());
        }
 
+       private FlinkSqlConformance getSqlConformance() {
+               SqlDialect sqlDialect = tableConfig.getSqlDialect();
+               switch (sqlDialect) {
+                       case HIVE:
+                               return FlinkSqlConformance.HIVE;
+                       case DEFAULT:
+                               return FlinkSqlConformance.DEFAULT;
+                       default:
+                               throw new TableException("Unsupported SQL 
dialect: " + sqlDialect);
+               }
+       }
+
        /**
         * Returns the {@link SqlToRelConverter} config.
         *
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
index 18922e8..9168d44 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
@@ -100,7 +100,7 @@ abstract class PlannerBase(
 
   /** Returns the Calcite [[FrameworkConfig]] of this TableEnvironment. */
   @VisibleForTesting
-  private[flink] def getFlinkPlanner: FlinkPlannerImpl = {
+  private[flink] def createFlinkPlanner: FlinkPlannerImpl = {
     val currentCatalogName = catalogManager.getCurrentCatalog
     val currentDatabase = catalogManager.getCurrentDatabase
     plannerContext.createFlinkPlanner(currentCatalogName, currentDatabase)
@@ -122,7 +122,7 @@ abstract class PlannerBase(
   }
 
   override def parse(stmt: String): util.List[Operation] = {
-    val planner = getFlinkPlanner
+    val planner = createFlinkPlanner
     // parse the sql query
     val parsed = planner.parse(stmt)
     parsed match {
@@ -159,7 +159,7 @@ abstract class PlannerBase(
   }
 
   override def getCompletionHints(statement: String, position: Int): 
Array[String] = {
-    val planner = getFlinkPlanner
+    val planner = createFlinkPlanner
     planner.getCompletionHints(statement, position)
   }
 
diff --git 
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
new file mode 100644
index 0000000..9368d00
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sqlexec;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.SqlDialect;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.FunctionCatalog;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.operations.CatalogSinkModifyOperation;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.ddl.CreateTableOperation;
+import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
+import org.apache.flink.table.planner.catalog.CatalogManagerCalciteSchema;
+import org.apache.flink.table.planner.delegation.PlannerContext;
+import org.apache.flink.table.planner.operations.SqlConversionException;
+import org.apache.flink.table.planner.operations.SqlToOperationConverter;
+import org.apache.flink.table.types.DataType;
+
+import org.apache.calcite.sql.SqlNode;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test cases for {@link 
org.apache.flink.table.planner.operations.SqlToOperationConverter}.
+ */
+public class SqlToOperationConverterTest {
+       private final TableConfig tableConfig = new TableConfig();
+       private final Catalog catalog = new 
GenericInMemoryCatalog("MockCatalog",
+               "default");
+       private final CatalogManager catalogManager =
+               new CatalogManager("builtin", catalog);
+       private final FunctionCatalog functionCatalog = new 
FunctionCatalog(catalogManager);
+       private final PlannerContext plannerContext =
+               new PlannerContext(tableConfig,
+                       functionCatalog,
+                       asRootSchema(new 
CatalogManagerCalciteSchema(catalogManager, false)),
+                       new ArrayList<>());
+
+       @Before
+       public void before() throws TableAlreadyExistException, 
DatabaseNotExistException {
+               final ObjectPath path1 = new 
ObjectPath(catalogManager.getCurrentDatabase(), "t1");
+               final ObjectPath path2 = new 
ObjectPath(catalogManager.getCurrentDatabase(), "t2");
+               final TableSchema tableSchema = TableSchema.builder()
+                       .field("a", DataTypes.BIGINT())
+                       .field("b", DataTypes.VARCHAR(Integer.MAX_VALUE))
+                       .field("c", DataTypes.INT())
+                       .field("d", DataTypes.VARCHAR(Integer.MAX_VALUE))
+                       .build();
+               Map<String, String> properties = new HashMap<>();
+               properties.put("connector", "COLLECTION");
+               final CatalogTable catalogTable =  new 
CatalogTableImpl(tableSchema, properties, "");
+               catalog.createTable(path1, catalogTable, true);
+               catalog.createTable(path2, catalogTable, true);
+       }
+
+       @After
+       public void after() throws TableNotExistException {
+               final ObjectPath path1 = new 
ObjectPath(catalogManager.getCurrentDatabase(), "t1");
+               final ObjectPath path2 = new 
ObjectPath(catalogManager.getCurrentDatabase(), "t2");
+               catalog.dropTable(path1, true);
+               catalog.dropTable(path2, true);
+       }
+
+       @Test
+       public void testCreateTable() {
+               final String sql = "CREATE TABLE tbl1 (\n" +
+                       "  a bigint,\n" +
+                       "  b varchar, \n" +
+                       "  c int, \n" +
+                       "  d varchar" +
+                       ")\n" +
+                       "  PARTITIONED BY (a, d)\n" +
+                       "  with (\n" +
+                       "    connector = 'kafka', \n" +
+                       "    kafka.topic = 'log.test'\n" +
+                       ")\n";
+               FlinkPlannerImpl planner = 
getPlannerBySqlDialect(SqlDialect.DEFAULT);
+               Operation operation = parse(sql, planner);
+               assert operation instanceof CreateTableOperation;
+               CreateTableOperation op = (CreateTableOperation) operation;
+               CatalogTable catalogTable = op.getCatalogTable();
+               assertEquals(Arrays.asList("a", "d"), 
catalogTable.getPartitionKeys());
+               assertArrayEquals(catalogTable.getSchema().getFieldNames(),
+                       new String[] {"a", "b", "c", "d"});
+               assertArrayEquals(catalogTable.getSchema().getFieldDataTypes(),
+                       new DataType[]{
+                               DataTypes.BIGINT(),
+                               DataTypes.VARCHAR(Integer.MAX_VALUE),
+                               DataTypes.INT(),
+                               DataTypes.VARCHAR(Integer.MAX_VALUE)});
+       }
+
+       @Test(expected = SqlConversionException.class)
+       public void testCreateTableWithPkUniqueKeys() {
+               FlinkPlannerImpl planner = 
getPlannerBySqlDialect(SqlDialect.DEFAULT);
+               final String sql = "CREATE TABLE tbl1 (\n" +
+                       "  a bigint,\n" +
+                       "  b varchar, \n" +
+                       "  c int, \n" +
+                       "  d varchar, \n" +
+                       "  primary key(a), \n" +
+                       "  unique(a, b) \n" +
+                       ")\n" +
+                       "  PARTITIONED BY (a, d)\n" +
+                       "  with (\n" +
+                       "    connector = 'kafka', \n" +
+                       "    kafka.topic = 'log.test'\n" +
+                       ")\n";
+               parse(sql, planner);
+       }
+
+       @Test
+       public void testSqlInsertWithStaticPartition() {
+               final String sql = "insert into t1 partition(a=1) select b, c, 
d from t2";
+               FlinkPlannerImpl planner = 
getPlannerBySqlDialect(SqlDialect.HIVE);
+               Operation operation = parse(sql, planner);
+               assert operation instanceof CatalogSinkModifyOperation;
+               CatalogSinkModifyOperation sinkModifyOperation = 
(CatalogSinkModifyOperation) operation;
+               final Map<String, String> expectedStaticPartitions = new 
HashMap<>();
+               expectedStaticPartitions.put("a", "1");
+               assertEquals(expectedStaticPartitions, 
sinkModifyOperation.getStaticPartitions());
+       }
+
+       private Operation parse(String sql, FlinkPlannerImpl planner) {
+               SqlNode node = planner.parse(sql);
+               return SqlToOperationConverter.convert(planner, node);
+       }
+
+       private FlinkPlannerImpl getPlannerBySqlDialect(SqlDialect sqlDialect) {
+               tableConfig.setSqlDialect(sqlDialect);
+               return 
plannerContext.createFlinkPlanner(catalogManager.getCurrentCatalog(),
+                       catalogManager.getCurrentDatabase());
+       }
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
index eb0d928..9b7bd4a 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
@@ -64,7 +64,7 @@ abstract class ExpressionTestBase {
   private val tEnv = StreamTableEnvironmentImpl.create(env, setting, config)
   private val planner = 
tEnv.asInstanceOf[TableEnvironmentImpl].getPlanner.asInstanceOf[PlannerBase]
   private val relBuilder = planner.getRelBuilder
-  private val calcitePlanner = planner.getFlinkPlanner
+  private val calcitePlanner = planner.createFlinkPlanner
 
   // setup test utils
   private val tableName = "testTable"
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/match/PatternTranslatorTestBase.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/match/PatternTranslatorTestBase.scala
index 254e60f..bd5f47b 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/match/PatternTranslatorTestBase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/match/PatternTranslatorTestBase.scala
@@ -54,7 +54,7 @@ abstract class PatternTranslatorTestBase extends TestLogger {
   private val testTableRowType = RowType.of(new IntType)
   private val tableName = "testTable"
   private val context = prepareContext(testTableTypeInfo)
-  private val calcitePlanner: FlinkPlannerImpl = context._2.getFlinkPlanner
+  private val calcitePlanner: FlinkPlannerImpl = context._2.createFlinkPlanner
 
   private def prepareContext(typeInfo: TypeInformation[Row])
   : (RelBuilder, PlannerBase, StreamExecutionEnvironment) = {
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala
index 60dc55b..80b2b4e 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala
@@ -22,13 +22,10 @@ import 
org.apache.flink.api.common.typeinfo.BasicTypeInfo.INT_TYPE_INFO
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl
-import org.apache.flink.sql.parser.validate.FlinkSqlConformance
 import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink}
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
 import org.apache.flink.table.api.config.ExecutionConfigOptions
-import org.apache.flink.table.api.{TableConfig, TableSchema, 
ValidationException}
-import org.apache.flink.table.planner.calcite.CalciteConfig
+import org.apache.flink.table.api.{SqlDialect, TableSchema, 
ValidationException}
 import 
org.apache.flink.table.planner.runtime.batch.sql.PartitionableSinkITCase._
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
@@ -37,8 +34,6 @@ import org.apache.flink.table.sinks.{PartitionableTableSink, 
StreamTableSink, Ta
 import org.apache.flink.table.types.logical.{BigIntType, IntType, VarCharType}
 import org.apache.flink.types.Row
 
-import org.apache.calcite.config.Lex
-import org.apache.calcite.sql.parser.SqlParser
 import org.junit.Assert._
 import org.junit.rules.ExpectedException
 import org.junit.{Before, Rule, Test}
@@ -66,24 +61,12 @@ class PartitionableSinkITCase extends BatchTestBase {
     tEnv.getConfig
       .getConfiguration
       
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 3)
+    tEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
     registerCollection("nonSortTable", testData, type3, "a, b, c", 
dataNullables)
     registerCollection("sortTable", testData1, type3, "a, b, c", dataNullables)
     PartitionableSinkITCase.init()
   }
 
-  override def getTableConfig: TableConfig = {
-    val parserConfig = SqlParser.configBuilder
-      .setParserFactory(FlinkSqlParserImpl.FACTORY)
-      .setConformance(FlinkSqlConformance.HIVE) // set up hive dialect
-      .setLex(Lex.JAVA)
-      .setIdentifierMaxLength(256).build
-    val plannerConfig = CalciteConfig.createBuilder(CalciteConfig.DEFAULT)
-      .replaceSqlParserConfig(parserConfig)
-    val tableConfig = new TableConfig
-    tableConfig.setPlannerConfig(plannerConfig.build())
-    tableConfig
-  }
-
   @Test
   def testInsertWithOutPartitionGrouping(): Unit = {
     registerTableSink()
@@ -181,7 +164,7 @@ class PartitionableSinkITCase extends BatchTestBase {
     expectedEx.expect(classOf[ValidationException])
     expectedEx.expectMessage("Static partition column b "
       + "should appear before dynamic partition a")
-    registerTableSink(grouping = true, partitionColumns = Array("a", "b"))
+    registerTableSink(partitionColumns = Array("a", "b"))
     tEnv.sqlUpdate("insert into sinkTable partition(b=1) select a, c from sortTable")
     tEnv.execute("testJob")
   }
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTestBase.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTestBase.scala
index 844eada..76c2171 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTestBase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTestBase.scala
@@ -56,7 +56,7 @@ class BatchTestBase extends BatchAbstractTestBase {
 
   private val settings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
   private val testingTableEnv: TestingTableEnvironment = 
TestingTableEnvironment
-    .create(settings, catalogManager = None, getTableConfig)
+    .create(settings, catalogManager = None, new TableConfig)
   val tEnv: TableEnvironment = testingTableEnv
   private val planner = 
tEnv.asInstanceOf[TableEnvironmentImpl].getPlanner.asInstanceOf[PlannerBase]
   val env: StreamExecutionEnvironment = planner.getExecEnv
@@ -67,14 +67,6 @@ class BatchTestBase extends BatchAbstractTestBase {
   val LINE_COL_TWICE_PATTERN: Pattern = Pattern.compile("(?s)From line ([0-9]+),"
     + " column ([0-9]+) to line ([0-9]+), column ([0-9]+): (.*)")
 
-  // TODO: [FLINK-13338] will expose dialect option to TableConfig to
-  //  avoid override CalciteConfig by users
-  /**
-    * Subclass should overwrite this method if we want to overwrite 
configuration during
-    * sql parse to sql to rel conversion phrase.
-    */
-  protected def getTableConfig: TableConfig = new TableConfig
-
   @Before
   def before(): Unit = {
     conf.getConfiguration.setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 
DEFAULT_PARALLELISM)
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/PlanningConfigurationBuilder.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/PlanningConfigurationBuilder.java
index 91e0b64..1c137d1 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/PlanningConfigurationBuilder.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/PlanningConfigurationBuilder.java
@@ -21,7 +21,9 @@ package org.apache.flink.table.planner;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl;
 import org.apache.flink.sql.parser.validate.FlinkSqlConformance;
+import org.apache.flink.table.api.SqlDialect;
 import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.calcite.CalciteConfig;
 import org.apache.flink.table.calcite.FlinkPlannerImpl;
 import org.apache.flink.table.calcite.FlinkRelBuilder;
@@ -155,11 +157,23 @@ public class PlanningConfigurationBuilder {
                        SqlParser
                                .configBuilder()
                                .setParserFactory(FlinkSqlParserImpl.FACTORY)
-                               .setConformance(FlinkSqlConformance.DEFAULT)
+                               .setConformance(getSqlConformance())
                                .setLex(Lex.JAVA)
                                .build());
        }
 
+       private FlinkSqlConformance getSqlConformance() {
+               SqlDialect sqlDialect = tableConfig.getSqlDialect();
+               switch (sqlDialect) {
+                       case HIVE:
+                               return FlinkSqlConformance.HIVE;
+                       case DEFAULT:
+                               return FlinkSqlConformance.DEFAULT;
+                       default:
+                               throw new TableException("Unsupported SQL dialect: " + sqlDialect);
+               }
+       }
+
        private CatalogReader createCatalogReader(
                        boolean lenientCaseSensitivity,
                        String currentCatalog,
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
index cc78be7..fe3a405 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
@@ -18,33 +18,84 @@
 
 package org.apache.flink.table.sqlexec;
 
-import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.sql.parser.ddl.SqlCreateTable;
+import org.apache.flink.sql.parser.dml.RichSqlInsert;
 import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.internal.BatchTableEnvImpl;
-import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.api.SqlDialect;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.calcite.FlinkPlannerImpl;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.CatalogManagerCalciteSchema;
 import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.FunctionCatalog;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.expressions.ExpressionBridge;
+import org.apache.flink.table.expressions.PlannerExpressionConverter;
+import org.apache.flink.table.operations.CatalogSinkModifyOperation;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.ddl.CreateTableOperation;
+import org.apache.flink.table.planner.PlanningConfigurationBuilder;
 import org.apache.flink.table.types.DataType;
 
 import org.apache.calcite.sql.SqlNode;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
 
+import static org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 
-/** Test cases for SqlExecutableStatement. **/
+/** Test cases for {@link SqlToOperationConverter}. **/
 public class SqlToOperationConverterTest {
-       private static final ExecutionEnvironment streamExec =
-               ExecutionEnvironment.getExecutionEnvironment();
-       private static final BatchTableEnvImpl batchEnv =
-               (BatchTableEnvImpl) BatchTableEnvironment.create(streamExec);
+       private final TableConfig tableConfig = new TableConfig();
+       private final Catalog catalog = new 
GenericInMemoryCatalog("MockCatalog",
+               "default");
+       private final CatalogManager catalogManager =
+               new CatalogManager("builtin", catalog);
+       private final FunctionCatalog functionCatalog = new 
FunctionCatalog(catalogManager);
+       private final PlanningConfigurationBuilder planningConfigurationBuilder 
=
+               new PlanningConfigurationBuilder(tableConfig,
+                       functionCatalog,
+                       asRootSchema(new 
CatalogManagerCalciteSchema(catalogManager, false)),
+                       new ExpressionBridge<>(functionCatalog,
+                               PlannerExpressionConverter.INSTANCE()));
 
-       private static final FlinkPlannerImpl planner = 
batchEnv.getFlinkPlanner();
+       @Before
+       public void before() throws TableAlreadyExistException, 
DatabaseNotExistException {
+               final ObjectPath path1 = new 
ObjectPath(catalogManager.getCurrentDatabase(), "t1");
+               final ObjectPath path2 = new 
ObjectPath(catalogManager.getCurrentDatabase(), "t2");
+               final TableSchema tableSchema = TableSchema.builder()
+                       .field("a", DataTypes.BIGINT())
+                       .field("b", DataTypes.VARCHAR(Integer.MAX_VALUE))
+                       .field("c", DataTypes.INT())
+                       .field("d", DataTypes.VARCHAR(Integer.MAX_VALUE))
+                       .build();
+               Map<String, String> properties = new HashMap<>();
+               properties.put("connector", "COLLECTION");
+               final CatalogTable catalogTable =  new 
CatalogTableImpl(tableSchema, properties, "");
+               catalog.createTable(path1, catalogTable, true);
+               catalog.createTable(path2, catalogTable, true);
+       }
+
+       @After
+       public void after() throws TableNotExistException {
+               final ObjectPath path1 = new 
ObjectPath(catalogManager.getCurrentDatabase(), "t1");
+               final ObjectPath path2 = new 
ObjectPath(catalogManager.getCurrentDatabase(), "t2");
+               catalog.dropTable(path1, true);
+               catalog.dropTable(path2, true);
+       }
 
        @Test
        public void testCreateTable() {
@@ -59,6 +110,7 @@ public class SqlToOperationConverterTest {
                        "    connector = 'kafka', \n" +
                        "    kafka.topic = 'log.test'\n" +
                        ")\n";
+               final FlinkPlannerImpl planner = 
getPlannerBySqlDialect(SqlDialect.DEFAULT);
                SqlNode node = planner.parse(sql);
                assert node instanceof SqlCreateTable;
                Operation operation = SqlToOperationConverter.convert(planner, 
node);
@@ -91,8 +143,29 @@ public class SqlToOperationConverterTest {
                        "    connector = 'kafka', \n" +
                        "    kafka.topic = 'log.test'\n" +
                        ")\n";
+               final FlinkPlannerImpl planner = 
getPlannerBySqlDialect(SqlDialect.DEFAULT);
                SqlNode node = planner.parse(sql);
                assert node instanceof SqlCreateTable;
                SqlToOperationConverter.convert(planner, node);
        }
+
+       @Test
+       public void testSqlInsertWithStaticPartition() {
+               final String sql = "insert into t1 partition(a=1) select b, c, d from t2";
+               FlinkPlannerImpl planner = 
getPlannerBySqlDialect(SqlDialect.HIVE);
+               SqlNode node = planner.parse(sql);
+               assert node instanceof RichSqlInsert;
+               Operation operation = SqlToOperationConverter.convert(planner, 
node);
+               assert operation instanceof CatalogSinkModifyOperation;
+               CatalogSinkModifyOperation sinkModifyOperation = 
(CatalogSinkModifyOperation) operation;
+               final Map<String, String> expectedStaticPartitions = new 
HashMap<>();
+               expectedStaticPartitions.put("a", "1");
+               assertEquals(expectedStaticPartitions, 
sinkModifyOperation.getStaticPartitions());
+       }
+
+       private FlinkPlannerImpl getPlannerBySqlDialect(SqlDialect sqlDialect) {
+               tableConfig.setSqlDialect(sqlDialect);
+               return 
planningConfigurationBuilder.createFlinkPlanner(catalogManager.getCurrentCatalog(),
+                       catalogManager.getCurrentDatabase());
+       }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/PartitionableSinkITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/PartitionableSinkITCase.scala
index d021d3a..cb17bab 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/PartitionableSinkITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/PartitionableSinkITCase.scala
@@ -27,19 +27,15 @@ import org.apache.flink.api.java.DataSet
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.api.scala.ExecutionEnvironment
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl
-import org.apache.flink.sql.parser.validate.FlinkSqlConformance
 import org.apache.flink.table.api.scala.BatchTableEnvironment
-import org.apache.flink.table.api.{DataTypes, PlannerConfig, TableSchema, 
ValidationException}
-import org.apache.flink.table.calcite.CalciteConfig
+import org.apache.flink.table.api.{DataTypes, SqlDialect, TableSchema, 
ValidationException}
 import 
org.apache.flink.table.factories.utils.TestCollectionTableFactory.TestCollectionInputFormat
 import 
org.apache.flink.table.runtime.batch.sql.PartitionableSinkITCase.{RESULT1, 
RESULT2, RESULT3, _}
 import org.apache.flink.table.sinks.{BatchTableSink, PartitionableTableSink, 
TableSink}
 import org.apache.flink.table.sources.BatchTableSource
 import org.apache.flink.table.types.logical.{BigIntType, IntType, VarCharType}
 import org.apache.flink.types.Row
-import org.apache.calcite.config.Lex
-import org.apache.calcite.sql.parser.SqlParser
+
 import org.junit.Assert.assertEquals
 import org.junit.rules.ExpectedException
 import org.junit.{Before, Rule, Test}
@@ -65,7 +61,7 @@ class PartitionableSinkITCase {
   def before(): Unit = {
     batchExec.setParallelism(3)
     tEnv = BatchTableEnvironment.create(batchExec)
-    tEnv.getConfig.setPlannerConfig(getPlannerConfig)
+    tEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
     registerTableSource("nonSortTable", testData.toList)
     registerTableSource("sortTable", testData1.toList)
     PartitionableSinkITCase.init()
@@ -80,17 +76,6 @@ class PartitionableSinkITCase {
     tEnv.registerTableSource(name, new CollectionTableSource(data, 100, 
tableSchema))
   }
 
-  private def getPlannerConfig: PlannerConfig = {
-    val parserConfig = SqlParser.configBuilder
-      .setParserFactory(FlinkSqlParserImpl.FACTORY)
-      .setConformance(FlinkSqlConformance.HIVE) // set up hive dialect
-      .setLex(Lex.JAVA)
-      .setIdentifierMaxLength(256).build
-    CalciteConfig.createBuilder()
-      .replaceSqlParserConfig(parserConfig)
-      .build()
-  }
-
   @Test
   def testInsertWithOutPartitionGrouping(): Unit = {
     registerTableSink(grouping = false)

Reply via email to