This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
     new 7b2e6e7  [FLINK-13274]Refactor HiveTableSourceTest using HiveRunner
7b2e6e7 is described below

commit 7b2e6e70bfccf2ffd051d61782d50d3ab9ef0443
Author: zjuwangg <zjuwa...@foxmail.com>
AuthorDate: Tue Jul 16 21:01:13 2019 +0800

    [FLINK-13274]Refactor HiveTableSourceTest using HiveRunner

    Refactor HiveTableSourceTest to use HiveRunner.

    This closes #9130.
---
 flink-connectors/flink-connector-hive/pom.xml      |   8 --
 .../batch/connectors/hive/HiveTableSourceTest.java | 100 +++++++++++++--------
 2 files changed, 65 insertions(+), 43 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/pom.xml b/flink-connectors/flink-connector-hive/pom.xml
index f9a037b..02e15c9 100644
--- a/flink-connectors/flink-connector-hive/pom.xml
+++ b/flink-connectors/flink-connector-hive/pom.xml
@@ -389,10 +389,6 @@ under the License.
 				<artifactId>hive-jdbc</artifactId>
 			</exclusion>
 			<exclusion>
-				<groupId>org.apache.hive.hcatalog</groupId>
-				<artifactId>hive-webhcat-java-client</artifactId>
-			</exclusion>
-			<exclusion>
 				<groupId>org.apache.hive</groupId>
 				<artifactId>hive-service</artifactId>
 			</exclusion>
@@ -402,10 +398,6 @@ under the License.
 			</exclusion>
 			<exclusion>
 				<groupId>org.apache.tez</groupId>
-				<artifactId>tez-dag</artifactId>
-			</exclusion>
-			<exclusion>
-				<groupId>org.apache.tez</groupId>
 				<artifactId>tez-common</artifactId>
 			</exclusion>
 			<exclusion>
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSourceTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSourceTest.java
index d6cf124..e127639 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSourceTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSourceTest.java
@@ -30,39 +30,40 @@ import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
 import org.apache.flink.table.catalog.hive.HiveTestUtils;
-import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
-import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
-import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
 import org.apache.flink.types.Row;
 
+import com.klarna.hiverunner.HiveShell;
+import com.klarna.hiverunner.annotations.HiveSQL;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.SerDeInfo;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.mapred.JobConf;
 import org.junit.AfterClass;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.runner.RunWith;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
 /**
  * Tests {@link HiveTableSource}.
  */
+@RunWith(FlinkStandaloneHiveRunner.class)
 public class HiveTableSourceTest {
-	public static final String DEFAULT_SERDE_CLASS = org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.class.getName();
-	public static final String DEFAULT_INPUT_FORMAT_CLASS = org.apache.hadoop.mapred.TextInputFormat.class.getName();
-	public static final String DEFAULT_OUTPUT_FORMAT_CLASS = org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat.class.getName();
+
+	@HiveSQL(files = {})
+	private static HiveShell hiveShell;
 
 	private static HiveCatalog hiveCatalog;
 	private static HiveConf hiveConf;
 
 	@BeforeClass
 	public static void createCatalog() throws IOException {
-		hiveConf = HiveTestUtils.createHiveConf();
+		hiveConf = hiveShell.getHiveConf();
 		hiveCatalog = HiveTestUtils.createHiveCatalog(hiveConf);
 		hiveCatalog.open();
 	}
@@ -74,10 +75,24 @@ public class HiveTableSourceTest {
 		}
 	}
 
+	@Before
+	public void setupSourceDatabaseAndData() {
+		hiveShell.execute("CREATE DATABASE IF NOT EXISTS source_db");
+	}
+
 	@Test
 	public void testReadNonPartitionedTable() throws Exception {
-		final String dbName = "default";
+		final String dbName = "source_db";
 		final String tblName = "test";
+		hiveShell.execute("CREATE TABLE source_db.test ( a INT, b INT, c STRING, d BIGINT, e DOUBLE)");
+		hiveShell.insertInto(dbName, tblName)
+				.withAllColumns()
+				.addRow(1, 1, "a", 1000L, 1.11)
+				.addRow(2, 2, "b", 2000L, 2.22)
+				.addRow(3, 3, "c", 3000L, 3.33)
+				.addRow(4, 4, "d", 4000L, 4.44)
+				.commit();
+
 		TableSchema tableSchema = new TableSchema(
 				new String[]{"a", "b", "c", "d", "e"},
 				new TypeInformation[]{
@@ -87,29 +102,6 @@ public class HiveTableSourceTest {
 						BasicTypeInfo.LONG_TYPE_INFO,
 						BasicTypeInfo.DOUBLE_TYPE_INFO}
 		);
-		//Now we used metaStore client to create hive table instead of using hiveCatalog for it doesn't support set
-		//serDe temporarily.
-		HiveMetastoreClientWrapper client = HiveMetastoreClientFactory.create(hiveConf, null);
-		org.apache.hadoop.hive.metastore.api.Table tbl = new org.apache.hadoop.hive.metastore.api.Table();
-		tbl.setDbName(dbName);
-		tbl.setTableName(tblName);
-		tbl.setCreateTime((int) (System.currentTimeMillis() / 1000));
-		tbl.setParameters(new HashMap<>());
-		StorageDescriptor sd = new StorageDescriptor();
-		String location = HiveTableSourceTest.class.getResource("/test").getPath();
-		sd.setLocation(location);
-		sd.setInputFormat(DEFAULT_INPUT_FORMAT_CLASS);
-		sd.setOutputFormat(DEFAULT_OUTPUT_FORMAT_CLASS);
-		sd.setSerdeInfo(new SerDeInfo());
-		sd.getSerdeInfo().setSerializationLib(DEFAULT_SERDE_CLASS);
-		sd.getSerdeInfo().setParameters(new HashMap<>());
-		sd.getSerdeInfo().getParameters().put("serialization.format", "1");
-		sd.getSerdeInfo().getParameters().put("field.delim", ",");
-		sd.setCols(HiveTableUtil.createHiveColumns(tableSchema));
-		tbl.setSd(sd);
-		tbl.setPartitionKeys(new ArrayList<>());
-
-		client.createTable(tbl);
 		ExecutionEnvironment execEnv = ExecutionEnvironment.createLocalEnvironment(1);
 		BatchTableEnvironment tEnv = BatchTableEnvironment.create(execEnv);
 		ObjectPath tablePath = new ObjectPath(dbName, tblName);
@@ -125,4 +117,42 @@ public class HiveTableSourceTest {
 		Assert.assertEquals(3, rows.get(2).getField(0));
 		Assert.assertEquals(4, rows.get(3).getField(0));
 	}
+
+	/**
+	 * Test to read from partition table.
+	 * @throws Exception
+	 */
+	@Test
+	public void testReadPartitionTable() throws Exception {
+		final String dbName = "source_db";
+		final String tblName = "test_table_pt";
+		hiveShell.execute("CREATE TABLE source_db.test_table_pt " +
+				"(year STRING, value INT) partitioned by (pt int);");
+		hiveShell.insertInto("source_db", "test_table_pt")
+				.withColumns("year", "value", "pt")
+				.addRow("2014", 3, 0)
+				.addRow("2014", 4, 0)
+				.addRow("2015", 2, 1)
+				.addRow("2015", 5, 1)
+				.commit();
+
+		TableSchema tableSchema = new TableSchema(
+				new String[]{"year", "value", "pt"},
+				new TypeInformation[]{
+						BasicTypeInfo.STRING_TYPE_INFO,
+						BasicTypeInfo.INT_TYPE_INFO,
+						BasicTypeInfo.INT_TYPE_INFO}
+		);
+		ExecutionEnvironment execEnv = ExecutionEnvironment.createLocalEnvironment(1);
+		BatchTableEnvironment tEnv = BatchTableEnvironment.create(execEnv);
+		ObjectPath tablePath = new ObjectPath(dbName, tblName);
+		CatalogTable catalogTable = (CatalogTable) hiveCatalog.getTable(tablePath);
+		HiveTableSource hiveTableSource = new HiveTableSource(new JobConf(hiveConf), tablePath, catalogTable);
+		Table src = tEnv.fromTableSource(hiveTableSource);
+		DataSet<Row> rowDataSet = tEnv.toDataSet(src, new RowTypeInfo(tableSchema.getFieldTypes(), tableSchema.getFieldNames()));
+		List<Row> rows = rowDataSet.collect();
+		assertEquals(4, rows.size());
+		Object[] rowStrings = rows.stream().map(Row::toString).sorted().toArray();
+		assertArrayEquals(new String[]{"2014,3,0", "2014,4,0", "2015,2,1", "2015,5,1"}, rowStrings);
+	}
 }
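
For anyone wanting to try the HiveRunner pattern this commit adopts outside of Flink, below is a minimal sketch of the same test skeleton. It is illustrative only: it assumes a HiveRunner 4.x test dependency, uses HiveRunner's stock StandaloneHiveRunner rather than the FlinkStandaloneHiveRunner wrapper referenced in the diff, and the class and table names (HiveRunnerSketchTest, demo_db.demo) are made up for the example.

import com.klarna.hiverunner.HiveShell;
import com.klarna.hiverunner.StandaloneHiveRunner;
import com.klarna.hiverunner.annotations.HiveSQL;
import org.junit.Test;
import org.junit.runner.RunWith;

import java.util.List;

import static org.junit.Assert.assertEquals;

/** Sketch: HiveRunner boots an in-process HiveServer and metastore per test class. */
@RunWith(StandaloneHiveRunner.class)
public class HiveRunnerSketchTest {

	// Injected by the runner; files = {} means no setup scripts are executed.
	@HiveSQL(files = {})
	private HiveShell hiveShell;

	@Test
	public void insertAndQuery() {
		hiveShell.execute("CREATE DATABASE IF NOT EXISTS demo_db");
		hiveShell.execute("CREATE TABLE demo_db.demo (a INT, b STRING)");

		// Same fluent insert API the refactored test uses above.
		hiveShell.insertInto("demo_db", "demo")
				.withAllColumns()
				.addRow(1, "x")
				.addRow(2, "y")
				.commit();

		// executeQuery returns one string per result row, columns tab-separated by default.
		List<String> rows = hiveShell.executeQuery("SELECT a, b FROM demo_db.demo ORDER BY a");
		assertEquals(2, rows.size());
		assertEquals("1\tx", rows.get(0));
	}
}

The gain the refactor is after is visible in the sketch: table DDL and test data sit next to the assertions, replacing the removed metastore-client boilerplate (SerDeInfo, StorageDescriptor, and friends) with a couple of HiveShell calls.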