This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 949415fd [FLINK-27542] Add end to end tests for Hive to read external
table store files
949415fd is described below
commit 949415fd03ae1caf71a85309eb58d38e02b4a5c9
Author: tsreaper <[email protected]>
AuthorDate: Fri Jun 17 15:29:07 2022 +0800
[FLINK-27542] Add end to end tests for Hive to read external table store
files
This closes #162
---
flink-table-store-e2e-tests/pom.xml | 19 ++++
.../flink/table/store/tests/E2eTestBase.java | 44 ++++++--
.../table/store/tests/FileStoreBatchE2eTest.java | 2 +-
.../table/store/tests/FileStoreStreamE2eTest.java | 4 +-
.../flink/table/store/tests/HiveE2eTest.java | 114 +++++++++++++++++++++
.../flink/table/store/tests/LogStoreE2eTest.java | 6 +-
.../test/resources-filtered/docker-compose.yaml | 79 ++++++++++++++
.../src/test/resources/flink.env | 2 +-
.../src/test/resources/hadoop-hive.env | 48 +++++++++
9 files changed, 300 insertions(+), 18 deletions(-)
diff --git a/flink-table-store-e2e-tests/pom.xml
b/flink-table-store-e2e-tests/pom.xml
index 92c6124f..1a533c82 100644
--- a/flink-table-store-e2e-tests/pom.xml
+++ b/flink-table-store-e2e-tests/pom.xml
@@ -38,6 +38,12 @@ under the License.
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-store-hive</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<!-- flink dependencies -->
<dependency>
@@ -93,6 +99,18 @@ under the License.
<version>${project.version}</version>
<destFileName>flink-table-store.jar</destFileName>
<type>jar</type>
+ <overWrite>true</overWrite>
+
<outputDirectory>${project.build.directory}/dependencies
+ </outputDirectory>
+ </artifactItem>
+ <artifactItem>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-store-hive</artifactId>
+ <version>${project.version}</version>
+
<destFileName>flink-table-store-hive.jar</destFileName>
+ <type>jar</type>
+ <classifier>jar-with-dependencies</classifier>
+ <overWrite>true</overWrite>
<outputDirectory>${project.build.directory}/dependencies
</outputDirectory>
</artifactItem>
@@ -102,6 +120,7 @@ under the License.
<version>2.8.3-10.0</version>
<destFileName>bundled-hadoop.jar</destFileName>
<type>jar</type>
+ <overWrite>true</overWrite>
<outputDirectory>${project.build.directory}/dependencies
</outputDirectory>
</artifactItem>
diff --git
a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/E2eTestBase.java
b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/E2eTestBase.java
index f79a4250..b973903c 100644
---
a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/E2eTestBase.java
+++
b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/E2eTestBase.java
@@ -29,6 +29,7 @@ import org.testcontainers.containers.ContainerState;
import org.testcontainers.containers.DockerComposeContainer;
import org.testcontainers.containers.output.OutputFrame;
import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.utility.MountableFile;
import java.io.File;
@@ -52,25 +53,30 @@ public abstract class E2eTestBase {
private static final Logger LOG =
LoggerFactory.getLogger(E2eTestBase.class);
private final boolean withKafka;
+ private final boolean withHive;
protected E2eTestBase() {
- this(false);
+ this(false, false);
}
- protected E2eTestBase(boolean withKafka) {
+ protected E2eTestBase(boolean withKafka, boolean withHive) {
this.withKafka = withKafka;
+ this.withHive = withHive;
}
private static final String TABLE_STORE_JAR_NAME = "flink-table-store.jar";
+ protected static final String TABLE_STORE_HIVE_JAR_NAME =
"flink-table-store-hive.jar";
private static final String BUNDLED_HADOOP_JAR_NAME = "bundled-hadoop.jar";
+
protected static final String TEST_DATA_DIR = "/test-data";
+ protected static final String HDFS_ROOT = "hdfs://namenode:8020";
private static final String PRINT_SINK_IDENTIFIER =
"table-store-e2e-result";
private static final int CHECK_RESULT_INTERVAL_MS = 1000;
private static final int CHECK_RESULT_RETRIES = 60;
private final List<String> currentResults = new ArrayList<>();
- private DockerComposeContainer<?> environment;
+ protected DockerComposeContainer<?> environment;
private ContainerState jobManager;
@BeforeEach
@@ -88,19 +94,35 @@ public abstract class E2eTestBase {
.withLogConsumer("jobmanager_1", new LogConsumer(LOG))
.withLogConsumer("taskmanager_1", new
LogConsumer(LOG));
if (withKafka) {
- services.add("zookeeper");
- services.add("kafka");
- environment
- .withLogConsumer("zookeeper_1", new Slf4jLogConsumer(LOG))
- .withLogConsumer("kafka_1", new Slf4jLogConsumer(LOG));
+ List<String> kafkaServices = Arrays.asList("zookeeper", "kafka");
+ services.addAll(kafkaServices);
+ for (String s : kafkaServices) {
+ environment.withLogConsumer(s + "_1", new
Slf4jLogConsumer(LOG));
+ }
+ }
+ if (withHive) {
+ List<String> hiveServices =
+ Arrays.asList(
+ "namenode",
+ "datanode",
+ "hive-server",
+ "hive-metastore",
+ "hive-metastore-postgresql");
+ services.addAll(hiveServices);
+ for (String s : hiveServices) {
+ environment.withLogConsumer(s + "_1", new
Slf4jLogConsumer(LOG));
+ }
+ environment.waitingFor(
+ "hive-server_1", Wait.forLogMessage(".*Starting
HiveServer2.*", 1));
}
- environment.withServices(services.toArray(new String[0]));
+ environment.withServices(services.toArray(new
String[0])).withLocalCompose(true);
environment.start();
jobManager =
environment.getContainerByServiceName("jobmanager_1").get();
jobManager.execInContainer("chown", "-R", "flink:flink",
TEST_DATA_DIR);
copyResource(TABLE_STORE_JAR_NAME);
+ copyResource(TABLE_STORE_HIVE_JAR_NAME);
copyResource(BUNDLED_HADOOP_JAR_NAME);
}
@@ -115,7 +137,7 @@ public abstract class E2eTestBase {
TEST_DATA_DIR + "/" + resourceName);
}
- protected void writeTestData(String filename, String content) throws
Exception {
+ protected void writeSharedFile(String filename, String content) throws
Exception {
if (content.length() == 0 || content.charAt(content.length() - 1) !=
'\n') {
content += "\n";
}
@@ -141,7 +163,7 @@ public abstract class E2eTestBase {
protected void runSql(String sql) throws Exception {
String fileName = UUID.randomUUID().toString() + ".sql";
- writeTestData(fileName, sql);
+ writeSharedFile(fileName, sql);
Container.ExecResult execResult =
jobManager.execInContainer(
"su",
diff --git
a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FileStoreBatchE2eTest.java
b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FileStoreBatchE2eTest.java
index 047814c3..d7e177dc 100644
---
a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FileStoreBatchE2eTest.java
+++
b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FileStoreBatchE2eTest.java
@@ -85,7 +85,7 @@ public class FileStoreBatchE2eTest extends E2eTestBase {
TEST_DATA_DIR + "/" + UUID.randomUUID().toString() +
".store");
// prepare test data
- writeTestData(testDataSourceFile, String.join("\n", data));
+ writeSharedFile(testDataSourceFile, String.join("\n", data));
// insert data into table store
runSql(
diff --git
a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FileStoreStreamE2eTest.java
b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FileStoreStreamE2eTest.java
index 44009ecc..6a2fc91b 100644
---
a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FileStoreStreamE2eTest.java
+++
b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FileStoreStreamE2eTest.java
@@ -56,7 +56,7 @@ public class FileStoreStreamE2eTest extends E2eTestBase {
TEST_DATA_DIR + "/" + UUID.randomUUID().toString() +
".store");
// prepare first part of test data
- writeTestData(testDataSourceDir + "/1.csv", "A,5\nB,10\nA,4\nB,9\n");
+ writeSharedFile(testDataSourceDir + "/1.csv", "A,5\nB,10\nA,4\nB,9\n");
// insert data into table store
runSql(
@@ -77,7 +77,7 @@ public class FileStoreStreamE2eTest extends E2eTestBase {
checkResult("A, 4, 1", "A, 5, 2", "B, 9, 1", "B, 10, 2");
// prepare second part of test data
- writeTestData(testDataSourceDir + "/2.csv", "A,3\nB,8\nA,2\nB,11\n");
+ writeSharedFile(testDataSourceDir + "/2.csv", "A,3\nB,8\nA,2\nB,11\n");
// check that we can read the second part of test data
checkResult("A, 2, 1", "A, 3, 2", "A, 4, 3", "B, 8, 1", "B, 9, 2", "B,
10, 3");
diff --git
a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/HiveE2eTest.java
b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/HiveE2eTest.java
new file mode 100644
index 00000000..98ce0893
--- /dev/null
+++
b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/HiveE2eTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.tests;
+
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.ContainerState;
+
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests for reading table store from Hive.
+ *
+ * <p>NOTE: This test runs a complete Hadoop cluster in Docker, which requires a lot of memory. If
+ * you're running this test locally, make sure that the memory limit of your Docker is at least 8GB.
+ */
+public class HiveE2eTest extends E2eTestBase {
+
+    private static final String ADD_JAR_HQL =
+            "ADD JAR " + TEST_DATA_DIR + "/" + TABLE_STORE_HIVE_JAR_NAME + ";";
+
+    public HiveE2eTest() {
+        super(false, true);
+    }
+
+    /**
+     * Writes a primary-key table with Flink, copies its files into HDFS and checks that Hive can
+     * read them back through an external table.
+     */
+    @Test
+    public void testReadExternalTable() throws Exception {
+        // TODO write data directly to HDFS after FLINK-27562 is solved
+        String tableStorePkDdl =
+                "CREATE TABLE IF NOT EXISTS table_store_pk (\n"
+                        + " a int,\n"
+                        + " b bigint,\n"
+                        + " c string,\n"
+                        + " PRIMARY KEY (a, b) NOT ENFORCED\n"
+                        + ") WITH (\n"
+                        + " 'bucket' = '2',\n"
+                        + " 'root-path' = '%s'\n"
+                        + ");";
+        String tableStorePkPath = TEST_DATA_DIR + "/" + UUID.randomUUID().toString() + ".store";
+        tableStorePkDdl = String.format(tableStorePkDdl, tableStorePkPath);
+        runSql(
+                "INSERT INTO table_store_pk VALUES "
+                        + "(1, 10, 'Hi'), "
+                        + "(2, 20, 'Hello'), "
+                        + "(3, 30, 'Table'), "
+                        + "(4, 40, 'Store');",
+                tableStorePkDdl);
+
+        String externalTablePkDdl =
+                "CREATE EXTERNAL TABLE IF NOT EXISTS table_store_pk\n"
+                        + "STORED BY 'org.apache.flink.table.store.hive.TableStoreHiveStorageHandler'\n"
+                        + "LOCATION '"
+                        // hive cannot read from local path
+                        + HDFS_ROOT
+                        + tableStorePkPath
+                        + "/default_catalog.catalog/default_database.db/table_store_pk';";
+        writeSharedFile(
+                "pk.hql",
+                // same default database name as Flink
+                ADD_JAR_HQL
+                        + "\n"
+                        + externalTablePkDdl
+                        + "\n"
+                        + "SELECT b, a, c FROM table_store_pk ORDER BY b;");
+
+        ContainerState hive = getHive();
+        // Stage the table files in HDFS. Each step must succeed, otherwise the Hive query
+        // below would only fail later with a misleading result mismatch.
+        execOrFail(hive, "hdfs", "dfs", "-mkdir", "-p", HDFS_ROOT + TEST_DATA_DIR);
+        String hdfsTablePath = HDFS_ROOT + tableStorePkPath;
+        execOrFail(hive, "hdfs", "dfs", "-copyFromLocal", tableStorePkPath, hdfsTablePath);
+
+        // execOrFail checks the exit code before stdout is compared, so a broken Hive run
+        // reports its exit code and stderr rather than a bare string comparison failure.
+        Container.ExecResult execResult =
+                execOrFail(
+                        hive,
+                        "/opt/hive/bin/hive",
+                        "--hiveconf",
+                        "hive.root.logger=INFO,console",
+                        "-f",
+                        TEST_DATA_DIR + "/pk.hql");
+        assertThat(execResult.getStdout())
+                .isEqualTo("10\t1\tHi\n" + "20\t2\tHello\n" + "30\t3\tTable\n" + "40\t4\tStore\n");
+    }
+
+    /** Returns the container of the {@code hive-server_1} service, failing if it is absent. */
+    private ContainerState getHive() {
+        return environment
+                .getContainerByServiceName("hive-server_1")
+                .orElseThrow(
+                        () -> new IllegalStateException("No container for service hive-server_1"));
+    }
+
+    /**
+     * Runs a command in the container and fails the test on a non-zero exit code.
+     *
+     * @param container container to run the command in
+     * @param command command and its arguments
+     * @return the result of the command, whose exit code is guaranteed to be zero
+     */
+    private static Container.ExecResult execOrFail(ContainerState container, String... command)
+            throws Exception {
+        Container.ExecResult result = container.execInContainer(command);
+        if (result.getExitCode() != 0) {
+            throw new AssertionError(
+                    "Command '"
+                            + String.join(" ", command)
+                            + "' failed with exit code "
+                            + result.getExitCode()
+                            + ".\nstdout:\n"
+                            + result.getStdout()
+                            + "\nstderr:\n"
+                            + result.getStderr());
+        }
+        return result;
+    }
+
+    /**
+     * Prepends batch-mode and synchronous-DML settings plus the given DDLs to the statement and
+     * runs the resulting script.
+     *
+     * @param sql statement to run last
+     * @param ddls DDL statements placed before {@code sql}
+     */
+    private void runSql(String sql, String... ddls) throws Exception {
+        runSql(
+                "SET 'execution.runtime-mode' = 'batch';\n"
+                        + "SET 'table.dml-sync' = 'true';\n"
+                        + String.join("\n", ddls)
+                        + "\n"
+                        + sql);
+    }
+}
diff --git
a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/LogStoreE2eTest.java
b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/LogStoreE2eTest.java
index 465b6387..da385956 100644
---
a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/LogStoreE2eTest.java
+++
b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/LogStoreE2eTest.java
@@ -26,7 +26,7 @@ import java.util.UUID;
public class LogStoreE2eTest extends E2eTestBase {
public LogStoreE2eTest() {
- super(true);
+ super(true, false);
}
@Test
@@ -81,7 +81,7 @@ public class LogStoreE2eTest extends E2eTestBase {
tableStoreStreamDdl, TEST_DATA_DIR + "/" +
tableStoreDir, "kafka:9092");
// prepare first part of test data
- writeTestData(testDataSourceDir + "/1.csv", "A,10\nC,30\nD,40\n");
+ writeSharedFile(testDataSourceDir + "/1.csv", "A,10\nC,30\nD,40\n");
// insert data into table store
runSql(
@@ -101,7 +101,7 @@ public class LogStoreE2eTest extends E2eTestBase {
checkResult(s -> s.split(",")[0], "A, 10", "B, 2", "C, 30", "D, 40");
// prepare second part of test data
- writeTestData(testDataSourceDir + "/2.csv", "A,100\nD,400\n");
+ writeSharedFile(testDataSourceDir + "/2.csv", "A,100\nD,400\n");
// check that we can receive data from log store quickly
checkResult(s -> s.split(",")[0], "A, 100", "B, 2", "C, 30", "D, 400");
diff --git
a/flink-table-store-e2e-tests/src/test/resources-filtered/docker-compose.yaml
b/flink-table-store-e2e-tests/src/test/resources-filtered/docker-compose.yaml
index cfe4cccf..42c89b27 100644
---
a/flink-table-store-e2e-tests/src/test/resources-filtered/docker-compose.yaml
+++
b/flink-table-store-e2e-tests/src/test/resources-filtered/docker-compose.yaml
@@ -90,8 +90,87 @@ services:
depends_on:
- zookeeper
+ # ----------------------------------------
+ # Hive services, copied and modified from
https://github.com/big-data-europe/docker-hive
+ # ----------------------------------------
+
+ namenode:
+ image: bde2020/hadoop-namenode:2.0.0-hadoop2.7.4-java8
+ volumes:
+ - testdata:/test-data
+ - namenode:/hadoop/dfs/name
+ networks:
+ testnetwork:
+ aliases:
+ - namenode
+ environment:
+ - CLUSTER_NAME=test
+ env_file:
+ - ./hadoop-hive.env
+ ports:
+ - "50070:50070"
+
+ datanode:
+ image: bde2020/hadoop-datanode:2.0.0-hadoop2.7.4-java8
+ volumes:
+ - testdata:/test-data
+ - datanode:/hadoop/dfs/data
+ networks:
+ testnetwork:
+ aliases:
+ - datanode
+ env_file:
+ - ./hadoop-hive.env
+ environment:
+ SERVICE_PRECONDITION: "namenode:50070"
+ ports:
+ - "50075:50075"
+
+ hive-server:
+ image: bde2020/hive:2.3.2-postgresql-metastore
+ volumes:
+ - testdata:/test-data
+ networks:
+ testnetwork:
+ aliases:
+ - hive-server
+ env_file:
+ - ./hadoop-hive.env
+ environment:
+ HIVE_CORE_CONF_javax_jdo_option_ConnectionURL:
"jdbc:postgresql://hive-metastore/metastore"
+ SERVICE_PRECONDITION: "hive-metastore:9083"
+ ports:
+ - "10000:10000"
+
+ hive-metastore:
+ image: bde2020/hive:2.3.2-postgresql-metastore
+ volumes:
+ - testdata:/test-data
+ networks:
+ testnetwork:
+ aliases:
+ - hive-metastore
+ env_file:
+ - ./hadoop-hive.env
+ command: /opt/hive/bin/hive --service metastore
+ environment:
+ SERVICE_PRECONDITION: "namenode:50070 datanode:50075
hive-metastore-postgresql:5432"
+ ports:
+ - "9083:9083"
+
+ hive-metastore-postgresql:
+ image: bde2020/hive-metastore-postgresql:2.3.0
+ volumes:
+ - testdata:/test-data
+ networks:
+ testnetwork:
+ aliases:
+ - hive-metastore-postgresql
+
volumes:
testdata:
+ namenode:
+ datanode:
networks:
testnetwork:
diff --git a/flink-table-store-e2e-tests/src/test/resources/flink.env
b/flink-table-store-e2e-tests/src/test/resources/flink.env
index d6042006..a78609f3 100644
--- a/flink-table-store-e2e-tests/src/test/resources/flink.env
+++ b/flink-table-store-e2e-tests/src/test/resources/flink.env
@@ -16,4 +16,4 @@
# limitations under the License.
################################################################################
-FLINK_PROPERTIES="jobmanager.rpc.address:
jobmanager\ntaskmanager.numberOfTaskSlots: 9\nparallelism.default:
3\nsql-client.execution.result-mode: TABLEAU"
+FLINK_PROPERTIES="jobmanager.rpc.address:
jobmanager\ntaskmanager.numberOfTaskSlots: 9\nparallelism.default:
3\nsql-client.execution.result-mode: TABLEAU\nenv.java.opts.taskmanager:
-verbose:gc"
diff --git a/flink-table-store-e2e-tests/src/test/resources/hadoop-hive.env
b/flink-table-store-e2e-tests/src/test/resources/hadoop-hive.env
new file mode 100644
index 00000000..a78e65a1
--- /dev/null
+++ b/flink-table-store-e2e-tests/src/test/resources/hadoop-hive.env
@@ -0,0 +1,48 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+HIVE_SITE_CONF_javax_jdo_option_ConnectionURL=jdbc:postgresql://hive-metastore-postgresql/metastore
+HIVE_SITE_CONF_javax_jdo_option_ConnectionDriverName=org.postgresql.Driver
+HIVE_SITE_CONF_javax_jdo_option_ConnectionUserName=hive
+HIVE_SITE_CONF_javax_jdo_option_ConnectionPassword=hive
+HIVE_SITE_CONF_datanucleus_autoCreateSchema=false
+HIVE_SITE_CONF_hive_metastore_uris=thrift://hive-metastore:9083
+HDFS_CONF_dfs_namenode_datanode_registration_ip___hostname___check=false
+
+CORE_CONF_fs_defaultFS=hdfs://namenode:8020
+CORE_CONF_hadoop_http_staticuser_user=root
+CORE_CONF_hadoop_proxyuser_hue_hosts=*
+CORE_CONF_hadoop_proxyuser_hue_groups=*
+
+HDFS_CONF_dfs_webhdfs_enabled=true
+HDFS_CONF_dfs_permissions_enabled=false
+
+YARN_CONF_yarn_log___aggregation___enable=true
+YARN_CONF_yarn_resourcemanager_recovery_enabled=true
+YARN_CONF_yarn_resourcemanager_store_class=org.apache.hadoop.yarn.server.resourcemanager.recovery.FileSystemRMStateStore
+YARN_CONF_yarn_resourcemanager_fs_state___store_uri=/rmstate
+YARN_CONF_yarn_nodemanager_remote___app___log___dir=/app-logs
+YARN_CONF_yarn_log_server_url=http://historyserver:8188/applicationhistory/logs/
+YARN_CONF_yarn_timeline___service_enabled=true
+YARN_CONF_yarn_timeline___service_generic___application___history_enabled=true
+YARN_CONF_yarn_resourcemanager_system___metrics___publisher_enabled=true
+YARN_CONF_yarn_resourcemanager_hostname=resourcemanager
+YARN_CONF_yarn_timeline___service_hostname=historyserver
+YARN_CONF_yarn_resourcemanager_address=resourcemanager:8032
+YARN_CONF_yarn_resourcemanager_scheduler_address=resourcemanager:8030
+YARN_CONF_yarn_resourcemanager_resource___tracker_address=resourcemanager:8031