This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b2e6e7  [FLINK-13274] Refactor HiveTableSourceTest using HiveRunner
7b2e6e7 is described below

commit 7b2e6e70bfccf2ffd051d61782d50d3ab9ef0443
Author: zjuwangg <zjuwa...@foxmail.com>
AuthorDate: Tue Jul 16 21:01:13 2019 +0800

    [FLINK-13274] Refactor HiveTableSourceTest using HiveRunner
    
    Refactor HiveTableSourceTest to use HiveRunner.
    
    This closes #9130.
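    
    For context, a minimal sketch of the HiveRunner pattern the refactored
    test follows is shown below. HiveRunner injects an embedded HiveShell
    that executes HiveQL against an in-process Hive, and provides a fluent
    builder for inserting test rows. The sketch is illustrative only: the
    class, database, and table names are hypothetical, and it uses
    HiveRunner's stock StandaloneHiveRunner rather than the
    FlinkStandaloneHiveRunner referenced in the actual test.
    
        import com.klarna.hiverunner.HiveShell;
        import com.klarna.hiverunner.StandaloneHiveRunner;
        import com.klarna.hiverunner.annotations.HiveSQL;
        import org.junit.Test;
        import org.junit.runner.RunWith;
        
        import java.util.List;
        
        import static org.junit.Assert.assertEquals;
        
        // Illustrative HiveRunner-based test; names are hypothetical.
        @RunWith(StandaloneHiveRunner.class)
        public class ExampleHiveRunnerTest {
        
            // HiveRunner injects an embedded HiveShell into this field.
            @HiveSQL(files = {})
            private HiveShell hiveShell;
        
            @Test
            public void testWriteAndReadBack() {
                // Set up a database and table with plain HiveQL.
                hiveShell.execute("CREATE DATABASE IF NOT EXISTS demo_db");
                hiveShell.execute("CREATE TABLE demo_db.t (x INT, y STRING)");
        
                // Insert test rows through the fluent builder.
                hiveShell.insertInto("demo_db", "t")
                        .withAllColumns()
                        .addRow(1, "a")
                        .addRow(2, "b")
                        .commit();
        
                // Query the data back; each result row comes back as a string.
                List<String> rows = hiveShell.executeQuery("SELECT * FROM demo_db.t");
                assertEquals(2, rows.size());
            }
        }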
---
 flink-connectors/flink-connector-hive/pom.xml      |   8 --
 .../batch/connectors/hive/HiveTableSourceTest.java | 100 +++++++++++++--------
 2 files changed, 65 insertions(+), 43 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/pom.xml b/flink-connectors/flink-connector-hive/pom.xml
index f9a037b..02e15c9 100644
--- a/flink-connectors/flink-connector-hive/pom.xml
+++ b/flink-connectors/flink-connector-hive/pom.xml
@@ -389,10 +389,6 @@ under the License.
                     <artifactId>hive-jdbc</artifactId>
                 </exclusion>
                 <exclusion>
-                    <groupId>org.apache.hive.hcatalog</groupId>
-                    <artifactId>hive-webhcat-java-client</artifactId>
-                </exclusion>
-                <exclusion>
                     <groupId>org.apache.hive</groupId>
                     <artifactId>hive-service</artifactId>
                 </exclusion>
@@ -402,10 +398,6 @@ under the License.
                 </exclusion>
                                <exclusion>
                                        <groupId>org.apache.tez</groupId>
-                                       <artifactId>tez-dag</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>org.apache.tez</groupId>
                                        <artifactId>tez-common</artifactId>
                                </exclusion>
                                <exclusion>
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSourceTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSourceTest.java
index d6cf124..e127639 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSourceTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSourceTest.java
@@ -30,39 +30,40 @@ import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
 import org.apache.flink.table.catalog.hive.HiveTestUtils;
-import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
-import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
-import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
 import org.apache.flink.types.Row;
 
+import com.klarna.hiverunner.HiveShell;
+import com.klarna.hiverunner.annotations.HiveSQL;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.SerDeInfo;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.mapred.JobConf;
 import org.junit.AfterClass;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.runner.RunWith;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
 /**
  * Tests {@link HiveTableSource}.
  */
+@RunWith(FlinkStandaloneHiveRunner.class)
 public class HiveTableSourceTest {
-       public static final String DEFAULT_SERDE_CLASS = org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.class.getName();
-       public static final String DEFAULT_INPUT_FORMAT_CLASS = org.apache.hadoop.mapred.TextInputFormat.class.getName();
-       public static final String DEFAULT_OUTPUT_FORMAT_CLASS = org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat.class.getName();
+
+       @HiveSQL(files = {})
+       private static HiveShell hiveShell;
 
        private static HiveCatalog hiveCatalog;
        private static HiveConf hiveConf;
 
        @BeforeClass
        public static void createCatalog() throws IOException {
-               hiveConf = HiveTestUtils.createHiveConf();
+               hiveConf = hiveShell.getHiveConf();
                hiveCatalog = HiveTestUtils.createHiveCatalog(hiveConf);
                hiveCatalog.open();
        }
@@ -74,10 +75,24 @@ public class HiveTableSourceTest {
                }
        }
 
+       @Before
+       public void setupSourceDatabaseAndData() {
+               hiveShell.execute("CREATE DATABASE IF NOT EXISTS source_db");
+       }
+
        @Test
        public void testReadNonPartitionedTable() throws Exception {
-               final String dbName = "default";
+               final String dbName = "source_db";
                final String tblName = "test";
+               hiveShell.execute("CREATE TABLE source_db.test ( a INT, b INT, 
c STRING, d BIGINT, e DOUBLE)");
+               hiveShell.insertInto(dbName, tblName)
+                               .withAllColumns()
+                               .addRow(1, 1, "a", 1000L, 1.11)
+                               .addRow(2, 2, "b", 2000L, 2.22)
+                               .addRow(3, 3, "c", 3000L, 3.33)
+                               .addRow(4, 4, "d", 4000L, 4.44)
+                               .commit();
+
                TableSchema tableSchema = new TableSchema(
                                new String[]{"a", "b", "c", "d", "e"},
                                new TypeInformation[]{
@@ -87,29 +102,6 @@ public class HiveTableSourceTest {
                                                BasicTypeInfo.LONG_TYPE_INFO,
                                                BasicTypeInfo.DOUBLE_TYPE_INFO}
                );
-               //Now we used metaStore client to create hive table instead of using hiveCatalog for it doesn't support set
-               //serDe temporarily.
-               HiveMetastoreClientWrapper client = HiveMetastoreClientFactory.create(hiveConf, null);
-               org.apache.hadoop.hive.metastore.api.Table tbl = new org.apache.hadoop.hive.metastore.api.Table();
-               tbl.setDbName(dbName);
-               tbl.setTableName(tblName);
-               tbl.setCreateTime((int) (System.currentTimeMillis() / 1000));
-               tbl.setParameters(new HashMap<>());
-               StorageDescriptor sd = new StorageDescriptor();
-               String location = HiveTableSourceTest.class.getResource("/test").getPath();
-               sd.setLocation(location);
-               sd.setInputFormat(DEFAULT_INPUT_FORMAT_CLASS);
-               sd.setOutputFormat(DEFAULT_OUTPUT_FORMAT_CLASS);
-               sd.setSerdeInfo(new SerDeInfo());
-               sd.getSerdeInfo().setSerializationLib(DEFAULT_SERDE_CLASS);
-               sd.getSerdeInfo().setParameters(new HashMap<>());
-               sd.getSerdeInfo().getParameters().put("serialization.format", 
"1");
-               sd.getSerdeInfo().getParameters().put("field.delim", ",");
-               sd.setCols(HiveTableUtil.createHiveColumns(tableSchema));
-               tbl.setSd(sd);
-               tbl.setPartitionKeys(new ArrayList<>());
-
-               client.createTable(tbl);
                ExecutionEnvironment execEnv = ExecutionEnvironment.createLocalEnvironment(1);
                BatchTableEnvironment tEnv = BatchTableEnvironment.create(execEnv);
                ObjectPath tablePath = new ObjectPath(dbName, tblName);
@@ -125,4 +117,42 @@ public class HiveTableSourceTest {
                Assert.assertEquals(3, rows.get(2).getField(0));
                Assert.assertEquals(4, rows.get(3).getField(0));
        }
+
+       /**
+        * Tests reading from a partitioned table.
+        * @throws Exception if the test fails.
+        */
+       @Test
+       public void testReadPartitionTable() throws Exception {
+               final String dbName = "source_db";
+               final String tblName = "test_table_pt";
+               hiveShell.execute("CREATE TABLE source_db.test_table_pt " +
+                                               "(year STRING, value INT) 
partitioned by (pt int);");
+               hiveShell.insertInto("source_db", "test_table_pt")
+                               .withColumns("year", "value", "pt")
+                               .addRow("2014", 3, 0)
+                               .addRow("2014", 4, 0)
+                               .addRow("2015", 2, 1)
+                               .addRow("2015", 5, 1)
+                               .commit();
+               TableSchema tableSchema = new TableSchema(
+                               new String[]{"year", "value", "int"},
+                               new TypeInformation[]{
+                                               BasicTypeInfo.STRING_TYPE_INFO,
+                                               BasicTypeInfo.INT_TYPE_INFO,
+                                               BasicTypeInfo.INT_TYPE_INFO}
+               );
+               ExecutionEnvironment execEnv = ExecutionEnvironment.createLocalEnvironment(1);
+               BatchTableEnvironment tEnv = BatchTableEnvironment.create(execEnv);
+               ObjectPath tablePath = new ObjectPath(dbName, tblName);
+               CatalogTable catalogTable = (CatalogTable) hiveCatalog.getTable(tablePath);
+               HiveTableSource hiveTableSource = new HiveTableSource(new JobConf(hiveConf), tablePath, catalogTable);
+               Table src = tEnv.fromTableSource(hiveTableSource);
+               DataSet<Row> rowDataSet = tEnv.toDataSet(src, new RowTypeInfo(tableSchema.getFieldTypes(), tableSchema.getFieldNames()));
+               List<Row> rows = rowDataSet.collect();
+               assertEquals(4, rows.size());
+               Object[] rowStrings = rows.stream().map(Row::toString).sorted().toArray();
+               assertArrayEquals(new String[]{"2014,3,0", "2014,4,0", "2015,2,1", "2015,5,1"}, rowStrings);
+       }
+
 }
