This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
commit d836cd336f2a0df64d18065ddcb5d1141ed02a0e
Author: Rui Li <[email protected]>
AuthorDate: Thu May 21 20:50:31 2020 +0800

    [FLINK-17456][hive][test] Update hive connector tests to execute DDL & DML via TableEnvironment

    This closes #12281
---
 .../flink/connectors/hive/HiveLookupJoinTest.java  |   2 +-
 .../flink/connectors/hive/HiveTableSinkTest.java   |  12 +-
 .../flink/connectors/hive/HiveTableSourceTest.java | 132 ++++++------
 .../connectors/hive/TableEnvHiveConnectorTest.java | 232 +++++++++++----------
 .../catalog/hive/HiveCatalogUseBlinkITCase.java    |  29 ++-
 .../flink/table/catalog/hive/HiveTestUtils.java    |  12 ++
 6 files changed, 210 insertions(+), 209 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveLookupJoinTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveLookupJoinTest.java
index ba17ada..88fae01 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveLookupJoinTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveLookupJoinTest.java
@@ -109,7 +109,7 @@ public class HiveLookupJoinTest {
 			List<Row> results = Lists.newArrayList(flinkTable.execute().collect());
 			assertEquals("[1,a, 2,b, 3,c]", results.toString());
 		} finally {
-			hiveShell.execute("drop table build");
+			tableEnv.executeSql("drop table build");
 		}
 	}
 }
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkTest.java
index d268246..2cc03de 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkTest.java
@@ -202,19 +202,19 @@ public class HiveTableSinkTest {

 	@Test
 	public void testWriteNullValues() throws Exception {
-		hiveShell.execute("create database db1");
+		TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode(SqlDialect.HIVE);
+		tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
+		tableEnv.useCatalog(hiveCatalog.getName());
+		tableEnv.executeSql("create database db1");
 		try {
 			// 17 data types
-			hiveShell.execute("create table db1.src" +
+			tableEnv.executeSql("create table db1.src" +
 					"(t tinyint,s smallint,i int,b bigint,f float,d double,de decimal(10,5),ts timestamp,dt date," +
 					"str string,ch char(5),vch varchar(8),bl boolean,bin binary,arr array<int>,mp map<int,string>,strt struct<f1:int,f2:string>)");
 			HiveTestUtils.createTextTableInserter(hiveShell, "db1", "src")
 					.addRow(new Object[]{null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null})
 					.commit();
 			hiveShell.execute("create table db1.dest like db1.src");
-			TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
-			tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
-			tableEnv.useCatalog(hiveCatalog.getName());
 			TableEnvUtil.execInsertSqlAndWaitResult(tableEnv, "insert into db1.dest select * from db1.src");

 			List<String> results = hiveShell.executeQuery("select * from db1.dest");
@@ -224,7 +224,7 @@ public class HiveTableSinkTest {
 			assertEquals("NULL", cols[0]);
 			assertEquals(1, new HashSet<>(Arrays.asList(cols)).size());
 		} finally {
-			hiveShell.execute("drop database db1 cascade");
+			tableEnv.executeSql("drop database db1 cascade");
 		}
 	}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java
index 6c1c483..f36a60a 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java
@@ -128,10 +128,10 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {

 	@Test
 	public void testReadNonPartitionedTable() throws Exception {
-		final String catalogName = "hive";
 		final String dbName = "source_db";
 		final String tblName = "test";
-		hiveShell.execute("CREATE TABLE source_db.test ( a INT, b INT, c STRING, d BIGINT, e DOUBLE)");
+		TableEnvironment tEnv = createTableEnv();
+		tEnv.executeSql("CREATE TABLE source_db.test ( a INT, b INT, c STRING, d BIGINT, e DOUBLE)");
 		HiveTestUtils.createTextTableInserter(hiveShell, dbName, tblName)
 				.addRow(new Object[] { 1, 1, "a", 1000L, 1.11 })
 				.addRow(new Object[] { 2, 2, "b", 2000L, 2.22 })
@@ -139,8 +139,6 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {
 				.addRow(new Object[] { 4, 4, "d", 4000L, 4.44 })
 				.commit();

-		TableEnvironment tEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
-		tEnv.registerCatalog(catalogName, hiveCatalog);
 		Table src = tEnv.sqlQuery("select * from hive.source_db.test");
 		List<Row> rows = Lists.newArrayList(src.execute().collect());

@@ -153,10 +151,10 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {

 	@Test
 	public void testReadComplexDataType() throws Exception {
-		final String catalogName = "hive";
 		final String dbName = "source_db";
 		final String tblName = "complex_test";
-		hiveShell.execute("create table source_db.complex_test(" +
+		TableEnvironment tEnv = createTableEnv();
+		tEnv.executeSql("create table source_db.complex_test(" +
 				"a array<int>, m map<int,string>, s struct<f1:int,f2:bigint>)");
 		Integer[] array = new Integer[]{1, 2, 3};
 		Map<Integer, String> map = new LinkedHashMap<>();
@@ -166,8 +164,6 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {
 		HiveTestUtils.createTextTableInserter(hiveShell, dbName, tblName)
 				.addRow(new Object[]{array, map, struct})
 				.commit();
-		TableEnvironment tEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
-		tEnv.registerCatalog(catalogName, hiveCatalog);
 		Table src = tEnv.sqlQuery("select * from hive.source_db.complex_test");
 		List<Row> rows = Lists.newArrayList(src.execute().collect());
 		Assert.assertEquals(1, rows.size());
@@ -182,11 +178,11 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {
 	 */
 	@Test
 	public void testReadPartitionTable() throws Exception {
-		final String catalogName = "hive";
 		final String dbName = "source_db";
 		final String tblName = "test_table_pt";
-		hiveShell.execute("CREATE TABLE source_db.test_table_pt " +
-				"(year STRING, value INT) partitioned by (pt int);");
+		TableEnvironment tEnv = createTableEnv();
+		tEnv.executeSql("CREATE TABLE source_db.test_table_pt " +
+				"(`year` STRING, `value` INT) partitioned by (pt int)");
 		HiveTestUtils.createTextTableInserter(hiveShell, dbName, tblName)
 				.addRow(new Object[]{"2014", 3})
 				.addRow(new Object[]{"2014", 4})
@@ -195,8 +191,6 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {
 				.addRow(new Object[]{"2015", 2})
 				.addRow(new Object[]{"2015", 5})
 				.commit("pt=1");
-		TableEnvironment tEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
-		tEnv.registerCatalog(catalogName, hiveCatalog);
 		Table src = tEnv.sqlQuery("select * from hive.source_db.test_table_pt");
 		List<Row> rows = Lists.newArrayList(src.execute().collect());

@@ -207,11 +201,11 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {

 	@Test
 	public void testPartitionPrunning() throws Exception {
-		final String catalogName = "hive";
 		final String dbName = "source_db";
 		final String tblName = "test_table_pt_1";
-		hiveShell.execute("CREATE TABLE source_db.test_table_pt_1 " +
-				"(year STRING, value INT) partitioned by (pt int);");
+		TableEnvironment tEnv = createTableEnv();
+		tEnv.executeSql("CREATE TABLE source_db.test_table_pt_1 " +
+				"(`year` STRING, `value` INT) partitioned by (pt int)");
 		HiveTestUtils.createTextTableInserter(hiveShell, dbName, tblName)
 				.addRow(new Object[]{"2014", 3})
 				.addRow(new Object[]{"2014", 4})
@@ -220,8 +214,6 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {
 				.addRow(new Object[]{"2015", 2})
 				.addRow(new Object[]{"2015", 5})
 				.commit("pt=1");
-		TableEnvironment tEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
-		tEnv.registerCatalog(catalogName, hiveCatalog);
 		Table src = tEnv.sqlQuery("select * from hive.source_db.test_table_pt_1 where pt = 0");
 		// first check execution plan to ensure partition prunning works
 		String[] explain = src.explain().split("==.*==\n");
@@ -241,9 +233,14 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {

 	@Test
 	public void testPartitionFilter() throws Exception {
-		hiveShell.execute("create database db1");
+		TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode(SqlDialect.HIVE);
+		TestPartitionFilterCatalog catalog = new TestPartitionFilterCatalog(
+				hiveCatalog.getName(), hiveCatalog.getDefaultDatabase(), hiveCatalog.getHiveConf(), hiveCatalog.getHiveVersion());
+		tableEnv.registerCatalog(catalog.getName(), catalog);
+		tableEnv.useCatalog(catalog.getName());
+		tableEnv.executeSql("create database db1");
 		try {
-			hiveShell.execute("create table db1.part(x int) partitioned by (p1 int,p2 string)");
+			tableEnv.executeSql("create table db1.part(x int) partitioned by (p1 int,p2 string)");
 			HiveTestUtils.createTextTableInserter(hiveShell, "db1", "part")
 					.addRow(new Object[]{1}).commit("p1=1,p2='a'");
 			HiveTestUtils.createTextTableInserter(hiveShell, "db1", "part")
@@ -253,11 +250,6 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {
 			// test string partition columns with special characters
 			HiveTestUtils.createTextTableInserter(hiveShell, "db1", "part")
 					.addRow(new Object[]{4}).commit("p1=4,p2='c:2'");
-			TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
-			TestPartitionFilterCatalog catalog = new TestPartitionFilterCatalog(
-					hiveCatalog.getName(), hiveCatalog.getDefaultDatabase(), hiveCatalog.getHiveConf(), hiveCatalog.getHiveVersion());
-			tableEnv.registerCatalog(catalog.getName(), catalog);
-			tableEnv.useCatalog(catalog.getName());
 			Table query = tableEnv.sqlQuery("select x from db1.part where p1>1 or p2<>'a' order by x");
 			String[] explain = query.explain().split("==.*==\n");
 			assertFalse(catalog.fallback);
@@ -298,15 +290,20 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {
 			results = Lists.newArrayList(query.execute().collect());
 			assertEquals("[4]", results.toString());
 		} finally {
-			hiveShell.execute("drop database db1 cascade");
+			tableEnv.executeSql("drop database db1 cascade");
 		}
 	}

 	@Test
 	public void testPartitionFilterDateTimestamp() throws Exception {
-		hiveShell.execute("create database db1");
+		TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode(SqlDialect.HIVE);
+		TestPartitionFilterCatalog catalog = new TestPartitionFilterCatalog(
+				hiveCatalog.getName(), hiveCatalog.getDefaultDatabase(), hiveCatalog.getHiveConf(), hiveCatalog.getHiveVersion());
+		tableEnv.registerCatalog(catalog.getName(), catalog);
+		tableEnv.useCatalog(catalog.getName());
+		tableEnv.executeSql("create database db1");
 		try {
-			hiveShell.execute("create table db1.part(x int) partitioned by (p1 date,p2 timestamp)");
+			tableEnv.executeSql("create table db1.part(x int) partitioned by (p1 date,p2 timestamp)");
 			HiveTestUtils.createTextTableInserter(hiveShell, "db1", "part")
 					.addRow(new Object[]{1}).commit("p1='2018-08-08',p2='2018-08-08 08:08:08'");
 			HiveTestUtils.createTextTableInserter(hiveShell, "db1", "part")
@@ -314,12 +311,6 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {
 			HiveTestUtils.createTextTableInserter(hiveShell, "db1", "part")
 					.addRow(new Object[]{3}).commit("p1='2018-08-10',p2='2018-08-08 08:08:10'");

-			TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
-			TestPartitionFilterCatalog catalog = new TestPartitionFilterCatalog(
-					hiveCatalog.getName(), hiveCatalog.getDefaultDatabase(), hiveCatalog.getHiveConf(), hiveCatalog.getHiveVersion());
-			tableEnv.registerCatalog(catalog.getName(), catalog);
-			tableEnv.useCatalog(catalog.getName());
-
 			Table query = tableEnv.sqlQuery(
 					"select x from db1.part where p1>cast('2018-08-09' as date) and p2<>cast('2018-08-08 08:08:09' as timestamp)");
 			String[] explain = query.explain().split("==.*==\n");
@@ -330,14 +321,14 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {
 			assertEquals("[3]", results.toString());
 			System.out.println(results);
 		} finally {
-			hiveShell.execute("drop database db1 cascade");
+			tableEnv.executeSql("drop database db1 cascade");
 		}
 	}

 	@Test
 	public void testProjectionPushDown() throws Exception {
-		hiveShell.execute("create table src(x int,y string) partitioned by (p1 bigint, p2 string)");
-		final String catalogName = "hive";
+		TableEnvironment tableEnv = createTableEnv();
+		tableEnv.executeSql("create table src(x int,y string) partitioned by (p1 bigint, p2 string)");
 		try {
 			HiveTestUtils.createTextTableInserter(hiveShell, "default", "src")
 					.addRow(new Object[]{1, "a"})
@@ -346,8 +337,6 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {
 			HiveTestUtils.createTextTableInserter(hiveShell, "default", "src")
 					.addRow(new Object[]{3, "c"})
 					.commit("p1=2014, p2='2014'");
-			TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
-			tableEnv.registerCatalog(catalogName, hiveCatalog);
 			Table table = tableEnv.sqlQuery("select p1, count(y) from hive.`default`.src group by p1");
 			String[] explain = table.explain().split("==.*==\n");
 			assertEquals(4, explain.length);
@@ -363,14 +352,14 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {
 			Object[] rowStrings = rows.stream().map(Row::toString).sorted().toArray();
 			assertArrayEquals(new String[]{"2013,2", "2014,1"}, rowStrings);
 		} finally {
-			hiveShell.execute("drop table src");
+			tableEnv.executeSql("drop table src");
 		}
 	}

 	@Test
 	public void testLimitPushDown() throws Exception {
-		hiveShell.execute("create table src (a string)");
-		final String catalogName = "hive";
+		TableEnvironment tableEnv = createTableEnv();
+		tableEnv.executeSql("create table src (a string)");
 		try {
 			HiveTestUtils.createTextTableInserter(hiveShell, "default", "src")
 					.addRow(new Object[]{"a"})
@@ -380,8 +369,6 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {
 					.commit();
 			//Add this to obtain correct stats of table to avoid FLINK-14965 problem
 			hiveShell.execute("analyze table src COMPUTE STATISTICS");
-			TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
-			tableEnv.registerCatalog(catalogName, hiveCatalog);
 			Table table = tableEnv.sqlQuery("select * from hive.`default`.src limit 1");
 			String[] explain = table.explain().split("==.*==\n");
 			assertEquals(4, explain.length);
@@ -397,17 +384,17 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {
 			Object[] rowStrings = rows.stream().map(Row::toString).sorted().toArray();
 			assertArrayEquals(new String[]{"a"}, rowStrings);
 		} finally {
-			hiveShell.execute("drop table src");
+			tableEnv.executeSql("drop table src");
 		}
 	}

 	@Test
 	public void testParallelismSetting() {
-		final String catalogName = "hive";
 		final String dbName = "source_db";
 		final String tblName = "test_parallelism";
-		hiveShell.execute("CREATE TABLE source_db.test_parallelism " +
-				"(year STRING, value INT) partitioned by (pt int);");
+		TableEnvironment tEnv = createTableEnv();
+		tEnv.executeSql("CREATE TABLE source_db.test_parallelism " +
+				"(`year` STRING, `value` INT) partitioned by (pt int)");
 		HiveTestUtils.createTextTableInserter(hiveShell, dbName, tblName)
 				.addRow(new Object[]{"2014", 3})
 				.addRow(new Object[]{"2014", 4})
@@ -416,8 +403,6 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {
 				.addRow(new Object[]{"2015", 2})
 				.addRow(new Object[]{"2015", 5})
 				.commit("pt=1");
-		TableEnvironment tEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
-		tEnv.registerCatalog(catalogName, hiveCatalog);
 		Table table = tEnv.sqlQuery("select * from hive.source_db.test_parallelism");
 		PlannerBase planner = (PlannerBase) ((TableEnvironmentImpl) tEnv).getPlanner();
 		RelNode relNode = planner.optimize(TableTestUtil.toRelNode(table));
@@ -429,11 +414,15 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {

 	@Test
 	public void testParallelismOnLimitPushDown() {
-		final String catalogName = "hive";
 		final String dbName = "source_db";
 		final String tblName = "test_parallelism_limit_pushdown";
-		hiveShell.execute("CREATE TABLE source_db.test_parallelism_limit_pushdown " +
-				"(year STRING, value INT) partitioned by (pt int);");
+		TableEnvironment tEnv = createTableEnv();
+		tEnv.getConfig().getConfiguration().setBoolean(
+				HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, false);
+		tEnv.getConfig().getConfiguration().setInteger(
+				ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
+		tEnv.executeSql("CREATE TABLE source_db.test_parallelism_limit_pushdown " +
+				"(`year` STRING, `value` INT) partitioned by (pt int)");
 		HiveTestUtils.createTextTableInserter(hiveShell, dbName, tblName)
 				.addRow(new Object[]{"2014", 3})
 				.addRow(new Object[]{"2014", 4})
@@ -442,12 +431,6 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {
 				.addRow(new Object[]{"2015", 2})
 				.addRow(new Object[]{"2015", 5})
 				.commit("pt=1");
-		TableEnvironment tEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
-		tEnv.getConfig().getConfiguration().setBoolean(
-				HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, false);
-		tEnv.getConfig().getConfiguration().setInteger(
-				ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
-		tEnv.registerCatalog(catalogName, hiveCatalog);
 		Table table = tEnv.sqlQuery("select * from hive.source_db.test_parallelism_limit_pushdown limit 1");
 		PlannerBase planner = (PlannerBase) ((TableEnvironmentImpl) tEnv).getPlanner();
 		RelNode relNode = planner.optimize(TableTestUtil.toRelNode(table));
@@ -480,7 +463,11 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {
 		final String catalogName = "hive";
 		final String dbName = "source_db";
 		final String tblName = "stream_test";
-		hiveShell.execute("CREATE TABLE source_db.stream_test (" +
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		StreamTableEnvironment tEnv = HiveTestUtils.createTableEnvWithBlinkPlannerStreamMode(env, SqlDialect.HIVE);
+		tEnv.registerCatalog(catalogName, hiveCatalog);
+		tEnv.useCatalog(catalogName);
+		tEnv.executeSql("CREATE TABLE source_db.stream_test (" +
 				" a INT," +
 				" b STRING" +
 				") PARTITIONED BY (ts STRING) TBLPROPERTIES (" +
@@ -493,9 +480,6 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {
 				.addRow(new Object[]{0, "0"})
 				.commit("ts='2020-05-06 00:00:00'");

-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		StreamTableEnvironment tEnv = HiveTestUtils.createTableEnvWithBlinkPlannerStreamMode(env);
-		tEnv.registerCatalog(catalogName, hiveCatalog);
 		Table src = tEnv.from("hive.source_db.stream_test");

 		TestingAppendRowDataSink sink = new TestingAppendRowDataSink(new RowDataTypeInfo(
@@ -553,8 +537,13 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {

 	private void testNonPartitionStreamingSource(Boolean useMapredReader, String tblName) throws Exception {
 		final String catalogName = "hive";
-		final String dbName = "source_db";
-		hiveShell.execute("CREATE TABLE source_db." + tblName + " (" +
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		StreamTableEnvironment tEnv = HiveTestUtils.createTableEnvWithBlinkPlannerStreamMode(env, SqlDialect.HIVE);
+		tEnv.getConfig().getConfiguration().setBoolean(
+				HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER, useMapredReader);
+		tEnv.registerCatalog(catalogName, hiveCatalog);
+		tEnv.useCatalog(catalogName);
+		tEnv.executeSql("CREATE TABLE source_db." + tblName + " (" +
 				" a INT," +
 				" b CHAR(1) " +
 				") stored as parquet TBLPROPERTIES (" +
@@ -562,12 +551,6 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {
 				" 'streaming-source.monitor-interval'='100ms'" +
 				")");

-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		StreamTableEnvironment tEnv = HiveTestUtils.createTableEnvWithBlinkPlannerStreamMode(env);
-		tEnv.getConfig().getConfiguration().setBoolean(
-				HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER, useMapredReader);
-
-		tEnv.registerCatalog(catalogName, hiveCatalog);
 		Table src = tEnv.sqlQuery("select * from hive.source_db." + tblName);

 		TestingAppendSink sink = new TestingAppendSink();
@@ -672,6 +655,13 @@ public class HiveTableSourceTest extends BatchAbstractTestBase {
 				tEnv.executeSql("select * from parquet_t").collect().next());
 	}

+	private static TableEnvironment createTableEnv() {
+		TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode(SqlDialect.HIVE);
+		tableEnv.registerCatalog("hive", hiveCatalog);
+		tableEnv.useCatalog("hive");
+		return tableEnv;
+	}
+
 	/**
 	 * A sub-class of HiveTableSource to test vector reader switch.
 	 */
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
index fa31c8b..2dcd21b 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.connectors.hive;

 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.HiveVersionTestUtil;
+import org.apache.flink.table.api.SqlDialect;
 import org.apache.flink.table.api.StatementSet;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableResult;
@@ -90,12 +91,11 @@ public class TableEnvHiveConnectorTest {

 	@Test
 	public void testDefaultPartitionName() throws Exception {
-		hiveShell.execute("create database db1");
-		hiveShell.execute("create table db1.src (x int, y int)");
-		hiveShell.execute("create table db1.part (x int) partitioned by (y int)");
-		HiveTestUtils.createTextTableInserter(hiveShell, "db1", "src").addRow(new Object[]{1, 1}).addRow(new Object[]{2, null}).commit();
-
 		TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+		tableEnv.executeSql("create database db1");
+		tableEnv.executeSql("create table db1.src (x int, y int)");
+		tableEnv.executeSql("create table db1.part (x int) partitioned by (y int)");
+		HiveTestUtils.createTextTableInserter(hiveShell, "db1", "src").addRow(new Object[]{1, 1}).addRow(new Object[]{2, null}).commit();

 		// test generating partitions with default name
 		TableEnvUtil.execInsertSqlAndWaitResult(tableEnv, "insert into db1.part select * from db1.src");
@@ -110,21 +110,20 @@ public class TableEnvHiveConnectorTest {
 		List<Row> rows = Lists.newArrayList(flinkTable.execute().collect());
 		assertEquals(Arrays.toString(new String[]{"1,1", "null,2"}), rows.toString());

-		hiveShell.execute("drop database db1 cascade");
+		tableEnv.executeSql("drop database db1 cascade");
 	}

 	@Test
 	public void testGetNonExistingFunction() throws Exception {
-		hiveShell.execute("create database db1");
-		hiveShell.execute("create table db1.src (d double, s string)");
-		hiveShell.execute("create table db1.dest (x bigint)");
-
 		TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+		tableEnv.executeSql("create database db1");
+		tableEnv.executeSql("create table db1.src (d double, s string)");
+		tableEnv.executeSql("create table db1.dest (x bigint)");

 		// just make sure the query runs through, no need to verify result
 		TableEnvUtil.execInsertSqlAndWaitResult(tableEnv, "insert into db1.dest select count(d) from db1.src");

-		hiveShell.execute("drop database db1 cascade");
+		tableEnv.executeSql("drop database db1 cascade");
 	}

 	@Test
@@ -142,7 +141,7 @@ public class TableEnvHiveConnectorTest {

 	private void readWriteFormat(String format) throws Exception {
 		TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
-		hiveShell.execute("create database db1");
+		tableEnv.executeSql("create database db1");

 		// create source and dest tables
 		String suffix;
@@ -164,9 +163,9 @@ public class TableEnvHiveConnectorTest {
 			tableSchema = "(i int,s string,ts timestamp)";
 		}

-		hiveShell.execute(String.format(
+		tableEnv.executeSql(String.format(
 				"create table db1.src %s partitioned by (p1 string, p2 timestamp) %s", tableSchema, suffix));
-		hiveShell.execute(String.format(
+		tableEnv.executeSql(String.format(
 				"create table db1.dest %s partitioned by (p1 string, p2 timestamp) %s", tableSchema, suffix));

 		// prepare source data with Hive
@@ -194,7 +193,7 @@ public class TableEnvHiveConnectorTest {
 		// verify data on hive side
 		verifyHiveQueryResult("select * from db1.dest", expected);

-		hiveShell.execute("drop database db1 cascade");
+		tableEnv.executeSql("drop database db1 cascade");
 	}

 	private String toRowValue(List<Object> row) {
@@ -209,16 +208,16 @@ public class TableEnvHiveConnectorTest {

 	@Test
 	public void testDecimal() throws Exception {
-		hiveShell.execute("create database db1");
+		TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+		tableEnv.executeSql("create database db1");
 		try {
-			hiveShell.execute("create table db1.src1 (x decimal(10,2))");
-			hiveShell.execute("create table db1.src2 (x decimal(10,2))");
-			hiveShell.execute("create table db1.dest (x decimal(10,2))");
+			tableEnv.executeSql("create table db1.src1 (x decimal(10,2))");
+			tableEnv.executeSql("create table db1.src2 (x decimal(10,2))");
+			tableEnv.executeSql("create table db1.dest (x decimal(10,2))");
 			// populate src1 from Hive
 			// TABLE keyword in INSERT INTO is mandatory prior to 1.1.0
 			hiveShell.execute("insert into table db1.src1 values (1.0),(2.12),(5.123),(5.456),(123456789.12)");
-			TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
 			// populate src2 with same data from Flink
 			TableEnvUtil.execInsertSqlAndWaitResult(tableEnv, "insert into db1.src2 values (cast(1.0 as decimal(10,2))), (cast(2.12 as decimal(10,2))), " +
 					"(cast(5.123 as decimal(10,2))), (cast(5.456 as decimal(10,2))), (cast(123456789.12 as decimal(10,2)))");
@@ -229,24 +228,25 @@ public class TableEnvHiveConnectorTest {
 			TableEnvUtil.execInsertSqlAndWaitResult(tableEnv, "insert into db1.dest select * from db1.src1");
 			verifyHiveQueryResult("select * from db1.dest", hiveShell.executeQuery("select * from db1.src1"));
 		} finally {
-			hiveShell.execute("drop database db1 cascade");
+			tableEnv.executeSql("drop database db1 cascade");
 		}
 	}

 	@Test
 	public void testInsertOverwrite() throws Exception {
-		hiveShell.execute("create database db1");
+		TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+		tableEnv.executeSql("create database db1");
 		try {
 			// non-partitioned
-			hiveShell.execute("create table db1.dest (x int, y string)");
+			tableEnv.executeSql("create table db1.dest (x int, y string)");
 			HiveTestUtils.createTextTableInserter(hiveShell, "db1", "dest").addRow(new Object[]{1, "a"}).addRow(new Object[]{2, "b"}).commit();
 			verifyHiveQueryResult("select * from db1.dest", Arrays.asList("1\ta", "2\tb"));
-			TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+
 			TableEnvUtil.execInsertSqlAndWaitResult(tableEnv, "insert overwrite db1.dest values (3, 'c')");
 			verifyHiveQueryResult("select * from db1.dest", Collections.singletonList("3\tc"));
 			// static partition
-			hiveShell.execute("create table db1.part(x int) partitioned by (y int)");
+			tableEnv.executeSql("create table db1.part(x int) partitioned by (y int)");
 			HiveTestUtils.createTextTableInserter(hiveShell, "db1", "part").addRow(new Object[]{1}).commit("y=1");
 			HiveTestUtils.createTextTableInserter(hiveShell, "db1", "part").addRow(new Object[]{2}).commit("y=2");
 			tableEnv = getTableEnvWithHiveCatalog();
@@ -259,67 +259,68 @@ public class TableEnvHiveConnectorTest {
 			// only overwrite dynamically matched partitions, other existing partitions remain intact
 			verifyHiveQueryResult("select * from db1.part", Arrays.asList("100\t1", "200\t2", "3\t3"));
 		} finally {
-			hiveShell.execute("drop database db1 cascade");
+			tableEnv.executeSql("drop database db1 cascade");
 		}
 	}

 	@Test
 	public void testStaticPartition() throws Exception {
-		hiveShell.execute("create database db1");
+		TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+		tableEnv.executeSql("create database db1");
 		try {
-			hiveShell.execute("create table db1.src (x int)");
+			tableEnv.executeSql("create table db1.src (x int)");
 			HiveTestUtils.createTextTableInserter(hiveShell, "db1", "src").addRow(new Object[]{1}).addRow(new Object[]{2}).commit();
-			hiveShell.execute("create table db1.dest (x int) partitioned by (p1 string, p2 double)");
-			TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+			tableEnv.executeSql("create table db1.dest (x int) partitioned by (p1 string, p2 double)");
 			TableEnvUtil.execInsertSqlAndWaitResult(tableEnv, "insert into db1.dest partition (p1='1''1', p2=1.1) select x from db1.src");
 			assertEquals(1, hiveCatalog.listPartitions(new ObjectPath("db1", "dest")).size());
 			verifyHiveQueryResult("select * from db1.dest", Arrays.asList("1\t1'1\t1.1", "2\t1'1\t1.1"));
 		} finally {
-			hiveShell.execute("drop database db1 cascade");
+			tableEnv.executeSql("drop database db1 cascade");
 		}
 	}

 	@Test
 	public void testDynamicPartition() throws Exception {
-		hiveShell.execute("create database db1");
+		TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+		tableEnv.executeSql("create database db1");
 		try {
-			hiveShell.execute("create table db1.src (x int, y string, z double)");
+			tableEnv.executeSql("create table db1.src (x int, y string, z double)");
 			HiveTestUtils.createTextTableInserter(hiveShell, "db1", "src")
 					.addRow(new Object[]{1, "a", 1.1})
 					.addRow(new Object[]{2, "a", 2.2})
 					.addRow(new Object[]{3, "b", 3.3})
 					.commit();
-			hiveShell.execute("create table db1.dest (x int) partitioned by (p1 string, p2 double)");
-			TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+			tableEnv.executeSql("create table db1.dest (x int) partitioned by (p1 string, p2 double)");
 			TableEnvUtil.execInsertSqlAndWaitResult(tableEnv, "insert into db1.dest select * from db1.src");
 			assertEquals(3, hiveCatalog.listPartitions(new ObjectPath("db1", "dest")).size());
 			verifyHiveQueryResult("select * from db1.dest", Arrays.asList("1\ta\t1.1", "2\ta\t2.2", "3\tb\t3.3"));
 		} finally {
-			hiveShell.execute("drop database db1 cascade");
+			tableEnv.executeSql("drop database db1 cascade");
 		}
 	}

 	@Test
 	public void testPartialDynamicPartition() throws Exception {
-		hiveShell.execute("create database db1");
+		TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+		tableEnv.executeSql("create database db1");
 		try {
-			hiveShell.execute("create table db1.src (x int, y string)");
+			tableEnv.executeSql("create table db1.src (x int, y string)");
 			HiveTestUtils.createTextTableInserter(hiveShell, "db1", "src").addRow(new Object[]{1, "a"}).addRow(new Object[]{2, "b"}).commit();
-			hiveShell.execute("create table db1.dest (x int) partitioned by (p1 double, p2 string)");
-			TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+			tableEnv.executeSql("create table db1.dest (x int) partitioned by (p1 double, p2 string)");
 			TableEnvUtil.execInsertSqlAndWaitResult(tableEnv, "insert into db1.dest partition (p1=1.1) select x,y from db1.src");
 			assertEquals(2, hiveCatalog.listPartitions(new ObjectPath("db1", "dest")).size());
 			verifyHiveQueryResult("select * from db1.dest", Arrays.asList("1\t1.1\ta", "2\t1.1\tb"));
 		} finally {
-			hiveShell.execute("drop database db1 cascade");
+			tableEnv.executeSql("drop database db1 cascade");
 		}
 	}

 	@Test
 	public void testDateTimestampPartitionColumns() throws Exception {
-		hiveShell.execute("create database db1");
+		TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+		tableEnv.executeSql("create database db1");
 		try {
-			hiveShell.execute("create table db1.part(x int) partitioned by (dt date,ts timestamp)");
+			tableEnv.executeSql("create table db1.part(x int) partitioned by (dt date,ts timestamp)");
 			HiveTestUtils.createTextTableInserter(hiveShell, "db1", "part")
 					.addRow(new Object[]{1})
 					.addRow(new Object[]{2})
@@ -327,7 +328,6 @@ public class TableEnvHiveConnectorTest {
 			HiveTestUtils.createTextTableInserter(hiveShell, "db1", "part")
 					.addRow(new Object[]{3})
 					.commit("dt='2019-12-25',ts='2019-12-25 16:23:43.012'");
-			TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
 			List<Row> results = Lists.newArrayList(tableEnv.sqlQuery("select * from db1.part order by x").execute().collect());
 			assertEquals("[1,2019-12-23,2019-12-23T00:00, 2,2019-12-23,2019-12-23T00:00, 3,2019-12-25,2019-12-25T16:23:43.012]", results.toString());
@@ -338,7 +338,7 @@ public class TableEnvHiveConnectorTest {
 			results = Lists.newArrayList(tableEnv.sqlQuery("select max(dt) from db1.part").execute().collect());
 			assertEquals("[2019-12-31]", results.toString());
 		} finally {
-			hiveShell.execute("drop database db1 cascade");
+			tableEnv.executeSql("drop database db1 cascade");
 		}
 	}

@@ -351,11 +351,12 @@ public class TableEnvHiveConnectorTest {
 		// Therefore disable such tests for older Hive versions.
 		String hiveVersion = HiveShimLoader.getHiveVersion();
 		Assume.assumeTrue(hiveVersion.compareTo("2.0.0") >= 0 || hiveVersion.compareTo("1.3.0") >= 0);
-		hiveShell.execute("create database db1");
+		TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+		tableEnv.executeSql("create database db1");
 		try {
-			hiveShell.execute("create table db1.simple (i int,a array<int>)");
-			hiveShell.execute("create table db1.nested (a array<map<int, string>>)");
-			hiveShell.execute("create function hiveudtf as 'org.apache.hadoop.hive.ql.udf.generic.GenericUDTFExplode'");
+			tableEnv.executeSql("create table db1.simple (i int,a array<int>)");
+			tableEnv.executeSql("create table db1.nested (a array<map<int, string>>)");
+			tableEnv.executeSql("create function hiveudtf as 'org.apache.hadoop.hive.ql.udf.generic.GenericUDTFExplode'");
 			hiveShell.insertInto("db1", "simple").addRow(3, Arrays.asList(1, 2, 3)).commit();
 			Map<Integer, String> map1 = new HashMap<>();
 			map1.put(1, "a");
@@ -364,7 +365,6 @@ public class TableEnvHiveConnectorTest {
 			map2.put(3, "c");
 			hiveShell.insertInto("db1", "nested").addRow(Arrays.asList(map1, map2)).commit();

-			TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
 			List<Row> results = Lists.newArrayList(
 					tableEnv.sqlQuery("select x from db1.simple, lateral table(hiveudtf(a)) as T(x)").execute().collect());
 			assertEquals("[1, 2, 3]", results.toString());
@@ -372,7 +372,7 @@ public class TableEnvHiveConnectorTest {
 					tableEnv.sqlQuery("select x from db1.nested, lateral table(hiveudtf(a)) as T(x)").execute().collect());
 			assertEquals("[{1=a, 2=b}, {3=c}]", results.toString());

-			hiveShell.execute("create table db1.ts (a array<timestamp>)");
+			tableEnv.executeSql("create table db1.ts (a array<timestamp>)");
 			HiveTestUtils.createTextTableInserter(hiveShell, "db1", "ts").addRow(new Object[]{
 					new Object[]{Timestamp.valueOf("2015-04-28 15:23:00"), Timestamp.valueOf("2016-06-03 17:05:52")}})
 					.commit();
@@ -380,17 +380,18 @@ public class TableEnvHiveConnectorTest {
 					tableEnv.sqlQuery("select x from db1.ts, lateral table(hiveudtf(a)) as T(x)").execute().collect());
 			assertEquals("[2015-04-28T15:23, 2016-06-03T17:05:52]", results.toString());
 		} finally {
-			hiveShell.execute("drop database db1 cascade");
-			hiveShell.execute("drop function hiveudtf");
+			tableEnv.executeSql("drop database db1 cascade");
+			tableEnv.executeSql("drop function hiveudtf");
 		}
 	}

 	@Test
 	public void testNotNullConstraints() throws Exception {
 		Assume.assumeTrue(HiveVersionTestUtil.HIVE_310_OR_LATER);
-		hiveShell.execute("create database db1");
+		TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+		tableEnv.executeSql("create database db1");
 		try {
-			hiveShell.execute("create table db1.tbl (x int,y bigint not null enable rely,z string not null enable norely)");
+			tableEnv.executeSql("create table db1.tbl (x int,y bigint not null enable rely,z string not null enable norely)");
 			CatalogBaseTable catalogTable = hiveCatalog.getTable(new ObjectPath("db1", "tbl"));
 			TableSchema tableSchema = catalogTable.getSchema();
 			assertTrue("By default columns should be nullable",
@@ -400,7 +401,7 @@ public class TableEnvHiveConnectorTest {
 			assertTrue("NOT NULL NORELY columns should be considered nullable",
 					tableSchema.getFieldDataTypes()[2].getLogicalType().isNullable());
 		} finally {
-			hiveShell.execute("drop database db1 cascade");
+			tableEnv.executeSql("drop database db1 cascade");
 		}
 	}

@@ -409,10 +410,11 @@ public class TableEnvHiveConnectorTest {
 		// While PK constraints are supported since Hive 2.1.0, the constraints cannot be RELY in 2.x versions.
 		// So let's only test for 3.x.
 		Assume.assumeTrue(HiveVersionTestUtil.HIVE_310_OR_LATER);
-		hiveShell.execute("create database db1");
+		TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+		tableEnv.executeSql("create database db1");
 		try {
 			// test rely PK constraints
-			hiveShell.execute("create table db1.tbl1 (x tinyint,y smallint,z int, primary key (x,z) disable novalidate rely)");
+			tableEnv.executeSql("create table db1.tbl1 (x tinyint,y smallint,z int, primary key (x,z) disable novalidate rely)");
 			CatalogBaseTable catalogTable = hiveCatalog.getTable(new ObjectPath("db1", "tbl1"));
 			TableSchema tableSchema = catalogTable.getSchema();
 			assertTrue(tableSchema.getPrimaryKey().isPresent());
@@ -421,32 +423,32 @@ public class TableEnvHiveConnectorTest {
 			assertTrue(pk.getColumns().containsAll(Arrays.asList("x", "z")));

 			// test norely PK constraints
-			hiveShell.execute("create table db1.tbl2 (x tinyint,y smallint, primary key (x) disable norely)");
+			tableEnv.executeSql("create table db1.tbl2 (x tinyint,y smallint, primary key (x) disable norely)");
 			catalogTable = hiveCatalog.getTable(new ObjectPath("db1", "tbl2"));
 			tableSchema = catalogTable.getSchema();
 			assertFalse(tableSchema.getPrimaryKey().isPresent());

 			// test table w/o PK
-			hiveShell.execute("create table db1.tbl3 (x tinyint)");
+			tableEnv.executeSql("create table db1.tbl3 (x tinyint)");
 			catalogTable = hiveCatalog.getTable(new ObjectPath("db1", "tbl3"));
 			tableSchema = catalogTable.getSchema();
 			assertFalse(tableSchema.getPrimaryKey().isPresent());
 		} finally {
-			hiveShell.execute("drop database db1 cascade");
+			tableEnv.executeSql("drop database db1 cascade");
 		}
 	}

 	@Test
 	public void testTimestamp() throws Exception {
-		hiveShell.execute("create database db1");
+		TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+		tableEnv.executeSql("create database db1");
 		try {
-			hiveShell.execute("create table db1.src (ts timestamp)");
-			hiveShell.execute("create table db1.dest (ts timestamp)");
+			tableEnv.executeSql("create table db1.src (ts timestamp)");
+			tableEnv.executeSql("create table db1.dest (ts timestamp)");
 			HiveTestUtils.createTextTableInserter(hiveShell, "db1", "src")
 					.addRow(new Object[]{Timestamp.valueOf("2019-11-11 00:00:00")})
 					.addRow(new Object[]{Timestamp.valueOf("2019-12-03 15:43:32.123456789")})
 					.commit();
-			TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
 			// test read timestamp from hive
 			List<Row> results = Lists.newArrayList(tableEnv.sqlQuery("select * from db1.src").execute().collect());
 			assertEquals(2, results.size());
@@ -456,21 +458,21 @@ public class TableEnvHiveConnectorTest {
 			TableEnvUtil.execInsertSqlAndWaitResult(tableEnv, "insert into db1.dest select max(ts) from db1.src");
 			verifyHiveQueryResult("select * from db1.dest", Collections.singletonList("2019-12-03 15:43:32.123456789"));
 		} finally {
-			hiveShell.execute("drop database db1 cascade");
+			tableEnv.executeSql("drop database db1 cascade");
 		}
 	}

 	@Test
 	public void testDate() throws Exception {
-		hiveShell.execute("create database db1");
+		TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+		tableEnv.executeSql("create database db1");
 		try {
-			hiveShell.execute("create table db1.src (dt date)");
-			hiveShell.execute("create table db1.dest (dt date)");
+			tableEnv.executeSql("create table db1.src (dt date)");
+			tableEnv.executeSql("create table db1.dest (dt date)");
 			HiveTestUtils.createTextTableInserter(hiveShell, "db1", "src")
 					.addRow(new Object[]{Date.valueOf("2019-12-09")})
 					.addRow(new Object[]{Date.valueOf("2019-12-12")})
 					.commit();
-			TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
 			// test read date from hive
 			List<Row> results = Lists.newArrayList(tableEnv.sqlQuery("select * from db1.src").execute().collect());
 			assertEquals(2, results.size());
@@ -480,15 +482,16 @@ public class TableEnvHiveConnectorTest {
 			TableEnvUtil.execInsertSqlAndWaitResult(tableEnv, "insert into db1.dest select max(dt) from db1.src");
 			verifyHiveQueryResult("select * from db1.dest", Collections.singletonList("2019-12-12"));
 		} finally {
-			hiveShell.execute("drop database db1 cascade");
+			tableEnv.executeSql("drop database db1 cascade");
 		}
 	}

 	@Test
 	public void testViews() throws Exception {
-		hiveShell.execute("create database db1");
+		TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+		tableEnv.executeSql("create database db1");
 		try {
-			hiveShell.execute("create table db1.src (key int,val string)");
+			tableEnv.executeSql("create table db1.src (key int,val string)");
 			HiveTestUtils.createTextTableInserter(hiveShell, "db1", "src")
 					.addRow(new Object[]{1, "a"})
 					.addRow(new Object[]{1, "aa"})
@@ -497,7 +500,7 @@ public class TableEnvHiveConnectorTest {
 					.addRow(new Object[]{3, "c"})
 					.addRow(new Object[]{3, "ccc"})
 					.commit();
-			hiveShell.execute("create table db1.keys (key int,name string)");
+			tableEnv.executeSql("create table db1.keys (key int,name string)");
 			HiveTestUtils.createTextTableInserter(hiveShell, "db1", "keys")
 					.addRow(new Object[]{1, "key1"})
 					.addRow(new Object[]{2, "key2"})
@@ -507,7 +510,6 @@ public class TableEnvHiveConnectorTest {
 			hiveShell.execute("create view db1.v1 as select key as k,val as v from db1.src limit 2");
 			hiveShell.execute("create view db1.v2 as select key,count(*) from db1.src group by key having count(*)>1 order by key");
 			hiveShell.execute("create view db1.v3 as select k.key,k.name,count(*) from db1.src s join db1.keys k on s.key=k.key group by k.key,k.name order by k.key");
-			TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
 			List<Row> results = Lists.newArrayList(tableEnv.sqlQuery("select count(v) from db1.v1").execute().collect());
 			assertEquals("[2]", results.toString());
 			results = Lists.newArrayList(tableEnv.sqlQuery("select * from db1.v2").execute().collect());
@@ -515,16 +517,16 @@ public class TableEnvHiveConnectorTest {
 			results = Lists.newArrayList(tableEnv.sqlQuery("select * from db1.v3").execute().collect());
 			assertEquals("[1,key1,3, 2,key2,1, 3,key3,2]", results.toString());
 		} finally {
-			hiveShell.execute("drop database db1 cascade");
+			tableEnv.executeSql("drop database db1 cascade");
 		}
 	}

 	@Test
 	public void testWhitespacePartValue() throws Exception {
-		hiveShell.execute("create database db1");
+		TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+		tableEnv.executeSql("create database db1");
 		try {
-			hiveShell.execute("create table db1.dest (x int) partitioned by (p string)");
-			TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+			tableEnv.executeSql("create table db1.dest (x int) partitioned by (p string)");
 			StatementSet stmtSet = tableEnv.createStatementSet();
 			stmtSet.addInsertSql("insert into db1.dest select 1,' '");
 			stmtSet.addInsertSql("insert into db1.dest select 2,'a \t'");
@@ -533,29 +535,29 @@ public class TableEnvHiveConnectorTest {
 			tableResult.getJobClient().get().getJobExecutionResult(Thread.currentThread().getContextClassLoader()).get();
 			assertEquals("[p= , p=a %09]", hiveShell.executeQuery("show partitions db1.dest").toString());
 		} finally {
-			hiveShell.execute("drop database db1 cascade");
+			tableEnv.executeSql("drop database db1 cascade");
 		}
 	}

 	private void testCompressTextTable(boolean batch) throws Exception {
-		hiveShell.execute("create database db1");
+		TableEnvironment tableEnv = batch ?
+				getTableEnvWithHiveCatalog() :
+				getStreamTableEnvWithHiveCatalog();
+		tableEnv.executeSql("create database db1");
 		try {
-			hiveShell.execute("create table db1.src (x string,y string)");
+			tableEnv.executeSql("create table db1.src (x string,y string)");
 			hiveShell.execute("create table db1.dest like db1.src");
 			HiveTestUtils.createTextTableInserter(hiveShell, "db1", "src")
 					.addRow(new Object[]{"a", "b"})
 					.addRow(new Object[]{"c", "d"})
 					.commit();
 			hiveCatalog.getHiveConf().setBoolVar(HiveConf.ConfVars.COMPRESSRESULT, true);
-			TableEnvironment tableEnv = batch ?
-					getTableEnvWithHiveCatalog() :
-					getStreamTableEnvWithHiveCatalog();
 			TableEnvUtil.execInsertSqlAndWaitResult(tableEnv, "insert into db1.dest select * from db1.src");
 			List<String> expected = Arrays.asList("a\tb", "c\td");
 			verifyHiveQueryResult("select * from db1.dest", expected);
 			verifyFlinkQueryResult(tableEnv.sqlQuery("select * from db1.dest"), expected);
 		} finally {
-			hiveShell.execute("drop database db1 cascade");
+			tableEnv.executeSql("drop database db1 cascade");
 		}
 	}

@@ -571,52 +573,52 @@ public class TableEnvHiveConnectorTest {

 	@Test
 	public void testRegexSerDe() throws Exception {
-		hiveShell.execute("create database db1");
+		TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+		tableEnv.executeSql("create database db1");
 		try {
-			hiveShell.execute("create table db1.src (x int,y string) " +
+			tableEnv.executeSql("create table db1.src (x int,y string) " +
 					"row format serde 'org.apache.hadoop.hive.serde2.RegexSerDe' " +
-					"with serdeproperties ('input.regex'='([\\\\d]+)\\u0001([\\\\S]+)')");
+					"with serdeproperties ('input.regex'='([\\d]+)\\u0001([\\S]+)')");
 			HiveTestUtils.createTextTableInserter(hiveShell, "db1", "src")
 					.addRow(new Object[]{1, "a"})
 					.addRow(new Object[]{2, "ab"})
 					.commit();
-			TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
 			assertEquals("[1,a, 2,ab]", Lists.newArrayList(tableEnv.sqlQuery("select * from db1.src order by x").execute().collect()).toString());
 		} finally {
-			hiveShell.execute("drop database db1 cascade");
+			tableEnv.executeSql("drop database db1 cascade");
 		}
 	}

 	@Test
 	public void testUpdatePartitionSD() throws Exception {
-		hiveShell.execute("create database db1");
+		TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+		tableEnv.executeSql("create database db1");
 		try {
-			hiveShell.execute("create table db1.dest (x int) partitioned by (p string) stored as rcfile");
-			TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+			tableEnv.executeSql("create table db1.dest (x int) partitioned by (p string) stored as rcfile");
 			TableEnvUtil.execInsertSqlAndWaitResult(tableEnv, "insert overwrite db1.dest partition (p='1') select 1");
-			hiveShell.execute("alter table db1.dest set fileformat sequencefile");
+			tableEnv.executeSql("alter table db1.dest set fileformat sequencefile");
 			TableEnvUtil.execInsertSqlAndWaitResult(tableEnv, "insert overwrite db1.dest partition (p='1') select 1");
 			assertEquals("[1,1]", Lists.newArrayList(tableEnv.sqlQuery("select * from db1.dest").execute().collect()).toString());
 		} finally {
-			hiveShell.execute("drop database db1 cascade");
+			tableEnv.executeSql("drop database db1 cascade");
 		}
 	}

 	@Test
 	public void testParquetNameMapping() throws Exception {
-		hiveShell.execute("create database db1");
+		TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+		tableEnv.executeSql("create database db1");
 		try {
-			hiveShell.execute("create table db1.t1 (x int,y int) stored as parquet");
-			hiveShell.execute("insert into table db1.t1 values (1,10),(2,20)");
+			tableEnv.executeSql("create table db1.t1 (x int,y int) stored as parquet");
+			TableEnvUtil.execInsertSqlAndWaitResult(tableEnv, "insert into table db1.t1 values (1,10),(2,20)");
 			Table hiveTable = hiveCatalog.getHiveTable(new ObjectPath("db1", "t1"));
 			String location = hiveTable.getSd().getLocation();
-			hiveShell.execute(String.format("create table db1.t2 (y int,x int) stored as parquet location '%s'", location));
-			TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+			tableEnv.executeSql(String.format("create table db1.t2 (y int,x int) stored as parquet location '%s'", location));
 			tableEnv.getConfig().getConfiguration().setBoolean(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER, true);
 			assertEquals("[1, 2]", Lists.newArrayList(tableEnv.sqlQuery("select x from db1.t1").execute().collect()).toString());
 			assertEquals("[1, 2]", Lists.newArrayList(tableEnv.sqlQuery("select x from db1.t2").execute().collect()).toString());
 		} finally {
-			hiveShell.execute("drop database db1 cascade");
+			tableEnv.executeSql("drop database db1 cascade");
 		}
 	}

@@ -625,47 +627,47 @@ public class TableEnvHiveConnectorTest {
 		// not supported until 2.1.0 -- https://issues.apache.org/jira/browse/HIVE-11981,
 		// https://issues.apache.org/jira/browse/HIVE-13178
 		Assume.assumeTrue(HiveVersionTestUtil.HIVE_210_OR_LATER);
-		hiveShell.execute("create database db1");
+		TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+		tableEnv.executeSql("create database db1");
 		try {
-			hiveShell.execute("create table db1.src (x smallint,y int) stored as orc");
+			tableEnv.executeSql("create table db1.src (x smallint,y int) stored as orc");
 			hiveShell.execute("insert into table db1.src values (1,100),(2,200)");
-			TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
 			tableEnv.getConfig().getConfiguration().setBoolean(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER, true);

-			hiveShell.execute("alter table db1.src change x x int");
+			tableEnv.executeSql("alter table db1.src change x x int");
 			assertEquals("[1,100, 2,200]", Lists.newArrayList(tableEnv.sqlQuery("select * from db1.src").execute().collect()).toString());

-			hiveShell.execute("alter table db1.src change y y string");
+			tableEnv.executeSql("alter table db1.src change y y string");
 			assertEquals("[1,100, 2,200]", Lists.newArrayList(tableEnv.sqlQuery("select * from db1.src").execute().collect()).toString());
 		} finally {
-			hiveShell.execute("drop database db1 cascade");
+			tableEnv.executeSql("drop database db1 cascade");
 		}
 	}

 	@Test
 	public void testNonExistingPartitionFolder() throws Exception {
-		hiveShell.execute("create database db1");
+		TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+		tableEnv.executeSql("create database db1");
 		try {
-			hiveShell.execute("create table db1.part (x int) partitioned by (p int)");
+			tableEnv.executeSql("create table db1.part (x int) partitioned by (p int)");
 			HiveTestUtils.createTextTableInserter(hiveShell, "db1", "part").addRow(new Object[]{1}).commit("p=1");
 			HiveTestUtils.createTextTableInserter(hiveShell, "db1", "part").addRow(new Object[]{2}).commit("p=2");
-			hiveShell.execute("alter table db1.part add partition (p=3)");
+			tableEnv.executeSql("alter table db1.part add partition (p=3)");
 			// remove one partition
 			Path toRemove = new Path(hiveCatalog.getHiveTable(new ObjectPath("db1", "part")).getSd().getLocation(), "p=2");
 			FileSystem fs = toRemove.getFileSystem(hiveShell.getHiveConf());
 			fs.delete(toRemove, true);
-			TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
 			List<Row> results = Lists.newArrayList(tableEnv.sqlQuery("select * from db1.part").execute().collect());
 			assertEquals("[1,1]", results.toString());
 		} finally {
-			hiveShell.execute("drop database db1 cascade");
+			tableEnv.executeSql("drop database db1 cascade");
 		}
 	}

 	private TableEnvironment getTableEnvWithHiveCatalog() {
-		TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
+		TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode(SqlDialect.HIVE);
 		tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
 		tableEnv.useCatalog(hiveCatalog.getName());
 		return tableEnv;
@@ -673,7 +675,7 @@ public class TableEnvHiveConnectorTest {

 	private TableEnvironment getStreamTableEnvWithHiveCatalog() {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerStreamMode(env);
+		TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerStreamMode(env, SqlDialect.HIVE);
 		tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
 		tableEnv.useCatalog(hiveCatalog.getName());
 		return tableEnv;
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogUseBlinkITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogUseBlinkITCase.java
index d3c6ed4..4ecf3f6 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogUseBlinkITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogUseBlinkITCase.java
@@ -25,6 +25,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.SqlDialect;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.Types;
@@ -251,51 +252,47 @@ public class HiveCatalogUseBlinkITCase extends AbstractTestBase {

 	@Test
 	public void testTimestampUDF() throws Exception {
-		hiveCatalog.createFunction(new ObjectPath("default", "myyear"),
-				new CatalogFunctionImpl(UDFYear.class.getCanonicalName()),
-				false);
-		hiveShell.execute("create table src(ts timestamp)");
+		TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode(SqlDialect.HIVE);
+		tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
+		tableEnv.useCatalog(hiveCatalog.getName());
+		tableEnv.executeSql(String.format("create function myyear as '%s'", UDFYear.class.getName()));
+		tableEnv.executeSql("create table src(ts timestamp)");
 		try {
 			HiveTestUtils.createTextTableInserter(hiveShell, "default", "src")
 					.addRow(new Object[]{Timestamp.valueOf("2013-07-15 10:00:00")})
 					.addRow(new Object[]{Timestamp.valueOf("2019-05-23 17:32:55")})
 					.commit();
-			TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
-			tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
-			tableEnv.useCatalog(hiveCatalog.getName());
 			List<Row> results = Lists.newArrayList(
 					tableEnv.sqlQuery("select myyear(ts) as y from src").execute().collect());
 			Assert.assertEquals(2, results.size());
 			Assert.assertEquals("[2013, 2019]", results.toString());
 		} finally {
-			hiveShell.execute("drop table src");
+			tableEnv.executeSql("drop table src");
 		}
 	}

 	@Test
 	public void testDateUDF() throws Exception {
-		hiveCatalog.createFunction(new ObjectPath("default", "mymonth"),
-				new CatalogFunctionImpl(UDFMonth.class.getCanonicalName()),
-				false);
-		hiveShell.execute("create table src(dt date)");
+		TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode(SqlDialect.HIVE);
+		tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
+		tableEnv.useCatalog(hiveCatalog.getName());
+		tableEnv.executeSql(String.format("create function mymonth as '%s'", UDFMonth.class.getName()));
+		tableEnv.executeSql("create table src(dt date)");
 		try {
 			HiveTestUtils.createTextTableInserter(hiveShell, "default", "src")
 					.addRow(new Object[]{Date.valueOf("2019-01-19")})
 					.addRow(new Object[]{Date.valueOf("2019-03-02")})
 					.commit();
-			TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
-			tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
-			tableEnv.useCatalog(hiveCatalog.getName());
 			List<Row> results = Lists.newArrayList(
 					tableEnv.sqlQuery("select mymonth(dt) as m from src order by m").execute().collect());
 			Assert.assertEquals(2, results.size());
 			Assert.assertEquals("[1, 3]", results.toString());
 		} finally {
-			hiveShell.execute("drop table src");
+			tableEnv.executeSql("drop table src");
 		}
 	}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
index 252c6e1..57cbd8f 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.catalog.hive;

 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.SqlDialect;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
@@ -113,17 +114,28 @@ public class HiveTestUtils {
 	}

 	public static TableEnvironment createTableEnvWithBlinkPlannerBatchMode() {
+		return createTableEnvWithBlinkPlannerBatchMode(SqlDialect.DEFAULT);
+	}
+
+	public static TableEnvironment createTableEnvWithBlinkPlannerBatchMode(SqlDialect dialect) {
 		EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
 		TableEnvironment tableEnv = TableEnvironment.create(settings);
 		tableEnv.getConfig().getConfiguration().setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM.key(), 1);
+		tableEnv.getConfig().setSqlDialect(dialect);
 		return tableEnv;
 	}

 	public static StreamTableEnvironment createTableEnvWithBlinkPlannerStreamMode(
 			StreamExecutionEnvironment env) {
+		return createTableEnvWithBlinkPlannerStreamMode(env, SqlDialect.DEFAULT);
+	}
+
+	public static StreamTableEnvironment createTableEnvWithBlinkPlannerStreamMode(
+			StreamExecutionEnvironment env, SqlDialect dialect) {
 		EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
 		StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
 		tableEnv.getConfig().getConfiguration().setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM.key(), 1);
+		tableEnv.getConfig().setSqlDialect(dialect);
 		return tableEnv;
 	}
