This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.2
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/release-0.2 by this push:
     new 1bbbe2e8 [FLINK-28730] Add Tez execution engine test for Table Store Hive connector
1bbbe2e8 is described below

commit 1bbbe2e81d9e269f62700a9749eb2c8a7e984f86
Author: tsreaper <[email protected]>
AuthorDate: Fri Jul 29 10:32:09 2022 +0800

    [FLINK-28730] Add Tez execution engine test for Table Store Hive connector
    
    This closes #254
---
 docs/content/docs/engines/hive.md                  |   4 +
 .../flink-table-store-hive-connector/pom.xml       |   8 --
 .../table/store/mapred/TableStoreInputFormat.java  |  10 +-
 .../hive/TableStoreHiveStorageHandlerITCase.java   | 141 ++++++++++++++++++++-
 4 files changed, 151 insertions(+), 12 deletions(-)

diff --git a/docs/content/docs/engines/hive.md b/docs/content/docs/engines/hive.md
index 6cc49777..92a51578 100644
--- a/docs/content/docs/engines/hive.md
+++ b/docs/content/docs/engines/hive.md
@@ -34,6 +34,10 @@ Table Store currently supports the following features related with Hive:
 
 Table Store currently supports Hive 2.x.
 
+## Execution Engine
+
+Table Store currently supports the MR and Tez execution engines for Hive.
+
 ## Install
 
 {{< stable >}}
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/pom.xml b/flink-table-store-hive/flink-table-store-hive-connector/pom.xml
index e83ef678..735c8b7d 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/pom.xml
+++ b/flink-table-store-hive/flink-table-store-hive-connector/pom.xml
@@ -197,10 +197,6 @@ under the License.
                     <groupId>org.apache.hive.hcatalog</groupId>
                     <artifactId>hive-webhcat-java-client</artifactId>
                 </exclusion>
-                <exclusion>
-                    <groupId>org.apache.tez</groupId>
-                    <artifactId>tez-common</artifactId>
-                </exclusion>
                 <exclusion>
                     <!-- This dependency is no longer shipped with the JDK 
since Java 9.-->
                     <groupId>jdk.tools</groupId>
@@ -246,10 +242,6 @@ under the License.
                     <artifactId>hadoop-yarn-server-web-proxy</artifactId>
                     <groupId>org.apache.hadoop</groupId>
                 </exclusion>
-                <exclusion>
-                    <artifactId>hadoop-shim</artifactId>
-                    <groupId>org.apache.tez</groupId>
-                </exclusion>
                 <exclusion>
                     <artifactId>jms</artifactId>
                     <groupId>javax.jms</groupId>
diff --git 
a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
 
b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
index 5d24bef9..8e1b0def 100644
--- 
a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
+++ 
b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
@@ -73,7 +73,7 @@ public class TableStoreInputFormat implements 
InputFormat<Void, RowDataContainer
                 read,
                 split,
                 table.schema().fieldNames(),
-                
Arrays.asList(ColumnProjectionUtils.getReadColumnNames(jobConf)));
+                Arrays.asList(getSelectedColumns(jobConf)));
     }
 
     private FileStoreTable createFileStoreTable(JobConf jobConf) {
@@ -97,4 +97,12 @@ public class TableStoreInputFormat implements 
InputFormat<Void, RowDataContainer
                         sarg, tableSchema.fieldNames(), 
tableSchema.logicalRowType().getChildren());
         return converter.convert();
     }
+
+    private String[] getSelectedColumns(JobConf jobConf) {
+        // When using the Tez engine, or when the same table is joined multiple times,
+        // the read column names may contain duplicates; keep each name's first occurrence.
+        return Arrays.stream(ColumnProjectionUtils.getReadColumnNames(jobConf))
+                .distinct()
+                .toArray(String[]::new);
+    }
 }
diff --git 
a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
 
b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
index b73f9b76..1301b5e9 100644
--- 
a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
+++ 
b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
@@ -46,7 +46,8 @@ import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.AbstractPrimitive
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.Rule;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
@@ -68,13 +69,33 @@ import java.util.concurrent.ThreadLocalRandom;
 @RunWith(FlinkEmbeddedHiveRunner.class)
 public class TableStoreHiveStorageHandlerITCase {
 
-    @Rule public TemporaryFolder folder = new TemporaryFolder();
+    @ClassRule public static TemporaryFolder folder = new TemporaryFolder();
 
     @HiveSQL(files = {})
     private static HiveShell hiveShell;
 
+    private static String engine;
+
+    @BeforeClass
+    public static void beforeClass() {
+        // TODO Currently FlinkEmbeddedHiveRunner can only be used for one test class,
+        //  so we have to select the engine randomly. Write our own Hive tester in the future.
+        engine = ThreadLocalRandom.current().nextBoolean() ? "mr" : "tez";
+    }
+
     @Before
     public void before() {
+        if ("mr".equals(engine)) {
+            hiveShell.execute("SET hive.execution.engine=mr");
+        } else if ("tez".equals(engine)) {
+            hiveShell.execute("SET hive.execution.engine=tez");
+            hiveShell.execute("SET tez.local.mode=true");
+            hiveShell.execute("SET hive.jar.directory=" + 
folder.getRoot().getAbsolutePath());
+            hiveShell.execute("SET tez.staging-dir=" + 
folder.getRoot().getAbsolutePath());
+        } else {
+            throw new UnsupportedOperationException("Unsupported engine " + 
engine);
+        }
+
         hiveShell.execute("CREATE DATABASE IF NOT EXISTS test_db");
         hiveShell.execute("USE test_db");
     }
@@ -138,6 +159,33 @@ public class TableStoreHiveStorageHandlerITCase {
                         "SELECT d, sum(b) FROM " + tableName + " GROUP BY d 
ORDER BY d");
         expected = Arrays.asList("200\t70", "400\t40", "1000\t10");
         Assert.assertEquals(expected, actual);
+
+        actual =
+                hiveShell.executeQuery(
+                        "SELECT T1.a, T1.b, T1.d + T2.d FROM "
+                                + tableName
+                                + " T1 INNER JOIN "
+                                + tableName
+                                + " T2 ON T1.a = T2.a AND T1.b = T2.b ORDER BY 
T1.a, T1.b");
+        expected = Arrays.asList("1\t10\t2000", "1\t20\t400", "2\t40\t800", 
"3\t50\t400");
+        Assert.assertEquals(expected, actual);
+
+        actual =
+                hiveShell.executeQuery(
+                        "SELECT T1.a, T1.b, T2.b, T1.d + T2.d FROM "
+                                + tableName
+                                + " T1 INNER JOIN "
+                                + tableName
+                                + " T2 ON T1.a = T2.a ORDER BY T1.a, T1.b, 
T2.b");
+        expected =
+                Arrays.asList(
+                        "1\t10\t10\t2000",
+                        "1\t10\t20\t1200",
+                        "1\t20\t10\t1200",
+                        "1\t20\t20\t400",
+                        "2\t40\t40\t800",
+                        "3\t50\t50\t400");
+        Assert.assertEquals(expected, actual);
     }
 
     @Test
@@ -203,6 +251,29 @@ public class TableStoreHiveStorageHandlerITCase {
                         "SELECT b, sum(a), max(c) FROM " + tableName + " GROUP 
BY b ORDER BY b");
         expected = Arrays.asList("100\t30\tHi Again", "200\t40\tStore");
         Assert.assertEquals(expected, actual);
+
+        actual =
+                hiveShell.executeQuery(
+                        "SELECT a, b FROM (SELECT T1.a AS a, T1.b + T2.b AS b 
FROM "
+                                + tableName
+                                + " T1 JOIN "
+                                + tableName
+                                + " T2 ON T1.a = T2.a) T3 ORDER BY a, b");
+        expected = Arrays.asList("10\t200", "10\t300", "10\t300", "10\t400", 
"20\t200", "30\t400");
+        Assert.assertEquals(expected, actual);
+
+        actual =
+                hiveShell.executeQuery(
+                        "SELECT b, a FROM (SELECT T1.b AS b, T1.a + T2.a AS a 
FROM "
+                                + tableName
+                                + " T1 JOIN "
+                                + tableName
+                                + " T2 ON T1.b = T2.b) T3 ORDER BY b, a");
+        expected =
+                Arrays.asList(
+                        "100\t20", "100\t30", "100\t30", "100\t40", "200\t20", 
"200\t40", "200\t40",
+                        "200\t60");
+        Assert.assertEquals(expected, actual);
     }
 
     @Test
@@ -262,6 +333,24 @@ public class TableStoreHiveStorageHandlerITCase {
                         "SELECT d, sum(b) FROM " + tableName + " GROUP BY d 
ORDER BY d");
         expected = Arrays.asList("100\t10", "200\t70", "400\t40", "1000\t10");
         Assert.assertEquals(expected, actual);
+
+        actual =
+                hiveShell.executeQuery(
+                        "SELECT T1.b, T1.d, T2.d FROM "
+                                + tableName
+                                + " T1 JOIN "
+                                + tableName
+                                + " T2 ON T1.b = T2.b ORDER BY T1.b, T1.d, 
T2.d");
+        expected =
+                Arrays.asList(
+                        "10\t100\t100",
+                        "10\t100\t1000",
+                        "10\t1000\t100",
+                        "10\t1000\t1000",
+                        "20\t200\t200",
+                        "40\t400\t400",
+                        "50\t200\t200");
+        Assert.assertEquals(expected, actual);
     }
 
     @Test
@@ -323,6 +412,24 @@ public class TableStoreHiveStorageHandlerITCase {
                         "SELECT a, sum(b), min(c) FROM " + tableName + " GROUP 
BY a ORDER BY a");
         expected = Arrays.asList("10\t400\tHello", "20\t400\tNULL", 
"30\t500\tStore");
         Assert.assertEquals(expected, actual);
+
+        actual =
+                hiveShell.executeQuery(
+                        "SELECT T1.b, T1.c, T2.c FROM "
+                                + tableName
+                                + " T1 JOIN "
+                                + tableName
+                                + " T2 ON T1.b = T2.b ORDER BY T1.b, T1.c, 
T2.c");
+        expected =
+                Arrays.asList(
+                        "100\tHi\tHi",
+                        "100\tHi\tHi Again",
+                        "100\tHi Again\tHi",
+                        "100\tHi Again\tHi Again",
+                        "200\tHello\tHello",
+                        "400\tNULL\tNULL",
+                        "500\tStore\tStore");
+        Assert.assertEquals(expected, actual);
     }
 
     @Test
@@ -371,7 +478,9 @@ public class TableStoreHiveStorageHandlerITCase {
                         "World\t30");
         Assert.assertEquals(expected, actual);
 
-        actual = hiveShell.executeQuery("SELECT * FROM " + tableName + " WHERE 
d < 300 ORDER BY d");
+        actual =
+                hiveShell.executeQuery(
+                        "SELECT * FROM " + tableName + " WHERE d < 300 ORDER 
BY b, d");
         expected = Arrays.asList("1\t10\tHi\t100", "1\t20\tHello\t200", 
"3\t50\tStore\t200");
         Assert.assertEquals(expected, actual);
 
@@ -380,6 +489,16 @@ public class TableStoreHiveStorageHandlerITCase {
                         "SELECT a, sum(d) FROM " + tableName + " GROUP BY a 
ORDER BY a");
         expected = Arrays.asList("1\t1300", "2\t700", "3\t200");
         Assert.assertEquals(expected, actual);
+
+        actual =
+                hiveShell.executeQuery(
+                        "SELECT T1.a, T1.b, T2.b FROM "
+                                + tableName
+                                + " T1 JOIN "
+                                + tableName
+                                + " T2 ON T1.a = T2.a WHERE T1.a > 1 ORDER BY 
T1.a, T1.b, T2.b");
+        expected = Arrays.asList("2\t30\t30", "2\t30\t40", "2\t40\t30", 
"2\t40\t40", "3\t50\t50");
+        Assert.assertEquals(expected, actual);
     }
 
     @Test
@@ -450,6 +569,22 @@ public class TableStoreHiveStorageHandlerITCase {
                         "SELECT a, sum(b), min(c) FROM " + tableName + " GROUP 
BY a ORDER BY a");
         expected = Arrays.asList("10\t400\tHello", "20\t700\tWorld", 
"30\t500\tStore");
         Assert.assertEquals(expected, actual);
+
+        actual =
+                hiveShell.executeQuery(
+                        "SELECT T1.a, T1.b, T2.b FROM "
+                                + tableName
+                                + " T1 JOIN "
+                                + tableName
+                                + " T2 ON T1.a = T2.a WHERE T1.a > 10 ORDER BY 
T1.a, T1.b, T2.b");
+        expected =
+                Arrays.asList(
+                        "20\t300\t300",
+                        "20\t300\t400",
+                        "20\t400\t300",
+                        "20\t400\t400",
+                        "30\t500\t500");
+        Assert.assertEquals(expected, actual);
     }
 
     private String createChangelogExternalTable(

Reply via email to