This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-0.2
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/release-0.2 by this push:
new 1bbbe2e8 [FLINK-28730] Add Tez execution engine test for Table Store
Hive connector
1bbbe2e8 is described below
commit 1bbbe2e81d9e269f62700a9749eb2c8a7e984f86
Author: tsreaper <[email protected]>
AuthorDate: Fri Jul 29 10:32:09 2022 +0800
[FLINK-28730] Add Tez execution engine test for Table Store Hive connector
This closes #254
---
docs/content/docs/engines/hive.md | 4 +
.../flink-table-store-hive-connector/pom.xml | 8 --
.../table/store/mapred/TableStoreInputFormat.java | 10 +-
.../hive/TableStoreHiveStorageHandlerITCase.java | 141 ++++++++++++++++++++-
4 files changed, 151 insertions(+), 12 deletions(-)
diff --git a/docs/content/docs/engines/hive.md
b/docs/content/docs/engines/hive.md
index 6cc49777..92a51578 100644
--- a/docs/content/docs/engines/hive.md
+++ b/docs/content/docs/engines/hive.md
@@ -34,6 +34,10 @@ Table Store currently supports the following features
related with Hive:
Table Store currently supports Hive 2.x.
+## Execution Engine
+
+Table Store currently supports the MR and Tez execution engines for Hive.
+
## Install
{{< stable >}}
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/pom.xml
b/flink-table-store-hive/flink-table-store-hive-connector/pom.xml
index e83ef678..735c8b7d 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/pom.xml
+++ b/flink-table-store-hive/flink-table-store-hive-connector/pom.xml
@@ -197,10 +197,6 @@ under the License.
<groupId>org.apache.hive.hcatalog</groupId>
<artifactId>hive-webhcat-java-client</artifactId>
</exclusion>
- <exclusion>
- <groupId>org.apache.tez</groupId>
- <artifactId>tez-common</artifactId>
- </exclusion>
<exclusion>
<!-- This dependency is no longer shipped with the JDK
since Java 9.-->
<groupId>jdk.tools</groupId>
@@ -246,10 +242,6 @@ under the License.
<artifactId>hadoop-yarn-server-web-proxy</artifactId>
<groupId>org.apache.hadoop</groupId>
</exclusion>
- <exclusion>
- <artifactId>hadoop-shim</artifactId>
- <groupId>org.apache.tez</groupId>
- </exclusion>
<exclusion>
<artifactId>jms</artifactId>
<groupId>javax.jms</groupId>
diff --git
a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
index 5d24bef9..8e1b0def 100644
---
a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
+++
b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
@@ -73,7 +73,7 @@ public class TableStoreInputFormat implements
InputFormat<Void, RowDataContainer
read,
split,
table.schema().fieldNames(),
-
Arrays.asList(ColumnProjectionUtils.getReadColumnNames(jobConf)));
+ Arrays.asList(getSelectedColumns(jobConf)));
}
private FileStoreTable createFileStoreTable(JobConf jobConf) {
@@ -97,4 +97,12 @@ public class TableStoreInputFormat implements
InputFormat<Void, RowDataContainer
sarg, tableSchema.fieldNames(),
tableSchema.logicalRowType().getChildren());
return converter.convert();
}
+
+ private String[] getSelectedColumns(JobConf jobConf) {
+ // when using tez engine or when same table is joined multiple times,
+ // it is possible that some selected columns are duplicated
+ return Arrays.stream(ColumnProjectionUtils.getReadColumnNames(jobConf))
+ .distinct()
+ .toArray(String[]::new);
+ }
}
diff --git
a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
index b73f9b76..1301b5e9 100644
---
a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
+++
b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
@@ -46,7 +46,8 @@ import
org.apache.hadoop.hive.serde2.objectinspector.primitive.AbstractPrimitive
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
-import org.junit.Rule;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
@@ -68,13 +69,33 @@ import java.util.concurrent.ThreadLocalRandom;
@RunWith(FlinkEmbeddedHiveRunner.class)
public class TableStoreHiveStorageHandlerITCase {
- @Rule public TemporaryFolder folder = new TemporaryFolder();
+ @ClassRule public static TemporaryFolder folder = new TemporaryFolder();
@HiveSQL(files = {})
private static HiveShell hiveShell;
+ private static String engine;
+
+ @BeforeClass
+ public static void beforeClass() {
+ // TODO Currently FlinkEmbeddedHiveRunner can only be used for one
test class,
+ // so we have to select engine randomly. Write our own Hive tester in
the future.
+ engine = ThreadLocalRandom.current().nextBoolean() ? "mr" : "tez";
+ }
+
@Before
public void before() {
+ if ("mr".equals(engine)) {
+ hiveShell.execute("SET hive.execution.engine=mr");
+ } else if ("tez".equals(engine)) {
+ hiveShell.execute("SET hive.execution.engine=tez");
+ hiveShell.execute("SET tez.local.mode=true");
+ hiveShell.execute("SET hive.jar.directory=" +
folder.getRoot().getAbsolutePath());
+ hiveShell.execute("SET tez.staging-dir=" +
folder.getRoot().getAbsolutePath());
+ } else {
+ throw new UnsupportedOperationException("Unsupported engine " +
engine);
+ }
+
hiveShell.execute("CREATE DATABASE IF NOT EXISTS test_db");
hiveShell.execute("USE test_db");
}
@@ -138,6 +159,33 @@ public class TableStoreHiveStorageHandlerITCase {
"SELECT d, sum(b) FROM " + tableName + " GROUP BY d
ORDER BY d");
expected = Arrays.asList("200\t70", "400\t40", "1000\t10");
Assert.assertEquals(expected, actual);
+
+ actual =
+ hiveShell.executeQuery(
+ "SELECT T1.a, T1.b, T1.d + T2.d FROM "
+ + tableName
+ + " T1 INNER JOIN "
+ + tableName
+ + " T2 ON T1.a = T2.a AND T1.b = T2.b ORDER BY
T1.a, T1.b");
+ expected = Arrays.asList("1\t10\t2000", "1\t20\t400", "2\t40\t800",
"3\t50\t400");
+ Assert.assertEquals(expected, actual);
+
+ actual =
+ hiveShell.executeQuery(
+ "SELECT T1.a, T1.b, T2.b, T1.d + T2.d FROM "
+ + tableName
+ + " T1 INNER JOIN "
+ + tableName
+ + " T2 ON T1.a = T2.a ORDER BY T1.a, T1.b,
T2.b");
+ expected =
+ Arrays.asList(
+ "1\t10\t10\t2000",
+ "1\t10\t20\t1200",
+ "1\t20\t10\t1200",
+ "1\t20\t20\t400",
+ "2\t40\t40\t800",
+ "3\t50\t50\t400");
+ Assert.assertEquals(expected, actual);
}
@Test
@@ -203,6 +251,29 @@ public class TableStoreHiveStorageHandlerITCase {
"SELECT b, sum(a), max(c) FROM " + tableName + " GROUP
BY b ORDER BY b");
expected = Arrays.asList("100\t30\tHi Again", "200\t40\tStore");
Assert.assertEquals(expected, actual);
+
+ actual =
+ hiveShell.executeQuery(
+ "SELECT a, b FROM (SELECT T1.a AS a, T1.b + T2.b AS b
FROM "
+ + tableName
+ + " T1 JOIN "
+ + tableName
+ + " T2 ON T1.a = T2.a) T3 ORDER BY a, b");
+ expected = Arrays.asList("10\t200", "10\t300", "10\t300", "10\t400",
"20\t200", "30\t400");
+ Assert.assertEquals(expected, actual);
+
+ actual =
+ hiveShell.executeQuery(
+ "SELECT b, a FROM (SELECT T1.b AS b, T1.a + T2.a AS a
FROM "
+ + tableName
+ + " T1 JOIN "
+ + tableName
+ + " T2 ON T1.b = T2.b) T3 ORDER BY b, a");
+ expected =
+ Arrays.asList(
+ "100\t20", "100\t30", "100\t30", "100\t40", "200\t20",
"200\t40", "200\t40",
+ "200\t60");
+ Assert.assertEquals(expected, actual);
}
@Test
@@ -262,6 +333,24 @@ public class TableStoreHiveStorageHandlerITCase {
"SELECT d, sum(b) FROM " + tableName + " GROUP BY d
ORDER BY d");
expected = Arrays.asList("100\t10", "200\t70", "400\t40", "1000\t10");
Assert.assertEquals(expected, actual);
+
+ actual =
+ hiveShell.executeQuery(
+ "SELECT T1.b, T1.d, T2.d FROM "
+ + tableName
+ + " T1 JOIN "
+ + tableName
+ + " T2 ON T1.b = T2.b ORDER BY T1.b, T1.d,
T2.d");
+ expected =
+ Arrays.asList(
+ "10\t100\t100",
+ "10\t100\t1000",
+ "10\t1000\t100",
+ "10\t1000\t1000",
+ "20\t200\t200",
+ "40\t400\t400",
+ "50\t200\t200");
+ Assert.assertEquals(expected, actual);
}
@Test
@@ -323,6 +412,24 @@ public class TableStoreHiveStorageHandlerITCase {
"SELECT a, sum(b), min(c) FROM " + tableName + " GROUP
BY a ORDER BY a");
expected = Arrays.asList("10\t400\tHello", "20\t400\tNULL",
"30\t500\tStore");
Assert.assertEquals(expected, actual);
+
+ actual =
+ hiveShell.executeQuery(
+ "SELECT T1.b, T1.c, T2.c FROM "
+ + tableName
+ + " T1 JOIN "
+ + tableName
+ + " T2 ON T1.b = T2.b ORDER BY T1.b, T1.c,
T2.c");
+ expected =
+ Arrays.asList(
+ "100\tHi\tHi",
+ "100\tHi\tHi Again",
+ "100\tHi Again\tHi",
+ "100\tHi Again\tHi Again",
+ "200\tHello\tHello",
+ "400\tNULL\tNULL",
+ "500\tStore\tStore");
+ Assert.assertEquals(expected, actual);
}
@Test
@@ -371,7 +478,9 @@ public class TableStoreHiveStorageHandlerITCase {
"World\t30");
Assert.assertEquals(expected, actual);
- actual = hiveShell.executeQuery("SELECT * FROM " + tableName + " WHERE
d < 300 ORDER BY d");
+ actual =
+ hiveShell.executeQuery(
+ "SELECT * FROM " + tableName + " WHERE d < 300 ORDER
BY b, d");
expected = Arrays.asList("1\t10\tHi\t100", "1\t20\tHello\t200",
"3\t50\tStore\t200");
Assert.assertEquals(expected, actual);
@@ -380,6 +489,16 @@ public class TableStoreHiveStorageHandlerITCase {
"SELECT a, sum(d) FROM " + tableName + " GROUP BY a
ORDER BY a");
expected = Arrays.asList("1\t1300", "2\t700", "3\t200");
Assert.assertEquals(expected, actual);
+
+ actual =
+ hiveShell.executeQuery(
+ "SELECT T1.a, T1.b, T2.b FROM "
+ + tableName
+ + " T1 JOIN "
+ + tableName
+ + " T2 ON T1.a = T2.a WHERE T1.a > 1 ORDER BY
T1.a, T1.b, T2.b");
+ expected = Arrays.asList("2\t30\t30", "2\t30\t40", "2\t40\t30",
"2\t40\t40", "3\t50\t50");
+ Assert.assertEquals(expected, actual);
}
@Test
@@ -450,6 +569,22 @@ public class TableStoreHiveStorageHandlerITCase {
"SELECT a, sum(b), min(c) FROM " + tableName + " GROUP
BY a ORDER BY a");
expected = Arrays.asList("10\t400\tHello", "20\t700\tWorld",
"30\t500\tStore");
Assert.assertEquals(expected, actual);
+
+ actual =
+ hiveShell.executeQuery(
+ "SELECT T1.a, T1.b, T2.b FROM "
+ + tableName
+ + " T1 JOIN "
+ + tableName
+ + " T2 ON T1.a = T2.a WHERE T1.a > 10 ORDER BY
T1.a, T1.b, T2.b");
+ expected =
+ Arrays.asList(
+ "20\t300\t300",
+ "20\t300\t400",
+ "20\t400\t300",
+ "20\t400\t400",
+ "30\t500\t500");
+ Assert.assertEquals(expected, actual);
}
private String createChangelogExternalTable(