This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-0.2
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/release-0.2 by this push:
new adcf3974 [FLINK-28760] Enable table store support in Hive by creating auxlib directory
adcf3974 is described below
commit adcf39744c7a2c7b3ec143d1d217eaa02efa10a6
Author: tsreaper <[email protected]>
AuthorDate: Tue Aug 2 10:21:38 2022 +0800
[FLINK-28760] Enable table store support in Hive by creating auxlib directory
This closes #257
---
docs/content/docs/engines/hive.md | 15 ++-
flink-table-store-e2e-tests/pom.xml | 12 ++-
.../flink/table/store/tests/E2eTestBase.java | 33 +-----
.../flink/table/store/tests/HiveE2eTest.java | 113 ++++++++++++++-------
.../test/resources-filtered/docker-compose.yaml | 7 +-
5 files changed, 97 insertions(+), 83 deletions(-)
diff --git a/docs/content/docs/engines/hive.md b/docs/content/docs/engines/hive.md
index 92a51578..3a8927ec 100644
--- a/docs/content/docs/engines/hive.md
+++ b/docs/content/docs/engines/hive.md
@@ -49,7 +49,10 @@ You are using an unreleased version of Table Store. See [Build From Source]({{<
{{< /unstable >}}
-Copy table store Hive connector bundle jar to a path accessible by Hive, then use `add jar /path/to/flink-table-store-hive-connector-{{< version >}}.jar` to enable table store support in Hive.
+There are several ways to add this jar to Hive.
+
+* You can create an `auxlib` folder under the root directory of Hive, and copy `flink-table-store-hive-connector-{{< version >}}.jar` into `auxlib`.
+* You can also copy this jar to a path accessible by Hive, then use `add jar /path/to/flink-table-store-hive-connector-{{< version >}}.jar` to enable table store support in Hive. Note that this method is not recommended. If you're using the MR execution engine and running a join statement, you may be faced with the exception `org.apache.hive.com.esotericsoftware.kryo.kryoexception: unable to find class`.
## Using Table Store Hive Catalog
@@ -100,10 +103,7 @@ SELECT * FROM test_table;
Run the following Hive SQL in Hive CLI to access the created table.
```sql
--- Enable table store support in Hive
-
-ADD JAR /path/to/flink-table-store-hive-connector-{{< version >}}.jar;
-
+-- Assume that flink-table-store-hive-connector-{{< version >}}.jar is already in auxlib directory.
-- List tables in Hive
-- (you might need to switch to "default" database if you're not there by default)
@@ -130,10 +130,7 @@ OK
To access existing table store table, you can also register them as external tables in Hive. Run the following Hive SQL in Hive CLI.
```sql
--- Enable table store support in Hive
-
-ADD JAR /path/to/flink-table-store-hive-connector-{{< version >}}.jar;
-
+-- Assume that flink-table-store-hive-connector-{{< version >}}.jar is already in auxlib directory.
-- Let's use the test_table created in the above section.
-- To create an external table, you don't need to specify any column or table properties.
-- Pointing the location to the path of table is enough.
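The documentation change above boils down to placing the connector jar where Hive picks up auxiliary jars automatically at startup, instead of running `ADD JAR` in every session. A minimal sketch of that setup, assuming Hive is installed under /opt/hive (the Hive root directory and the jar path are assumptions that depend on your installation):

```bash
# Assumed Hive root directory; adjust to your installation.
HIVE_HOME=/opt/hive

# Create the auxlib folder under the Hive root and copy the connector jar into it.
# Jars placed in auxlib are added to Hive's classpath automatically when the CLI starts,
# so no ADD JAR statement is needed in the session.
mkdir -p "$HIVE_HOME/auxlib"
cp /path/to/flink-table-store-hive-connector-<version>.jar "$HIVE_HOME/auxlib/"
```

With the jar in auxlib, the Hive SQL snippets in this file no longer need an explicit `ADD JAR` line, which is what the removed statements in the hunks above reflect.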
diff --git a/flink-table-store-e2e-tests/pom.xml b/flink-table-store-e2e-tests/pom.xml
index 85a62b5f..4be0beaf 100644
--- a/flink-table-store-e2e-tests/pom.xml
+++ b/flink-table-store-e2e-tests/pom.xml
@@ -31,6 +31,10 @@ under the License.
<artifactId>flink-table-store-e2e-tests</artifactId>
<name>Flink Table Store : End to End Tests</name>
+ <properties>
+ <flink.shaded.hadoop.version>2.8.3-10.0</flink.shaded.hadoop.version>
+ </properties>
+
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
@@ -100,7 +104,7 @@ under the License.
<destFileName>flink-table-store.jar</destFileName>
<type>jar</type>
<overWrite>true</overWrite>
- <outputDirectory>${project.build.directory}/dependencies
+ <outputDirectory>/tmp/flink-table-store-e2e-tests-jars
</outputDirectory>
</artifactItem>
<artifactItem>
@@ -110,17 +114,17 @@ under the License.
<destFileName>flink-table-store-hive-connector.jar</destFileName>
<type>jar</type>
<overWrite>true</overWrite>
- <outputDirectory>${project.build.directory}/dependencies
+ <outputDirectory>/tmp/flink-table-store-e2e-tests-jars
</outputDirectory>
</artifactItem>
<artifactItem>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop-2-uber</artifactId>
- <version>2.8.3-10.0</version>
+ <version>${flink.shaded.hadoop.version}</version>
<destFileName>bundled-hadoop.jar</destFileName>
<type>jar</type>
<overWrite>true</overWrite>
- <outputDirectory>${project.build.directory}/dependencies
+ <outputDirectory>/tmp/flink-table-store-e2e-tests-jars
</outputDirectory>
</artifactItem>
</artifactItems>
diff --git a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/E2eTestBase.java b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/E2eTestBase.java
index be375f42..1cdf8da8 100644
--- a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/E2eTestBase.java
+++ b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/E2eTestBase.java
@@ -18,8 +18,6 @@
package org.apache.flink.table.store.tests;
-import org.apache.flink.table.store.tests.utils.TestUtils;
-
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.slf4j.Logger;
@@ -30,7 +28,6 @@ import org.testcontainers.containers.DockerComposeContainer;
import org.testcontainers.containers.output.OutputFrame;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.Wait;
-import org.testcontainers.utility.MountableFile;
import java.io.File;
import java.util.ArrayList;
@@ -64,11 +61,6 @@ public abstract class E2eTestBase {
this.withHive = withHive;
}
- private static final String TABLE_STORE_JAR_NAME = "flink-table-store.jar";
- protected static final String TABLE_STORE_HIVE_CONNECTOR_JAR_NAME =
- "flink-table-store-hive-connector.jar";
- private static final String BUNDLED_HADOOP_JAR_NAME = "bundled-hadoop.jar";
-
protected static final String TEST_DATA_DIR = "/test-data";
protected static final String HDFS_ROOT = "hdfs://namenode:8020";
@@ -121,10 +113,6 @@ public abstract class E2eTestBase {
environment.start();
jobManager =
environment.getContainerByServiceName("jobmanager_1").get();
jobManager.execInContainer("chown", "-R", "flink:flink", TEST_DATA_DIR);
-
- copyResource(TABLE_STORE_JAR_NAME);
- copyResource(TABLE_STORE_HIVE_CONNECTOR_JAR_NAME);
- copyResource(BUNDLED_HADOOP_JAR_NAME);
}
@AfterEach
@@ -134,12 +122,6 @@ public abstract class E2eTestBase {
}
}
- private void copyResource(String resourceName) {
- jobManager.copyFileToContainer(
- MountableFile.forHostPath(TestUtils.getResource(resourceName).toString()),
- TEST_DATA_DIR + "/" + resourceName);
- }
-
protected void writeSharedFile(String filename, String content) throws Exception {
if (content.length() == 0 || content.charAt(content.length() - 1) != '\n') {
content += "\n";
@@ -172,20 +154,7 @@ public abstract class E2eTestBase {
"su",
"flink",
"-c",
- "bin/sql-client.sh -f "
- + TEST_DATA_DIR
- + "/"
- + fileName
- // run with table store jar
- + " --jar "
- + TEST_DATA_DIR
- + "/"
- + TABLE_STORE_JAR_NAME
- // run with bundled hadoop jar
- + " --jar "
- + TEST_DATA_DIR
- + "/"
- + BUNDLED_HADOOP_JAR_NAME);
+ "bin/sql-client.sh -f " + TEST_DATA_DIR + "/" +
fileName);
LOG.info(execResult.getStdout());
LOG.info(execResult.getStderr());
if (execResult.getExitCode() != 0) {
diff --git a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/HiveE2eTest.java b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/HiveE2eTest.java
index 9a4f7833..ef15d508 100644
--- a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/HiveE2eTest.java
+++ b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/HiveE2eTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.store.tests;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.ContainerState;
@@ -34,13 +35,26 @@ import static org.assertj.core.api.Assertions.assertThat;
*/
public class HiveE2eTest extends E2eTestBase {
- private static final String ADD_JAR_HQL =
- "ADD JAR " + TEST_DATA_DIR + "/" +
TABLE_STORE_HIVE_CONNECTOR_JAR_NAME + ";";
+ private static final String TABLE_STORE_HIVE_CONNECTOR_JAR_NAME =
+ "flink-table-store-hive-connector.jar";
public HiveE2eTest() {
super(false, true);
}
+ @BeforeEach
+ @Override
+ public void before() throws Exception {
+ super.before();
+ getHive()
+ .execInContainer(
+ "/bin/bash",
+ "-c",
+ "mkdir /opt/hive/auxlib && cp /jars/"
+ + TABLE_STORE_HIVE_CONNECTOR_JAR_NAME
+ + " /opt/hive/auxlib");
+ }
+
@Test
public void testReadExternalTable() throws Exception {
String tableStorePkDdl =
@@ -58,6 +72,7 @@ public class HiveE2eTest extends E2eTestBase {
runSql(
"INSERT INTO table_store_pk VALUES "
+ "(1, 10, 'Hi'), "
+ + "(1, 100, 'Hi Again'), "
+ "(2, 20, 'Hello'), "
+ "(3, 30, 'Table'), "
+ "(4, 40, 'Store');",
@@ -68,28 +83,30 @@ public class HiveE2eTest extends E2eTestBase {
+ "STORED BY
'org.apache.flink.table.store.hive.TableStoreHiveStorageHandler'\n"
+ "LOCATION '"
+ tableStorePkPath
- + "/default_catalog.catalog/default_database.db/table_store_pk';";
- writeSharedFile(
- "pk.hql",
- // same default database name as Flink
- ADD_JAR_HQL
- + "\n"
- + externalTablePkDdl
- + "\nSELECT b, a, c FROM table_store_pk ORDER BY b;");
+ + "/default_catalog.catalog/default_database.db/table_store_pk';\n";
- ContainerState hive = getHive();
- Container.ExecResult execResult =
- hive.execInContainer(
- "/opt/hive/bin/hive",
- "--hiveconf",
- "hive.root.logger=INFO,console",
- "-f",
- TEST_DATA_DIR + "/pk.hql");
- assertThat(execResult.getStdout())
- .isEqualTo("10\t1\tHi\n" + "20\t2\tHello\n" + "30\t3\tTable\n" + "40\t4\tStore\n");
- if (execResult.getExitCode() != 0) {
- throw new AssertionError("Failed when running hive sql.");
- }
+ checkQueryResult(
+ externalTablePkDdl + "SELECT * FROM table_store_pk ORDER BY
b;",
+ "1\t10\tHi\n"
+ + "2\t20\tHello\n"
+ + "3\t30\tTable\n"
+ + "4\t40\tStore\n"
+ + "1\t100\tHi Again\n");
+ checkQueryResult(
+ externalTablePkDdl + "SELECT b, a FROM table_store_pk ORDER BY
b;",
+ "10\t1\n" + "20\t2\n" + "30\t3\n" + "40\t4\n" + "100\t1\n");
+ checkQueryResult(
+ externalTablePkDdl + "SELECT * FROM table_store_pk WHERE a > 1
ORDER BY b;",
+ "2\t20\tHello\n" + "3\t30\tTable\n" + "4\t40\tStore\n");
+ checkQueryResult(
+ externalTablePkDdl
+ + "SELECT a, SUM(b), MIN(c) FROM table_store_pk GROUP
BY a ORDER BY a;",
+ "1\t110\tHi\n" + "2\t20\tHello\n" + "3\t30\tTable\n" +
"4\t40\tStore\n");
+ checkQueryResult(
+ externalTablePkDdl
+ + "SELECT T1.a, T1.b, T2.b FROM table_store_pk T1 JOIN
table_store_pk T2 "
+ + "ON T1.a = T2.a WHERE T1.a <= 2 ORDER BY T1.a, T1.b,
T2.b;",
+ "1\t10\t10\n" + "1\t10\t100\n" + "1\t100\t10\n" +
"1\t100\t100\n" + "2\t20\t20\n");
}
@Test
@@ -118,26 +135,50 @@ public class HiveE2eTest extends E2eTestBase {
" 'bucket' = '2'",
");",
"",
- "INSERT INTO T VALUES (1, 10, 'Hi'), (2, 20,
'Hello');");
+ "INSERT INTO T VALUES "
+ + "(1, 10, 'Hi'), "
+ + "(1, 100, 'Hi Again'), "
+ + "(2, 20, 'Hello'), "
+ + "(3, 30, 'Table'), "
+ + "(4, 40, 'Store');");
runSql(sql);
- writeSharedFile(
- "query.hql",
- // same default database name as Flink
- ADD_JAR_HQL + "\nSELECT b, a, c FROM t ORDER BY b;");
+ checkQueryResult(
+ "SELECT * FROM t ORDER BY b;",
+ "1\t10\tHi\n"
+ + "2\t20\tHello\n"
+ + "3\t30\tTable\n"
+ + "4\t40\tStore\n"
+ + "1\t100\tHi Again\n");
+ checkQueryResult(
+ "SELECT b, a FROM t ORDER BY b;",
+ "10\t1\n" + "20\t2\n" + "30\t3\n" + "40\t4\n" + "100\t1\n");
+ checkQueryResult(
+ "SELECT * FROM t WHERE a > 1 ORDER BY b;",
+ "2\t20\tHello\n" + "3\t30\tTable\n" + "4\t40\tStore\n");
+ checkQueryResult(
+ "SELECT a, SUM(b), MIN(c) FROM t GROUP BY a ORDER BY a;",
+ "1\t110\tHi\n" + "2\t20\tHello\n" + "3\t30\tTable\n" +
"4\t40\tStore\n");
+ checkQueryResult(
+ "SELECT T1.a, T1.b, T2.b FROM t T1 JOIN t T2 "
+ + "ON T1.a = T2.a WHERE T1.a <= 2 ORDER BY T1.a, T1.b,
T2.b;",
+ "1\t10\t10\n" + "1\t10\t100\n" + "1\t100\t10\n" +
"1\t100\t100\n" + "2\t20\t20\n");
+ }
- ContainerState hive = getHive();
+ private void checkQueryResult(String query, String expected) throws Exception {
+ writeSharedFile("pk.hql", query);
Container.ExecResult execResult =
- hive.execInContainer(
- "/opt/hive/bin/hive",
- "--hiveconf",
- "hive.root.logger=INFO,console",
- "-f",
- TEST_DATA_DIR + "/query.hql");
- assertThat(execResult.getStdout()).isEqualTo("10\t1\tHi\n" + "20\t2\tHello\n");
+ getHive()
+ .execInContainer(
+ "/opt/hive/bin/hive",
+ "--hiveconf",
+ "hive.root.logger=INFO,console",
+ "-f",
+ TEST_DATA_DIR + "/pk.hql");
if (execResult.getExitCode() != 0) {
throw new AssertionError("Failed when running hive sql.");
}
+ assertThat(execResult.getStdout()).isEqualTo(expected);
}
private ContainerState getHive() {
diff --git a/flink-table-store-e2e-tests/src/test/resources-filtered/docker-compose.yaml b/flink-table-store-e2e-tests/src/test/resources-filtered/docker-compose.yaml
index d414d99f..0e273220 100644
--- a/flink-table-store-e2e-tests/src/test/resources-filtered/docker-compose.yaml
+++ b/flink-table-store-e2e-tests/src/test/resources-filtered/docker-compose.yaml
@@ -28,7 +28,8 @@ services:
image: apache/flink:${flink.version}-java8
volumes:
- testdata:/test-data
- entrypoint: /bin/bash -c "wget -P /opt/flink/lib/
https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
&& /docker-entrypoint.sh jobmanager"
+ - /tmp/flink-table-store-e2e-tests-jars:/jars
+ entrypoint: /bin/bash -c "cp /jars/flink-table-store.jar
/jars/bundled-hadoop.jar /opt/flink/lib && /docker-entrypoint.sh jobmanager"
env_file:
- ./flink.env
networks:
@@ -42,7 +43,8 @@ services:
image: apache/flink:${flink.version}-java8
volumes:
- testdata:/test-data
- entrypoint: /bin/bash -c "wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar && /docker-entrypoint.sh taskmanager"
+ - /tmp/flink-table-store-e2e-tests-jars:/jars
+ entrypoint: /bin/bash -c "cp /jars/flink-table-store.jar /jars/bundled-hadoop.jar /opt/flink/lib && /docker-entrypoint.sh taskmanager"
env_file:
- ./flink.env
networks:
@@ -130,6 +132,7 @@ services:
image: bde2020/hive:2.3.2-postgresql-metastore
volumes:
- testdata:/test-data
+ - /tmp/flink-table-store-e2e-tests-jars:/jars
networks:
testnetwork:
aliases: