This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d836cd336f2a0df64d18065ddcb5d1141ed02a0e
Author: Rui Li <[email protected]>
AuthorDate: Thu May 21 20:50:31 2020 +0800

    [FLINK-17456][hive][test] Update hive connector tests to execute DDL & DML via TableEnvironment
    
    This closes #12281
---
 .../flink/connectors/hive/HiveLookupJoinTest.java  |   2 +-
 .../flink/connectors/hive/HiveTableSinkTest.java   |  12 +-
 .../flink/connectors/hive/HiveTableSourceTest.java | 132 ++++++------
 .../connectors/hive/TableEnvHiveConnectorTest.java | 232 +++++++++++----------
 .../catalog/hive/HiveCatalogUseBlinkITCase.java    |  29 ++-
 .../flink/table/catalog/hive/HiveTestUtils.java    |  12 ++
 6 files changed, 210 insertions(+), 209 deletions(-)
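
The pattern applied throughout this change is the same in every test: instead of issuing DDL/DML through hiveShell, the test builds a TableEnvironment in HIVE dialect, registers the HiveCatalog, and runs statements via executeSql. A minimal sketch of that setup (assuming a hiveCatalog test fixture as in these tests):

    // batch TableEnvironment using the blink planner and the HIVE dialect
    TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode(SqlDialect.HIVE);
    // register the Hive catalog and make it the current catalog
    tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
    tableEnv.useCatalog(hiveCatalog.getName());
    // DDL and DML now run through Flink rather than hiveShell.execute(...)
    tableEnv.executeSql("create database db1");
    tableEnv.executeSql("create table db1.src (x int, y int)");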

diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveLookupJoinTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveLookupJoinTest.java
index ba17ada..88fae01 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveLookupJoinTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveLookupJoinTest.java
@@ -109,7 +109,7 @@ public class HiveLookupJoinTest {
                        List<Row> results = Lists.newArrayList(flinkTable.execute().collect());
                        assertEquals("[1,a, 2,b, 3,c]", results.toString());
                } finally {
-                       hiveShell.execute("drop table build");
+                       tableEnv.executeSql("drop table build");
                }
        }
 }
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkTest.java
index d268246..2cc03de 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkTest.java
@@ -202,19 +202,19 @@ public class HiveTableSinkTest {
 
        @Test
        public void testWriteNullValues() throws Exception {
-               hiveShell.execute("create database db1");
+               TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode(SqlDialect.HIVE);
+               tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
+               tableEnv.useCatalog(hiveCatalog.getName());
+               tableEnv.executeSql("create database db1");
                try {
                        // 17 data types
-                       hiveShell.execute("create table db1.src" +
+                       tableEnv.executeSql("create table db1.src" +
                                        "(t tinyint,s smallint,i int,b bigint,f 
float,d double,de decimal(10,5),ts timestamp,dt date," +
                                        "str string,ch char(5),vch 
varchar(8),bl boolean,bin binary,arr array<int>,mp map<int,string>,strt 
struct<f1:int,f2:string>)");
                        HiveTestUtils.createTextTableInserter(hiveShell, "db1", "src")
                                        .addRow(new Object[]{null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null})
                                        .commit();
                        hiveShell.execute("create table db1.dest like db1.src");
-                       TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
-                       tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
-                       tableEnv.useCatalog(hiveCatalog.getName());
 
                        TableEnvUtil.execInsertSqlAndWaitResult(tableEnv, "insert into db1.dest select * from db1.src");
                        List<String> results = hiveShell.executeQuery("select * from db1.dest");
@@ -224,7 +224,7 @@ public class HiveTableSinkTest {
                        assertEquals("NULL", cols[0]);
                        assertEquals(1, new HashSet<>(Arrays.asList(cols)).size());
                } finally {
-                       hiveShell.execute("drop database db1 cascade");
+                       tableEnv.executeSql("drop database db1 cascade");
                }
        }
 
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java
index 6c1c483..f36a60a 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java
@@ -128,10 +128,10 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {
 
        @Test
        public void testReadNonPartitionedTable() throws Exception {
-               final String catalogName = "hive";
                final String dbName = "source_db";
                final String tblName = "test";
-               hiveShell.execute("CREATE TABLE source_db.test ( a INT, b INT, 
c STRING, d BIGINT, e DOUBLE)");
+               TableEnvironment tEnv = createTableEnv();
+               tEnv.executeSql("CREATE TABLE source_db.test ( a INT, b INT, c 
STRING, d BIGINT, e DOUBLE)");
                HiveTestUtils.createTextTableInserter(hiveShell, dbName, tblName)
                                .addRow(new Object[] { 1, 1, "a", 1000L, 1.11 })
                                .addRow(new Object[] { 2, 2, "b", 2000L, 2.22 })
@@ -139,8 +139,6 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {
                                .addRow(new Object[] { 4, 4, "d", 4000L, 4.44 })
                                .commit();
 
-               TableEnvironment tEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
-               tEnv.registerCatalog(catalogName, hiveCatalog);
                Table src = tEnv.sqlQuery("select * from hive.source_db.test");
                List<Row> rows = Lists.newArrayList(src.execute().collect());
 
@@ -153,10 +151,10 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {
 
        @Test
        public void testReadComplexDataType() throws Exception {
-               final String catalogName = "hive";
                final String dbName = "source_db";
                final String tblName = "complex_test";
-               hiveShell.execute("create table source_db.complex_test(" +
+               TableEnvironment tEnv = createTableEnv();
+               tEnv.executeSql("create table source_db.complex_test(" +
                                                "a array<int>, m 
map<int,string>, s struct<f1:int,f2:bigint>)");
                Integer[] array = new Integer[]{1, 2, 3};
                Map<Integer, String> map = new LinkedHashMap<>();
@@ -166,8 +164,6 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {
                HiveTestUtils.createTextTableInserter(hiveShell, dbName, tblName)
                                .addRow(new Object[]{array, map, struct})
                                .commit();
-               TableEnvironment tEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
-               tEnv.registerCatalog(catalogName, hiveCatalog);
                Table src = tEnv.sqlQuery("select * from 
hive.source_db.complex_test");
                List<Row> rows = Lists.newArrayList(src.execute().collect());
                Assert.assertEquals(1, rows.size());
@@ -182,11 +178,11 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {
         */
        @Test
        public void testReadPartitionTable() throws Exception {
-               final String catalogName = "hive";
                final String dbName = "source_db";
                final String tblName = "test_table_pt";
-               hiveShell.execute("CREATE TABLE source_db.test_table_pt " +
-                                               "(year STRING, value INT) 
partitioned by (pt int);");
+               TableEnvironment tEnv = createTableEnv();
+               tEnv.executeSql("CREATE TABLE source_db.test_table_pt " +
+                                               "(`year` STRING, `value` INT) 
partitioned by (pt int)");
                HiveTestUtils.createTextTableInserter(hiveShell, dbName, tblName)
                                .addRow(new Object[]{"2014", 3})
                                .addRow(new Object[]{"2014", 4})
@@ -195,8 +191,6 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {
                                .addRow(new Object[]{"2015", 2})
                                .addRow(new Object[]{"2015", 5})
                                .commit("pt=1");
-               TableEnvironment tEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
-               tEnv.registerCatalog(catalogName, hiveCatalog);
                Table src = tEnv.sqlQuery("select * from 
hive.source_db.test_table_pt");
                List<Row> rows = Lists.newArrayList(src.execute().collect());
 
@@ -207,11 +201,11 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {
 
        @Test
        public void testPartitionPrunning() throws Exception {
-               final String catalogName = "hive";
                final String dbName = "source_db";
                final String tblName = "test_table_pt_1";
-               hiveShell.execute("CREATE TABLE source_db.test_table_pt_1 " +
-                                               "(year STRING, value INT) 
partitioned by (pt int);");
+               TableEnvironment tEnv = createTableEnv();
+               tEnv.executeSql("CREATE TABLE source_db.test_table_pt_1 " +
+                                               "(`year` STRING, `value` INT) 
partitioned by (pt int)");
                HiveTestUtils.createTextTableInserter(hiveShell, dbName, tblName)
                                .addRow(new Object[]{"2014", 3})
                                .addRow(new Object[]{"2014", 4})
@@ -220,8 +214,6 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {
                                .addRow(new Object[]{"2015", 2})
                                .addRow(new Object[]{"2015", 5})
                                .commit("pt=1");
-               TableEnvironment tEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
-               tEnv.registerCatalog(catalogName, hiveCatalog);
                Table src = tEnv.sqlQuery("select * from 
hive.source_db.test_table_pt_1 where pt = 0");
                // first check execution plan to ensure partition prunning works
                String[] explain = src.explain().split("==.*==\n");
@@ -241,9 +233,14 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {
 
        @Test
        public void testPartitionFilter() throws Exception {
-               hiveShell.execute("create database db1");
+               TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode(SqlDialect.HIVE);
+               TestPartitionFilterCatalog catalog = new TestPartitionFilterCatalog(
+                               hiveCatalog.getName(), hiveCatalog.getDefaultDatabase(), hiveCatalog.getHiveConf(), hiveCatalog.getHiveVersion());
+               tableEnv.registerCatalog(catalog.getName(), catalog);
+               tableEnv.useCatalog(catalog.getName());
+               tableEnv.executeSql("create database db1");
                try {
-                       hiveShell.execute("create table db1.part(x int) 
partitioned by (p1 int,p2 string)");
+                       tableEnv.executeSql("create table db1.part(x int) 
partitioned by (p1 int,p2 string)");
                        HiveTestUtils.createTextTableInserter(hiveShell, "db1", 
"part")
                                        .addRow(new 
Object[]{1}).commit("p1=1,p2='a'");
                        HiveTestUtils.createTextTableInserter(hiveShell, "db1", 
"part")
@@ -253,11 +250,6 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {
                        // test string partition columns with special characters
                        HiveTestUtils.createTextTableInserter(hiveShell, "db1", 
"part")
                                        .addRow(new 
Object[]{4}).commit("p1=4,p2='c:2'");
-                       TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
-                       TestPartitionFilterCatalog catalog = new TestPartitionFilterCatalog(
-                                       hiveCatalog.getName(), hiveCatalog.getDefaultDatabase(), hiveCatalog.getHiveConf(), hiveCatalog.getHiveVersion());
-                       tableEnv.registerCatalog(catalog.getName(), catalog);
-                       tableEnv.useCatalog(catalog.getName());
                        Table query = tableEnv.sqlQuery("select x from db1.part 
where p1>1 or p2<>'a' order by x");
                        String[] explain = query.explain().split("==.*==\n");
                        assertFalse(catalog.fallback);
@@ -298,15 +290,20 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {
                        results = Lists.newArrayList(query.execute().collect());
                        assertEquals("[4]", results.toString());
                } finally {
-                       hiveShell.execute("drop database db1 cascade");
+                       tableEnv.executeSql("drop database db1 cascade");
                }
        }
 
        @Test
        public void testPartitionFilterDateTimestamp() throws Exception {
-               hiveShell.execute("create database db1");
+               TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode(SqlDialect.HIVE);
+               TestPartitionFilterCatalog catalog = new TestPartitionFilterCatalog(
+                               hiveCatalog.getName(), hiveCatalog.getDefaultDatabase(), hiveCatalog.getHiveConf(), hiveCatalog.getHiveVersion());
+               tableEnv.registerCatalog(catalog.getName(), catalog);
+               tableEnv.useCatalog(catalog.getName());
+               tableEnv.executeSql("create database db1");
                try {
-                       hiveShell.execute("create table db1.part(x int) 
partitioned by (p1 date,p2 timestamp)");
+                       tableEnv.executeSql("create table db1.part(x int) 
partitioned by (p1 date,p2 timestamp)");
                        HiveTestUtils.createTextTableInserter(hiveShell, "db1", 
"part")
                                        .addRow(new 
Object[]{1}).commit("p1='2018-08-08',p2='2018-08-08 08:08:08'");
                        HiveTestUtils.createTextTableInserter(hiveShell, "db1", 
"part")
@@ -314,12 +311,6 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {
                        HiveTestUtils.createTextTableInserter(hiveShell, "db1", 
"part")
                                        .addRow(new 
Object[]{3}).commit("p1='2018-08-10',p2='2018-08-08 08:08:10'");
 
-                       TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
-                       TestPartitionFilterCatalog catalog = new TestPartitionFilterCatalog(
-                                       hiveCatalog.getName(), hiveCatalog.getDefaultDatabase(), hiveCatalog.getHiveConf(), hiveCatalog.getHiveVersion());
-                       tableEnv.registerCatalog(catalog.getName(), catalog);
-                       tableEnv.useCatalog(catalog.getName());
-
                        Table query = tableEnv.sqlQuery(
                                        "select x from db1.part where 
p1>cast('2018-08-09' as date) and p2<>cast('2018-08-08 08:08:09' as 
timestamp)");
                        String[] explain = query.explain().split("==.*==\n");
@@ -330,14 +321,14 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {
                        assertEquals("[3]", results.toString());
                        System.out.println(results);
                } finally {
-                       hiveShell.execute("drop database db1 cascade");
+                       tableEnv.executeSql("drop database db1 cascade");
                }
        }
 
        @Test
        public void testProjectionPushDown() throws Exception {
-               hiveShell.execute("create table src(x int,y string) partitioned 
by (p1 bigint, p2 string)");
-               final String catalogName = "hive";
+               TableEnvironment tableEnv = createTableEnv();
+               tableEnv.executeSql("create table src(x int,y string) 
partitioned by (p1 bigint, p2 string)");
                try {
                        HiveTestUtils.createTextTableInserter(hiveShell, "default", "src")
                                        .addRow(new Object[]{1, "a"})
@@ -346,8 +337,6 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {
                        HiveTestUtils.createTextTableInserter(hiveShell, "default", "src")
                                        .addRow(new Object[]{3, "c"})
                                        .commit("p1=2014, p2='2014'");
-                       TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
-                       tableEnv.registerCatalog(catalogName, hiveCatalog);
                        Table table = tableEnv.sqlQuery("select p1, count(y) from hive.`default`.src group by p1");
                        String[] explain = table.explain().split("==.*==\n");
                        assertEquals(4, explain.length);
@@ -363,14 +352,14 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {
                        Object[] rowStrings = rows.stream().map(Row::toString).sorted().toArray();
                        assertArrayEquals(new String[]{"2013,2", "2014,1"}, rowStrings);
                } finally {
-                       hiveShell.execute("drop table src");
+                       tableEnv.executeSql("drop table src");
                }
        }
 
        @Test
        public void testLimitPushDown() throws Exception {
-               hiveShell.execute("create table src (a string)");
-               final String catalogName = "hive";
+               TableEnvironment tableEnv = createTableEnv();
+               tableEnv.executeSql("create table src (a string)");
                try {
                        HiveTestUtils.createTextTableInserter(hiveShell, "default", "src")
                                                .addRow(new Object[]{"a"})
@@ -380,8 +369,6 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {
                                                .commit();
                        //Add this to obtain correct stats of table to avoid FLINK-14965 problem
                        hiveShell.execute("analyze table src COMPUTE STATISTICS");
-                       TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
-                       tableEnv.registerCatalog(catalogName, hiveCatalog);
                        Table table = tableEnv.sqlQuery("select * from hive.`default`.src limit 1");
                        String[] explain = table.explain().split("==.*==\n");
                        assertEquals(4, explain.length);
@@ -397,17 +384,17 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {
                        Object[] rowStrings = rows.stream().map(Row::toString).sorted().toArray();
                        assertArrayEquals(new String[]{"a"}, rowStrings);
                } finally {
-                       hiveShell.execute("drop table src");
+                       tableEnv.executeSql("drop table src");
                }
        }
 
        @Test
        public void testParallelismSetting() {
-               final String catalogName = "hive";
                final String dbName = "source_db";
                final String tblName = "test_parallelism";
-               hiveShell.execute("CREATE TABLE source_db.test_parallelism " +
-                               "(year STRING, value INT) partitioned by (pt 
int);");
+               TableEnvironment tEnv = createTableEnv();
+               tEnv.executeSql("CREATE TABLE source_db.test_parallelism " +
+                               "(`year` STRING, `value` INT) partitioned by 
(pt int)");
                HiveTestUtils.createTextTableInserter(hiveShell, dbName, tblName)
                                .addRow(new Object[]{"2014", 3})
                                .addRow(new Object[]{"2014", 4})
@@ -416,8 +403,6 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {
                                .addRow(new Object[]{"2015", 2})
                                .addRow(new Object[]{"2015", 5})
                                .commit("pt=1");
-               TableEnvironment tEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
-               tEnv.registerCatalog(catalogName, hiveCatalog);
                Table table = tEnv.sqlQuery("select * from 
hive.source_db.test_parallelism");
                PlannerBase planner = (PlannerBase) ((TableEnvironmentImpl) 
tEnv).getPlanner();
                RelNode relNode = 
planner.optimize(TableTestUtil.toRelNode(table));
@@ -429,11 +414,15 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {
 
        @Test
        public void testParallelismOnLimitPushDown() {
-               final String catalogName = "hive";
                final String dbName = "source_db";
                final String tblName = "test_parallelism_limit_pushdown";
-               hiveShell.execute("CREATE TABLE 
source_db.test_parallelism_limit_pushdown " +
-                                       "(year STRING, value INT) partitioned 
by (pt int);");
+               TableEnvironment tEnv = createTableEnv();
+               tEnv.getConfig().getConfiguration().setBoolean(
+                               
HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, false);
+               tEnv.getConfig().getConfiguration().setInteger(
+                               
ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
+               tEnv.executeSql("CREATE TABLE 
source_db.test_parallelism_limit_pushdown " +
+                                       "(`year` STRING, `value` INT) 
partitioned by (pt int)");
                HiveTestUtils.createTextTableInserter(hiveShell, dbName, tblName)
                                        .addRow(new Object[]{"2014", 3})
                                        .addRow(new Object[]{"2014", 4})
@@ -442,12 +431,6 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {
                                        .addRow(new Object[]{"2015", 2})
                                        .addRow(new Object[]{"2015", 5})
                                        .commit("pt=1");
-               TableEnvironment tEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
-               tEnv.getConfig().getConfiguration().setBoolean(
-                       HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, false);
-               tEnv.getConfig().getConfiguration().setInteger(
-                       ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
-               tEnv.registerCatalog(catalogName, hiveCatalog);
                Table table = tEnv.sqlQuery("select * from 
hive.source_db.test_parallelism_limit_pushdown limit 1");
                PlannerBase planner = (PlannerBase) ((TableEnvironmentImpl) 
tEnv).getPlanner();
                RelNode relNode = 
planner.optimize(TableTestUtil.toRelNode(table));
@@ -480,7 +463,11 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {
                final String catalogName = "hive";
                final String dbName = "source_db";
                final String tblName = "stream_test";
-               hiveShell.execute("CREATE TABLE source_db.stream_test (" +
+               StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+               StreamTableEnvironment tEnv = HiveTestUtils.createTableEnvWithBlinkPlannerStreamMode(env, SqlDialect.HIVE);
+               tEnv.registerCatalog(catalogName, hiveCatalog);
+               tEnv.useCatalog(catalogName);
+               tEnv.executeSql("CREATE TABLE source_db.stream_test (" +
                                " a INT," +
                                " b STRING" +
                                ") PARTITIONED BY (ts STRING) TBLPROPERTIES (" +
@@ -493,9 +480,6 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {
                                .addRow(new Object[]{0, "0"})
                                .commit("ts='2020-05-06 00:00:00'");
 
-               StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-               StreamTableEnvironment tEnv = HiveTestUtils.createTableEnvWithBlinkPlannerStreamMode(env);
-               tEnv.registerCatalog(catalogName, hiveCatalog);
                Table src = tEnv.from("hive.source_db.stream_test");
 
                TestingAppendRowDataSink sink = new TestingAppendRowDataSink(new RowDataTypeInfo(
@@ -553,8 +537,13 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {
 
        private void testNonPartitionStreamingSource(Boolean useMapredReader, String tblName) throws Exception {
                final String catalogName = "hive";
-               final String dbName = "source_db";
-               hiveShell.execute("CREATE TABLE source_db." + tblName + " (" +
+               StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+               StreamTableEnvironment tEnv = HiveTestUtils.createTableEnvWithBlinkPlannerStreamMode(env, SqlDialect.HIVE);
+               tEnv.getConfig().getConfiguration().setBoolean(
+                               HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER, useMapredReader);
+               tEnv.registerCatalog(catalogName, hiveCatalog);
+               tEnv.useCatalog(catalogName);
+               tEnv.executeSql("CREATE TABLE source_db." + tblName + " (" +
                                "  a INT," +
                                "  b CHAR(1) " +
                                ") stored as parquet TBLPROPERTIES (" +
@@ -562,12 +551,6 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {
                                "  'streaming-source.monitor-interval'='100ms'" +
                                ")");
 
-               StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-               StreamTableEnvironment tEnv = HiveTestUtils.createTableEnvWithBlinkPlannerStreamMode(env);
-               tEnv.getConfig().getConfiguration().setBoolean(
-                               HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER, useMapredReader);
-
-               tEnv.registerCatalog(catalogName, hiveCatalog);
                Table src = tEnv.sqlQuery("select * from hive.source_db." + 
tblName);
 
                TestingAppendSink sink = new TestingAppendSink();
@@ -672,6 +655,13 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {
                                tEnv.executeSql("select * from 
parquet_t").collect().next());
        }
 
+       private static TableEnvironment createTableEnv() {
+               TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode(SqlDialect.HIVE);
+               tableEnv.registerCatalog("hive", hiveCatalog);
+               tableEnv.useCatalog("hive");
+               return tableEnv;
+       }
+
        /**
         * A sub-class of HiveTableSource to test vector reader switch.
         */
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
index fa31c8b..2dcd21b 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.connectors.hive;
 
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.HiveVersionTestUtil;
+import org.apache.flink.table.api.SqlDialect;
 import org.apache.flink.table.api.StatementSet;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableResult;
@@ -90,12 +91,11 @@ public class TableEnvHiveConnectorTest {
 
        @Test
        public void testDefaultPartitionName() throws Exception {
-               hiveShell.execute("create database db1");
-               hiveShell.execute("create table db1.src (x int, y int)");
-               hiveShell.execute("create table db1.part (x int) partitioned by 
(y int)");
-               HiveTestUtils.createTextTableInserter(hiveShell, "db1", 
"src").addRow(new Object[]{1, 1}).addRow(new Object[]{2, null}).commit();
-
                TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+               tableEnv.executeSql("create database db1");
+               tableEnv.executeSql("create table db1.src (x int, y int)");
+               tableEnv.executeSql("create table db1.part (x int) partitioned 
by (y int)");
+               HiveTestUtils.createTextTableInserter(hiveShell, "db1", 
"src").addRow(new Object[]{1, 1}).addRow(new Object[]{2, null}).commit();
 
                // test generating partitions with default name
                TableEnvUtil.execInsertSqlAndWaitResult(tableEnv, "insert into 
db1.part select * from db1.src");
@@ -110,21 +110,20 @@ public class TableEnvHiveConnectorTest {
                List<Row> rows = Lists.newArrayList(flinkTable.execute().collect());
                assertEquals(Arrays.toString(new String[]{"1,1", "null,2"}), rows.toString());
 
-               hiveShell.execute("drop database db1 cascade");
+               tableEnv.executeSql("drop database db1 cascade");
        }
 
        @Test
        public void testGetNonExistingFunction() throws Exception {
-               hiveShell.execute("create database db1");
-               hiveShell.execute("create table db1.src (d double, s string)");
-               hiveShell.execute("create table db1.dest (x bigint)");
-
                TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+               tableEnv.executeSql("create database db1");
+               tableEnv.executeSql("create table db1.src (d double, s 
string)");
+               tableEnv.executeSql("create table db1.dest (x bigint)");
 
                // just make sure the query runs through, no need to verify result
                TableEnvUtil.execInsertSqlAndWaitResult(tableEnv, "insert into db1.dest select count(d) from db1.src");
 
-               hiveShell.execute("drop database db1 cascade");
+               tableEnv.executeSql("drop database db1 cascade");
        }
 
        @Test
@@ -142,7 +141,7 @@ public class TableEnvHiveConnectorTest {
        private void readWriteFormat(String format) throws Exception {
                TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
 
-               hiveShell.execute("create database db1");
+               tableEnv.executeSql("create database db1");
 
                // create source and dest tables
                String suffix;
@@ -164,9 +163,9 @@ public class TableEnvHiveConnectorTest {
                        tableSchema = "(i int,s string,ts timestamp)";
                }
 
-               hiveShell.execute(String.format(
+               tableEnv.executeSql(String.format(
                                "create table db1.src %s partitioned by (p1 
string, p2 timestamp) %s", tableSchema, suffix));
-               hiveShell.execute(String.format(
+               tableEnv.executeSql(String.format(
                                "create table db1.dest %s partitioned by (p1 
string, p2 timestamp) %s", tableSchema, suffix));
 
                // prepare source data with Hive
@@ -194,7 +193,7 @@ public class TableEnvHiveConnectorTest {
                // verify data on hive side
                verifyHiveQueryResult("select * from db1.dest", expected);
 
-               hiveShell.execute("drop database db1 cascade");
+               tableEnv.executeSql("drop database db1 cascade");
        }
 
        private String toRowValue(List<Object> row) {
@@ -209,16 +208,16 @@ public class TableEnvHiveConnectorTest {
 
        @Test
        public void testDecimal() throws Exception {
-               hiveShell.execute("create database db1");
+               TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+               tableEnv.executeSql("create database db1");
                try {
-                       hiveShell.execute("create table db1.src1 (x 
decimal(10,2))");
-                       hiveShell.execute("create table db1.src2 (x 
decimal(10,2))");
-                       hiveShell.execute("create table db1.dest (x 
decimal(10,2))");
+                       tableEnv.executeSql("create table db1.src1 (x 
decimal(10,2))");
+                       tableEnv.executeSql("create table db1.src2 (x 
decimal(10,2))");
+                       tableEnv.executeSql("create table db1.dest (x 
decimal(10,2))");
                        // populate src1 from Hive
                        // TABLE keyword in INSERT INTO is mandatory prior to 1.1.0
                        hiveShell.execute("insert into table db1.src1 values (1.0),(2.12),(5.123),(5.456),(123456789.12)");
 
-                       TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
                        // populate src2 with same data from Flink
                        TableEnvUtil.execInsertSqlAndWaitResult(tableEnv, "insert into db1.src2 values (cast(1.0 as decimal(10,2))), (cast(2.12 as decimal(10,2))), " +
                                        "(cast(5.123 as decimal(10,2))), (cast(5.456 as decimal(10,2))), (cast(123456789.12 as decimal(10,2)))");
@@ -229,24 +228,25 @@ public class TableEnvHiveConnectorTest {
                        TableEnvUtil.execInsertSqlAndWaitResult(tableEnv, "insert into db1.dest select * from db1.src1");
                        verifyHiveQueryResult("select * from db1.dest", hiveShell.executeQuery("select * from db1.src1"));
                } finally {
-                       hiveShell.execute("drop database db1 cascade");
+                       tableEnv.executeSql("drop database db1 cascade");
                }
        }
 
        @Test
        public void testInsertOverwrite() throws Exception {
-               hiveShell.execute("create database db1");
+               TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+               tableEnv.executeSql("create database db1");
                try {
                        // non-partitioned
-                       hiveShell.execute("create table db1.dest (x int, y 
string)");
+                       tableEnv.executeSql("create table db1.dest (x int, y 
string)");
                        HiveTestUtils.createTextTableInserter(hiveShell, "db1", 
"dest").addRow(new Object[]{1, "a"}).addRow(new Object[]{2, "b"}).commit();
                        verifyHiveQueryResult("select * from db1.dest", 
Arrays.asList("1\ta", "2\tb"));
-                       TableEnvironment tableEnv = 
getTableEnvWithHiveCatalog();
+
                        TableEnvUtil.execInsertSqlAndWaitResult(tableEnv, "insert overwrite db1.dest values (3, 'c')");
                        verifyHiveQueryResult("select * from db1.dest", Collections.singletonList("3\tc"));
 
                        // static partition
-                       hiveShell.execute("create table db1.part(x int) 
partitioned by (y int)");
+                       tableEnv.executeSql("create table db1.part(x int) 
partitioned by (y int)");
                        HiveTestUtils.createTextTableInserter(hiveShell, "db1", 
"part").addRow(new Object[]{1}).commit("y=1");
                        HiveTestUtils.createTextTableInserter(hiveShell, "db1", 
"part").addRow(new Object[]{2}).commit("y=2");
                        tableEnv = getTableEnvWithHiveCatalog();
@@ -259,67 +259,68 @@ public class TableEnvHiveConnectorTest {
                        // only overwrite dynamically matched partitions, other existing partitions remain intact
                        verifyHiveQueryResult("select * from db1.part", Arrays.asList("100\t1", "200\t2", "3\t3"));
                } finally {
-                       hiveShell.execute("drop database db1 cascade");
+                       tableEnv.executeSql("drop database db1 cascade");
                }
        }
 
        @Test
        public void testStaticPartition() throws Exception {
-               hiveShell.execute("create database db1");
+               TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+               tableEnv.executeSql("create database db1");
                try {
-                       hiveShell.execute("create table db1.src (x int)");
+                       tableEnv.executeSql("create table db1.src (x int)");
                        HiveTestUtils.createTextTableInserter(hiveShell, "db1", 
"src").addRow(new Object[]{1}).addRow(new Object[]{2}).commit();
-                       hiveShell.execute("create table db1.dest (x int) 
partitioned by (p1 string, p2 double)");
-                       TableEnvironment tableEnv = 
getTableEnvWithHiveCatalog();
+                       tableEnv.executeSql("create table db1.dest (x int) 
partitioned by (p1 string, p2 double)");
                        TableEnvUtil.execInsertSqlAndWaitResult(tableEnv, 
"insert into db1.dest partition (p1='1''1', p2=1.1) select x from db1.src");
                        assertEquals(1, hiveCatalog.listPartitions(new 
ObjectPath("db1", "dest")).size());
                        verifyHiveQueryResult("select * from db1.dest", 
Arrays.asList("1\t1'1\t1.1", "2\t1'1\t1.1"));
                } finally {
-                       hiveShell.execute("drop database db1 cascade");
+                       tableEnv.executeSql("drop database db1 cascade");
                }
        }
 
        @Test
        public void testDynamicPartition() throws Exception {
-               hiveShell.execute("create database db1");
+               TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+               tableEnv.executeSql("create database db1");
                try {
-                       hiveShell.execute("create table db1.src (x int, y 
string, z double)");
+                       tableEnv.executeSql("create table db1.src (x int, y 
string, z double)");
                        HiveTestUtils.createTextTableInserter(hiveShell, "db1", 
"src")
                                        .addRow(new Object[]{1, "a", 1.1})
                                        .addRow(new Object[]{2, "a", 2.2})
                                        .addRow(new Object[]{3, "b", 3.3})
                                        .commit();
-                       hiveShell.execute("create table db1.dest (x int) 
partitioned by (p1 string, p2 double)");
-                       TableEnvironment tableEnv = 
getTableEnvWithHiveCatalog();
+                       tableEnv.executeSql("create table db1.dest (x int) 
partitioned by (p1 string, p2 double)");
                        TableEnvUtil.execInsertSqlAndWaitResult(tableEnv, 
"insert into db1.dest select * from db1.src");
                        assertEquals(3, hiveCatalog.listPartitions(new 
ObjectPath("db1", "dest")).size());
                        verifyHiveQueryResult("select * from db1.dest", 
Arrays.asList("1\ta\t1.1", "2\ta\t2.2", "3\tb\t3.3"));
                } finally {
-                       hiveShell.execute("drop database db1 cascade");
+                       tableEnv.executeSql("drop database db1 cascade");
                }
        }
 
        @Test
        public void testPartialDynamicPartition() throws Exception {
-               hiveShell.execute("create database db1");
+               TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+               tableEnv.executeSql("create database db1");
                try {
-                       hiveShell.execute("create table db1.src (x int, y 
string)");
+                       tableEnv.executeSql("create table db1.src (x int, y 
string)");
                        HiveTestUtils.createTextTableInserter(hiveShell, "db1", 
"src").addRow(new Object[]{1, "a"}).addRow(new Object[]{2, "b"}).commit();
-                       hiveShell.execute("create table db1.dest (x int) 
partitioned by (p1 double, p2 string)");
-                       TableEnvironment tableEnv = 
getTableEnvWithHiveCatalog();
+                       tableEnv.executeSql("create table db1.dest (x int) 
partitioned by (p1 double, p2 string)");
                        TableEnvUtil.execInsertSqlAndWaitResult(tableEnv, 
"insert into db1.dest partition (p1=1.1) select x,y from db1.src");
                        assertEquals(2, hiveCatalog.listPartitions(new 
ObjectPath("db1", "dest")).size());
                        verifyHiveQueryResult("select * from db1.dest", 
Arrays.asList("1\t1.1\ta", "2\t1.1\tb"));
                } finally {
-                       hiveShell.execute("drop database db1 cascade");
+                       tableEnv.executeSql("drop database db1 cascade");
                }
        }
 
        @Test
        public void testDateTimestampPartitionColumns() throws Exception {
-               hiveShell.execute("create database db1");
+               TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+               tableEnv.executeSql("create database db1");
                try {
-                       hiveShell.execute("create table db1.part(x int) 
partitioned by (dt date,ts timestamp)");
+                       tableEnv.executeSql("create table db1.part(x int) 
partitioned by (dt date,ts timestamp)");
                        HiveTestUtils.createTextTableInserter(hiveShell, "db1", 
"part")
                                        .addRow(new Object[]{1})
                                        .addRow(new Object[]{2})
@@ -327,7 +328,6 @@ public class TableEnvHiveConnectorTest {
                        HiveTestUtils.createTextTableInserter(hiveShell, "db1", 
"part")
                                        .addRow(new Object[]{3})
                                        .commit("dt='2019-12-25',ts='2019-12-25 
16:23:43.012'");
-                       TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
                        List<Row> results = Lists.newArrayList(tableEnv.sqlQuery("select * from db1.part order by x").execute().collect());
                        assertEquals("[1,2019-12-23,2019-12-23T00:00, 2,2019-12-23,2019-12-23T00:00, 3,2019-12-25,2019-12-25T16:23:43.012]", results.toString());
 
@@ -338,7 +338,7 @@ public class TableEnvHiveConnectorTest {
                        results = Lists.newArrayList(tableEnv.sqlQuery("select 
max(dt) from db1.part").execute().collect());
                        assertEquals("[2019-12-31]", results.toString());
                } finally {
-                       hiveShell.execute("drop database db1 cascade");
+                       tableEnv.executeSql("drop database db1 cascade");
                }
        }
 
@@ -351,11 +351,12 @@ public class TableEnvHiveConnectorTest {
                // Therefore disable such tests for older Hive versions.
                String hiveVersion = HiveShimLoader.getHiveVersion();
                Assume.assumeTrue(hiveVersion.compareTo("2.0.0") >= 0 || 
hiveVersion.compareTo("1.3.0") >= 0);
-               hiveShell.execute("create database db1");
+               TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+               tableEnv.executeSql("create database db1");
                try {
-                       hiveShell.execute("create table db1.simple (i int,a 
array<int>)");
-                       hiveShell.execute("create table db1.nested (a 
array<map<int, string>>)");
-                       hiveShell.execute("create function hiveudtf as 
'org.apache.hadoop.hive.ql.udf.generic.GenericUDTFExplode'");
+                       tableEnv.executeSql("create table db1.simple (i int,a 
array<int>)");
+                       tableEnv.executeSql("create table db1.nested (a 
array<map<int, string>>)");
+                       tableEnv.executeSql("create function hiveudtf as 
'org.apache.hadoop.hive.ql.udf.generic.GenericUDTFExplode'");
                        hiveShell.insertInto("db1", "simple").addRow(3, 
Arrays.asList(1, 2, 3)).commit();
                        Map<Integer, String> map1 = new HashMap<>();
                        map1.put(1, "a");
@@ -364,7 +365,6 @@ public class TableEnvHiveConnectorTest {
                        map2.put(3, "c");
                        hiveShell.insertInto("db1", 
"nested").addRow(Arrays.asList(map1, map2)).commit();
 
-                       TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
                        List<Row> results = Lists.newArrayList(
                                        tableEnv.sqlQuery("select x from db1.simple, lateral table(hiveudtf(a)) as T(x)").execute().collect());
                        assertEquals("[1, 2, 3]", results.toString());
@@ -372,7 +372,7 @@ public class TableEnvHiveConnectorTest {
                                        tableEnv.sqlQuery("select x from 
db1.nested, lateral table(hiveudtf(a)) as T(x)").execute().collect());
                        assertEquals("[{1=a, 2=b}, {3=c}]", results.toString());
 
-                       hiveShell.execute("create table db1.ts (a 
array<timestamp>)");
+                       tableEnv.executeSql("create table db1.ts (a 
array<timestamp>)");
                        HiveTestUtils.createTextTableInserter(hiveShell, "db1", 
"ts").addRow(new Object[]{
                                        new 
Object[]{Timestamp.valueOf("2015-04-28 15:23:00"), 
Timestamp.valueOf("2016-06-03 17:05:52")}})
                                        .commit();
@@ -380,17 +380,18 @@ public class TableEnvHiveConnectorTest {
                                        tableEnv.sqlQuery("select x from 
db1.ts, lateral table(hiveudtf(a)) as T(x)").execute().collect());
                        assertEquals("[2015-04-28T15:23, 2016-06-03T17:05:52]", 
results.toString());
                } finally {
-                       hiveShell.execute("drop database db1 cascade");
-                       hiveShell.execute("drop function hiveudtf");
+                       tableEnv.executeSql("drop database db1 cascade");
+                       tableEnv.executeSql("drop function hiveudtf");
                }
        }
 
        @Test
        public void testNotNullConstraints() throws Exception {
                Assume.assumeTrue(HiveVersionTestUtil.HIVE_310_OR_LATER);
-               hiveShell.execute("create database db1");
+               TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+               tableEnv.executeSql("create database db1");
                try {
-                       hiveShell.execute("create table db1.tbl (x int,y bigint 
not null enable rely,z string not null enable norely)");
+                       tableEnv.executeSql("create table db1.tbl (x int,y 
bigint not null enable rely,z string not null enable norely)");
                        CatalogBaseTable catalogTable = 
hiveCatalog.getTable(new ObjectPath("db1", "tbl"));
                        TableSchema tableSchema = catalogTable.getSchema();
                        assertTrue("By default columns should be nullable",
@@ -400,7 +401,7 @@ public class TableEnvHiveConnectorTest {
                        assertTrue("NOT NULL NORELY columns should be 
considered nullable",
                                        
tableSchema.getFieldDataTypes()[2].getLogicalType().isNullable());
                } finally {
-                       hiveShell.execute("drop database db1 cascade");
+                       tableEnv.executeSql("drop database db1 cascade");
                }
        }
 
@@ -409,10 +410,11 @@ public class TableEnvHiveConnectorTest {
                // While PK constraints are supported since Hive 2.1.0, the constraints cannot be RELY in 2.x versions.
                // So let's only test for 3.x.
                Assume.assumeTrue(HiveVersionTestUtil.HIVE_310_OR_LATER);
-               hiveShell.execute("create database db1");
+               TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+               tableEnv.executeSql("create database db1");
                try {
                        // test rely PK constraints
-                       hiveShell.execute("create table db1.tbl1 (x tinyint,y 
smallint,z int, primary key (x,z) disable novalidate rely)");
+                       tableEnv.executeSql("create table db1.tbl1 (x tinyint,y 
smallint,z int, primary key (x,z) disable novalidate rely)");
                        CatalogBaseTable catalogTable = 
hiveCatalog.getTable(new ObjectPath("db1", "tbl1"));
                        TableSchema tableSchema = catalogTable.getSchema();
                        assertTrue(tableSchema.getPrimaryKey().isPresent());
@@ -421,32 +423,32 @@ public class TableEnvHiveConnectorTest {
                        assertTrue(pk.getColumns().containsAll(Arrays.asList("x", "z")));
 
                        // test norely PK constraints
-                       hiveShell.execute("create table db1.tbl2 (x tinyint,y 
smallint, primary key (x) disable norely)");
+                       tableEnv.executeSql("create table db1.tbl2 (x tinyint,y 
smallint, primary key (x) disable norely)");
                        catalogTable = hiveCatalog.getTable(new 
ObjectPath("db1", "tbl2"));
                        tableSchema = catalogTable.getSchema();
                        assertFalse(tableSchema.getPrimaryKey().isPresent());
 
                        // test table w/o PK
-                       hiveShell.execute("create table db1.tbl3 (x tinyint)");
+                       tableEnv.executeSql("create table db1.tbl3 (x 
tinyint)");
                        catalogTable = hiveCatalog.getTable(new 
ObjectPath("db1", "tbl3"));
                        tableSchema = catalogTable.getSchema();
                        assertFalse(tableSchema.getPrimaryKey().isPresent());
                } finally {
-                       hiveShell.execute("drop database db1 cascade");
+                       tableEnv.executeSql("drop database db1 cascade");
                }
        }
 
        @Test
        public void testTimestamp() throws Exception {
-               hiveShell.execute("create database db1");
+               TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+               tableEnv.executeSql("create database db1");
                try {
-                       hiveShell.execute("create table db1.src (ts 
timestamp)");
-                       hiveShell.execute("create table db1.dest (ts 
timestamp)");
+                       tableEnv.executeSql("create table db1.src (ts 
timestamp)");
+                       tableEnv.executeSql("create table db1.dest (ts 
timestamp)");
                        HiveTestUtils.createTextTableInserter(hiveShell, "db1", 
"src")
                                        .addRow(new 
Object[]{Timestamp.valueOf("2019-11-11 00:00:00")})
                                        .addRow(new 
Object[]{Timestamp.valueOf("2019-12-03 15:43:32.123456789")})
                                        .commit();
-                       TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
                        // test read timestamp from hive
                        List<Row> results = Lists.newArrayList(tableEnv.sqlQuery("select * from db1.src").execute().collect());
                        assertEquals(2, results.size());
@@ -456,21 +458,21 @@ public class TableEnvHiveConnectorTest {
                        TableEnvUtil.execInsertSqlAndWaitResult(tableEnv, "insert into db1.dest select max(ts) from db1.src");
                        verifyHiveQueryResult("select * from db1.dest", Collections.singletonList("2019-12-03 15:43:32.123456789"));
                } finally {
-                       hiveShell.execute("drop database db1 cascade");
+                       tableEnv.executeSql("drop database db1 cascade");
                }
        }
 
        @Test
        public void testDate() throws Exception {
-               hiveShell.execute("create database db1");
+               TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+               tableEnv.executeSql("create database db1");
                try {
-                       hiveShell.execute("create table db1.src (dt date)");
-                       hiveShell.execute("create table db1.dest (dt date)");
+                       tableEnv.executeSql("create table db1.src (dt date)");
+                       tableEnv.executeSql("create table db1.dest (dt date)");
                        HiveTestUtils.createTextTableInserter(hiveShell, "db1", 
"src")
                                        .addRow(new 
Object[]{Date.valueOf("2019-12-09")})
                                        .addRow(new 
Object[]{Date.valueOf("2019-12-12")})
                                        .commit();
-                       TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
                        // test read date from hive
                        List<Row> results = Lists.newArrayList(tableEnv.sqlQuery("select * from db1.src").execute().collect());
                        assertEquals(2, results.size());
@@ -480,15 +482,16 @@ public class TableEnvHiveConnectorTest {
                        TableEnvUtil.execInsertSqlAndWaitResult(tableEnv, "insert into db1.dest select max(dt) from db1.src");
                        verifyHiveQueryResult("select * from db1.dest", Collections.singletonList("2019-12-12"));
                } finally {
-                       hiveShell.execute("drop database db1 cascade");
+                       tableEnv.executeSql("drop database db1 cascade");
                }
        }
 
        @Test
        public void testViews() throws Exception {
-               hiveShell.execute("create database db1");
+               TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+               tableEnv.executeSql("create database db1");
                try {
-                       hiveShell.execute("create table db1.src (key int,val 
string)");
+                       tableEnv.executeSql("create table db1.src (key int,val 
string)");
                        HiveTestUtils.createTextTableInserter(hiveShell, "db1", 
"src")
                                        .addRow(new Object[]{1, "a"})
                                        .addRow(new Object[]{1, "aa"})
@@ -497,7 +500,7 @@ public class TableEnvHiveConnectorTest {
                                        .addRow(new Object[]{3, "c"})
                                        .addRow(new Object[]{3, "ccc"})
                                        .commit();
-                       hiveShell.execute("create table db1.keys (key int,name 
string)");
+                       tableEnv.executeSql("create table db1.keys (key 
int,name string)");
                        HiveTestUtils.createTextTableInserter(hiveShell, "db1", 
"keys")
                                        .addRow(new Object[]{1, "key1"})
                                        .addRow(new Object[]{2, "key2"})
@@ -507,7 +510,6 @@ public class TableEnvHiveConnectorTest {
                        hiveShell.execute("create view db1.v1 as select key as 
k,val as v from db1.src limit 2");
                        hiveShell.execute("create view db1.v2 as select 
key,count(*) from db1.src group by key having count(*)>1 order by key");
                        hiveShell.execute("create view db1.v3 as select 
k.key,k.name,count(*) from db1.src s join db1.keys k on s.key=k.key group by 
k.key,k.name order by k.key");
-                       TableEnvironment tableEnv = 
getTableEnvWithHiveCatalog();
                        List<Row> results = 
Lists.newArrayList(tableEnv.sqlQuery("select count(v) from 
db1.v1").execute().collect());
                        assertEquals("[2]", results.toString());
                        results = Lists.newArrayList(tableEnv.sqlQuery("select 
* from db1.v2").execute().collect());
@@ -515,16 +517,16 @@ public class TableEnvHiveConnectorTest {
                        results = Lists.newArrayList(tableEnv.sqlQuery("select * from db1.v3").execute().collect());
                        assertEquals("[1,key1,3, 2,key2,1, 3,key3,2]", results.toString());
                } finally {
-                       hiveShell.execute("drop database db1 cascade");
+                       tableEnv.executeSql("drop database db1 cascade");
                }
        }
 
        @Test
        public void testWhitespacePartValue() throws Exception {
-               hiveShell.execute("create database db1");
+               TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+               tableEnv.executeSql("create database db1");
                try {
-                       hiveShell.execute("create table db1.dest (x int) partitioned by (p string)");
-                       TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+                       tableEnv.executeSql("create table db1.dest (x int) partitioned by (p string)");
                        StatementSet stmtSet = tableEnv.createStatementSet();
                        stmtSet.addInsertSql("insert into db1.dest select 1,'  '");
                        stmtSet.addInsertSql("insert into db1.dest select 2,'a \t'");
@@ -533,29 +535,29 @@ public class TableEnvHiveConnectorTest {
                        tableResult.getJobClient().get().getJobExecutionResult(Thread.currentThread().getContextClassLoader()).get();
                        assertEquals("[p=  , p=a %09]", hiveShell.executeQuery("show partitions db1.dest").toString());
                } finally {
-                       hiveShell.execute("drop database db1 cascade");
+                       tableEnv.executeSql("drop database db1 cascade");
                }
        }
 
        private void testCompressTextTable(boolean batch) throws Exception {
-               hiveShell.execute("create database db1");
+               TableEnvironment tableEnv = batch ?
+                               getTableEnvWithHiveCatalog() :
+                               getStreamTableEnvWithHiveCatalog();
+               tableEnv.executeSql("create database db1");
                try {
-                       hiveShell.execute("create table db1.src (x string,y string)");
+                       tableEnv.executeSql("create table db1.src (x string,y string)");
                        hiveShell.execute("create table db1.dest like db1.src");
                        HiveTestUtils.createTextTableInserter(hiveShell, "db1", "src")
                                        .addRow(new Object[]{"a", "b"})
                                        .addRow(new Object[]{"c", "d"})
                                        .commit();
                        hiveCatalog.getHiveConf().setBoolVar(HiveConf.ConfVars.COMPRESSRESULT, true);
-                       TableEnvironment tableEnv = batch ?
-                                       getTableEnvWithHiveCatalog() :
-                                       getStreamTableEnvWithHiveCatalog();
                        TableEnvUtil.execInsertSqlAndWaitResult(tableEnv, "insert into db1.dest select * from db1.src");
                        List<String> expected = Arrays.asList("a\tb", "c\td");
                        verifyHiveQueryResult("select * from db1.dest", expected);
                        verifyFlinkQueryResult(tableEnv.sqlQuery("select * from db1.dest"), expected);
                } finally {
-                       hiveShell.execute("drop database db1 cascade");
+                       tableEnv.executeSql("drop database db1 cascade");
                }
        }
 
@@ -571,52 +573,52 @@ public class TableEnvHiveConnectorTest {
 
        @Test
        public void testRegexSerDe() throws Exception {
-               hiveShell.execute("create database db1");
+               TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+               tableEnv.executeSql("create database db1");
                try {
-                       hiveShell.execute("create table db1.src (x int,y string) " +
+                       tableEnv.executeSql("create table db1.src (x int,y string) " +
                                        "row format serde 'org.apache.hadoop.hive.serde2.RegexSerDe' " +
-                                       "with serdeproperties ('input.regex'='([\\\\d]+)\\u0001([\\\\S]+)')");
+                                       "with serdeproperties ('input.regex'='([\\d]+)\\u0001([\\S]+)')");
                        HiveTestUtils.createTextTableInserter(hiveShell, "db1", "src")
                                        .addRow(new Object[]{1, "a"})
                                        .addRow(new Object[]{2, "ab"})
                                        .commit();
-                       TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
                        assertEquals("[1,a, 2,ab]", Lists.newArrayList(tableEnv.sqlQuery("select * from db1.src order by x").execute().collect()).toString());
                } finally {
-                       hiveShell.execute("drop database db1 cascade");
+                       tableEnv.executeSql("drop database db1 cascade");
                }
        }
 
        @Test
        public void testUpdatePartitionSD() throws Exception {
-               hiveShell.execute("create database db1");
+               TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+               tableEnv.executeSql("create database db1");
                try {
-                       hiveShell.execute("create table db1.dest (x int) partitioned by (p string) stored as rcfile");
-                       TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+                       tableEnv.executeSql("create table db1.dest (x int) partitioned by (p string) stored as rcfile");
                        TableEnvUtil.execInsertSqlAndWaitResult(tableEnv, "insert overwrite db1.dest partition (p='1') select 1");
-                       hiveShell.execute("alter table db1.dest set fileformat sequencefile");
+                       tableEnv.executeSql("alter table db1.dest set fileformat sequencefile");
                        TableEnvUtil.execInsertSqlAndWaitResult(tableEnv, "insert overwrite db1.dest partition (p='1') select 1");
                        assertEquals("[1,1]", Lists.newArrayList(tableEnv.sqlQuery("select * from db1.dest").execute().collect()).toString());
                } finally {
-                       hiveShell.execute("drop database db1 cascade");
+                       tableEnv.executeSql("drop database db1 cascade");
                }
        }
 
        @Test
        public void testParquetNameMapping() throws Exception {
-               hiveShell.execute("create database db1");
+               TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+               tableEnv.executeSql("create database db1");
                try {
-                       hiveShell.execute("create table db1.t1 (x int,y int) stored as parquet");
-                       hiveShell.execute("insert into table db1.t1 values (1,10),(2,20)");
+                       tableEnv.executeSql("create table db1.t1 (x int,y int) stored as parquet");
+                       TableEnvUtil.execInsertSqlAndWaitResult(tableEnv, "insert into table db1.t1 values (1,10),(2,20)");
                        Table hiveTable = hiveCatalog.getHiveTable(new ObjectPath("db1", "t1"));
                        String location = hiveTable.getSd().getLocation();
-                       hiveShell.execute(String.format("create table db1.t2 (y int,x int) stored as parquet location '%s'", location));
-                       TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+                       tableEnv.executeSql(String.format("create table db1.t2 (y int,x int) stored as parquet location '%s'", location));
                        tableEnv.getConfig().getConfiguration().setBoolean(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER, true);
                        assertEquals("[1, 2]", Lists.newArrayList(tableEnv.sqlQuery("select x from db1.t1").execute().collect()).toString());
                        assertEquals("[1, 2]", Lists.newArrayList(tableEnv.sqlQuery("select x from db1.t2").execute().collect()).toString());
                } finally {
-                       hiveShell.execute("drop database db1 cascade");
+                       tableEnv.executeSql("drop database db1 cascade");
                }
        }
 
@@ -625,47 +627,47 @@ public class TableEnvHiveConnectorTest {
                // not supported until 2.1.0 -- https://issues.apache.org/jira/browse/HIVE-11981,
                // https://issues.apache.org/jira/browse/HIVE-13178
                Assume.assumeTrue(HiveVersionTestUtil.HIVE_210_OR_LATER);
-               hiveShell.execute("create database db1");
+               TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+               tableEnv.executeSql("create database db1");
                try {
-                       hiveShell.execute("create table db1.src (x smallint,y int) stored as orc");
+                       tableEnv.executeSql("create table db1.src (x smallint,y int) stored as orc");
                        hiveShell.execute("insert into table db1.src values (1,100),(2,200)");

-                       TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
                        tableEnv.getConfig().getConfiguration().setBoolean(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER, true);

-                       hiveShell.execute("alter table db1.src change x x int");
+                       tableEnv.executeSql("alter table db1.src change x x int");
                        assertEquals("[1,100, 2,200]", Lists.newArrayList(tableEnv.sqlQuery("select * from db1.src").execute().collect()).toString());

-                       hiveShell.execute("alter table db1.src change y y string");
+                       tableEnv.executeSql("alter table db1.src change y y string");
                        assertEquals("[1,100, 2,200]", Lists.newArrayList(tableEnv.sqlQuery("select * from db1.src").execute().collect()).toString());
                } finally {
-                       hiveShell.execute("drop database db1 cascade");
+                       tableEnv.executeSql("drop database db1 cascade");
                }
        }
 
        @Test
        public void testNonExistingPartitionFolder() throws Exception {
-               hiveShell.execute("create database db1");
+               TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+               tableEnv.executeSql("create database db1");
                try {
-                       hiveShell.execute("create table db1.part (x int) partitioned by (p int)");
+                       tableEnv.executeSql("create table db1.part (x int) partitioned by (p int)");
                        HiveTestUtils.createTextTableInserter(hiveShell, "db1", "part").addRow(new Object[]{1}).commit("p=1");
                        HiveTestUtils.createTextTableInserter(hiveShell, "db1", "part").addRow(new Object[]{2}).commit("p=2");
-                       hiveShell.execute("alter table db1.part add partition (p=3)");
+                       tableEnv.executeSql("alter table db1.part add partition (p=3)");
                        // remove one partition
                        Path toRemove = new Path(hiveCatalog.getHiveTable(new ObjectPath("db1", "part")).getSd().getLocation(), "p=2");
                        FileSystem fs = toRemove.getFileSystem(hiveShell.getHiveConf());
                        fs.delete(toRemove, true);

-                       TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
                        List<Row> results = Lists.newArrayList(tableEnv.sqlQuery("select * from db1.part").execute().collect());
                        assertEquals("[1,1]", results.toString());
                } finally {
-                       hiveShell.execute("drop database db1 cascade");
+                       tableEnv.executeSql("drop database db1 cascade");
                }
        }
 
        private TableEnvironment getTableEnvWithHiveCatalog() {
-               TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
+               TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode(SqlDialect.HIVE);
                tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
                tableEnv.useCatalog(hiveCatalog.getName());
                return tableEnv;
@@ -673,7 +675,7 @@ public class TableEnvHiveConnectorTest {
 
        private TableEnvironment getStreamTableEnvWithHiveCatalog() {
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-               TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerStreamMode(env);
+               TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerStreamMode(env, SqlDialect.HIVE);
                tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
                tableEnv.useCatalog(hiveCatalog.getName());
                return tableEnv;
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogUseBlinkITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogUseBlinkITCase.java
index d3c6ed4..4ecf3f6 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogUseBlinkITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogUseBlinkITCase.java
@@ -25,6 +25,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.SqlDialect;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.Types;
@@ -251,51 +252,47 @@ public class HiveCatalogUseBlinkITCase extends AbstractTestBase {
 
        @Test
        public void testTimestampUDF() throws Exception {
-               hiveCatalog.createFunction(new ObjectPath("default", "myyear"),
-                               new CatalogFunctionImpl(UDFYear.class.getCanonicalName()),
-                               false);

-               hiveShell.execute("create table src(ts timestamp)");
+               TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode(SqlDialect.HIVE);
+               tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
+               tableEnv.useCatalog(hiveCatalog.getName());
+               tableEnv.executeSql(String.format("create function myyear as '%s'", UDFYear.class.getName()));
+               tableEnv.executeSql("create table src(ts timestamp)");
                try {
                        HiveTestUtils.createTextTableInserter(hiveShell, "default", "src")
                                        .addRow(new Object[]{Timestamp.valueOf("2013-07-15 10:00:00")})
                                        .addRow(new Object[]{Timestamp.valueOf("2019-05-23 17:32:55")})
                                        .commit();
-                       TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
-                       tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
-                       tableEnv.useCatalog(hiveCatalog.getName());

                        List<Row> results = Lists.newArrayList(
                                        tableEnv.sqlQuery("select myyear(ts) as y from src").execute().collect());
                        Assert.assertEquals(2, results.size());
                        Assert.assertEquals("[2013, 2019]", results.toString());
                } finally {
-                       hiveShell.execute("drop table src");
+                       tableEnv.executeSql("drop table src");
                }
        }
 
        @Test
        public void testDateUDF() throws Exception {
-               hiveCatalog.createFunction(new ObjectPath("default", "mymonth"),
-                               new CatalogFunctionImpl(UDFMonth.class.getCanonicalName()),
-                               false);

-               hiveShell.execute("create table src(dt date)");
+               TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode(SqlDialect.HIVE);
+               tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
+               tableEnv.useCatalog(hiveCatalog.getName());
+               tableEnv.executeSql(String.format("create function mymonth as '%s'", UDFMonth.class.getName()));
+               tableEnv.executeSql("create table src(dt date)");
                try {
                        HiveTestUtils.createTextTableInserter(hiveShell, "default", "src")
                                        .addRow(new Object[]{Date.valueOf("2019-01-19")})
                                        .addRow(new Object[]{Date.valueOf("2019-03-02")})
                                        .commit();
-                       TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
-                       tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
-                       tableEnv.useCatalog(hiveCatalog.getName());

                        List<Row> results = Lists.newArrayList(
                                        tableEnv.sqlQuery("select mymonth(dt) as m from src order by m").execute().collect());
                        Assert.assertEquals(2, results.size());
                        Assert.assertEquals("[1, 3]", results.toString());
                } finally {
-                       hiveShell.execute("drop table src");
+                       tableEnv.executeSql("drop table src");
                }
        }
 
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
index 252c6e1..57cbd8f 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.catalog.hive;
 
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.SqlDialect;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
@@ -113,17 +114,28 @@ public class HiveTestUtils {
        }
 
        public static TableEnvironment createTableEnvWithBlinkPlannerBatchMode() {
+               return createTableEnvWithBlinkPlannerBatchMode(SqlDialect.DEFAULT);
+       }
+
+       public static TableEnvironment createTableEnvWithBlinkPlannerBatchMode(SqlDialect dialect) {
                EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
                TableEnvironment tableEnv = TableEnvironment.create(settings);
                tableEnv.getConfig().getConfiguration().setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM.key(), 1);
+               tableEnv.getConfig().setSqlDialect(dialect);
                return tableEnv;
        }
 
        public static StreamTableEnvironment createTableEnvWithBlinkPlannerStreamMode(
                        StreamExecutionEnvironment env) {
+               return createTableEnvWithBlinkPlannerStreamMode(env, SqlDialect.DEFAULT);
+       }
+
+       public static StreamTableEnvironment createTableEnvWithBlinkPlannerStreamMode(
+                       StreamExecutionEnvironment env, SqlDialect dialect) {
                EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
                StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
                tableEnv.getConfig().getConfiguration().setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM.key(), 1);
+               tableEnv.getConfig().setSqlDialect(dialect);
                return tableEnv;
        }
 

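For reference, the pattern the updated tests converge on is sketched below. This is a minimal illustration rather than code from the patch; it omits imports and assumes the usual fixtures of these test classes (a HiveCatalog instance named hiveCatalog plus the HiveTestUtils and TableEnvUtil helpers) are in scope:

    // Create a batch TableEnvironment that parses DDL/DML with the Hive dialect.
    TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode(SqlDialect.HIVE);
    tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
    tableEnv.useCatalog(hiveCatalog.getName());

    tableEnv.executeSql("create database db1");
    try {
        // DDL now runs through the TableEnvironment instead of hiveShell.execute(...).
        tableEnv.executeSql("create table db1.src (x int,y string)");
        // DML goes through the blocking helper so the job finishes before assertions run.
        TableEnvUtil.execInsertSqlAndWaitResult(tableEnv, "insert into table db1.src values (1,'a')");
    } finally {
        tableEnv.executeSql("drop database db1 cascade");
    }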