This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 870c569fa [#4981] Improvement(test): Add integration tests for Hive
catalog with S3 location. (#4982)
870c569fa is described below
commit 870c569fa2209aa68036ecb94bb400ea1bdce7c9
Author: Qi Yu <[email protected]>
AuthorDate: Tue Sep 24 09:54:59 2024 +0800
[#4981] Improvement(test): Add integration tests for Hive catalog with S3
location. (#4982)
### What changes were proposed in this pull request?
Add integration tests to test hive catalog with S3 location.
### Why are the changes needed?
To make code more robust.
Fix: #4981
### Does this PR introduce _any_ user-facing change?
N/A.
### How was this patch tested?
The changes themselves are covered by the new integration tests.
---
build.gradle.kts | 3 +-
catalogs/catalog-hive/build.gradle.kts | 2 +
.../hive/integration/test/CatalogHiveIT.java | 206 ++++++++++++---------
.../hive/integration/test/CatalogHiveS3IT.java | 171 +++++++++++++++++
gradle/libs.versions.toml | 2 +
.../integration/test/container/ContainerSuite.java | 75 +++++++-
.../container/GravitinoLocalStackContainer.java | 69 +++++++
7 files changed, 431 insertions(+), 97 deletions(-)
diff --git a/build.gradle.kts b/build.gradle.kts
index e18b5c56c..c6ea7f13c 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -168,12 +168,13 @@ allprojects {
param.environment("PROJECT_VERSION", project.version)
// Gravitino CI Docker image
- param.environment("GRAVITINO_CI_HIVE_DOCKER_IMAGE",
"apache/gravitino-ci:hive-0.1.13")
+ param.environment("GRAVITINO_CI_HIVE_DOCKER_IMAGE",
"apache/gravitino-ci:hive-0.1.14")
param.environment("GRAVITINO_CI_KERBEROS_HIVE_DOCKER_IMAGE",
"apache/gravitino-ci:kerberos-hive-0.1.5")
param.environment("GRAVITINO_CI_DORIS_DOCKER_IMAGE",
"apache/gravitino-ci:doris-0.1.5")
param.environment("GRAVITINO_CI_TRINO_DOCKER_IMAGE",
"apache/gravitino-ci:trino-0.1.6")
param.environment("GRAVITINO_CI_RANGER_DOCKER_IMAGE",
"apache/gravitino-ci:ranger-0.1.1")
param.environment("GRAVITINO_CI_KAFKA_DOCKER_IMAGE",
"apache/kafka:3.7.0")
+ param.environment("GRAVITINO_CI_LOCALSTACK_DOCKER_IMAGE",
"localstack/localstack:latest")
val dockerRunning = project.rootProject.extra["dockerRunning"] as?
Boolean ?: false
val macDockerConnector = project.rootProject.extra["macDockerConnector"]
as? Boolean ?: false
diff --git a/catalogs/catalog-hive/build.gradle.kts
b/catalogs/catalog-hive/build.gradle.kts
index 82b213a2a..2afb48f9a 100644
--- a/catalogs/catalog-hive/build.gradle.kts
+++ b/catalogs/catalog-hive/build.gradle.kts
@@ -127,6 +127,8 @@ dependencies {
testImplementation(libs.slf4j.api)
testImplementation(libs.testcontainers)
testImplementation(libs.testcontainers.mysql)
+ testImplementation(libs.testcontainers.localstack)
+ testImplementation(libs.hadoop2.s3)
testRuntimeOnly(libs.junit.jupiter.engine)
}
diff --git
a/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/integration/test/CatalogHiveIT.java
b/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/integration/test/CatalogHiveIT.java
index c9ef0db1f..081aad480 100644
---
a/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/integration/test/CatalogHiveIT.java
+++
b/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/integration/test/CatalogHiveIT.java
@@ -23,12 +23,9 @@ import static
org.apache.gravitino.catalog.hive.HiveTablePropertiesMetadata.COMM
import static
org.apache.gravitino.catalog.hive.HiveTablePropertiesMetadata.EXTERNAL;
import static
org.apache.gravitino.catalog.hive.HiveTablePropertiesMetadata.FORMAT;
import static
org.apache.gravitino.catalog.hive.HiveTablePropertiesMetadata.INPUT_FORMAT;
-import static
org.apache.gravitino.catalog.hive.HiveTablePropertiesMetadata.LOCATION;
-import static
org.apache.gravitino.catalog.hive.HiveTablePropertiesMetadata.NUM_FILES;
import static
org.apache.gravitino.catalog.hive.HiveTablePropertiesMetadata.OUTPUT_FORMAT;
import static
org.apache.gravitino.catalog.hive.HiveTablePropertiesMetadata.SERDE_LIB;
import static
org.apache.gravitino.catalog.hive.HiveTablePropertiesMetadata.TABLE_TYPE;
-import static
org.apache.gravitino.catalog.hive.HiveTablePropertiesMetadata.TOTAL_SIZE;
import static
org.apache.gravitino.catalog.hive.HiveTablePropertiesMetadata.TRANSIENT_LAST_DDL_TIME;
import static org.apache.gravitino.catalog.hive.TableType.EXTERNAL_TABLE;
import static org.apache.gravitino.catalog.hive.TableType.MANAGED_TABLE;
@@ -45,6 +42,7 @@ import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@@ -110,33 +108,35 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Tag("gravitino-docker-test")
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class CatalogHiveIT extends AbstractIT {
private static final Logger LOG =
LoggerFactory.getLogger(CatalogHiveIT.class);
public static final String metalakeName =
GravitinoITUtils.genRandomName("CatalogHiveIT_metalake");
- public static final String catalogName =
GravitinoITUtils.genRandomName("CatalogHiveIT_catalog");
- public static final String SCHEMA_PREFIX = "CatalogHiveIT_schema";
- public static final String schemaName =
GravitinoITUtils.genRandomName(SCHEMA_PREFIX);
- public static final String TABLE_PREFIX = "CatalogHiveIT_table";
- public static final String tableName =
GravitinoITUtils.genRandomName(TABLE_PREFIX);
+ public String catalogName =
GravitinoITUtils.genRandomName("CatalogHiveIT_catalog");
+ public String SCHEMA_PREFIX = "CatalogHiveIT_schema";
+ public String schemaName = GravitinoITUtils.genRandomName(SCHEMA_PREFIX);
+ public String TABLE_PREFIX = "CatalogHiveIT_table";
+ public String tableName = GravitinoITUtils.genRandomName(TABLE_PREFIX);
public static final String ALTER_TABLE_NAME = "alert_table_name";
public static final String TABLE_COMMENT = "table_comment";
public static final String HIVE_COL_NAME1 = "hive_col_name1";
public static final String HIVE_COL_NAME2 = "hive_col_name2";
public static final String HIVE_COL_NAME3 = "hive_col_name3";
- private static String HIVE_METASTORE_URIS;
- private static final String provider = "hive";
- private static final ContainerSuite containerSuite =
ContainerSuite.getInstance();
- private static HiveClientPool hiveClientPool;
- private static GravitinoMetalake metalake;
- private static Catalog catalog;
- private static SparkSession sparkSession;
- private static FileSystem hdfs;
- private static final String SELECT_ALL_TEMPLATE = "SELECT * FROM %s.%s";
+ protected String HIVE_METASTORE_URIS;
+ protected final String provider = "hive";
+ protected final ContainerSuite containerSuite = ContainerSuite.getInstance();
+ private HiveClientPool hiveClientPool;
+ protected GravitinoMetalake metalake;
+ protected Catalog catalog;
+ protected SparkSession sparkSession;
+ protected FileSystem fileSystem;
+ private final String SELECT_ALL_TEMPLATE = "SELECT * FROM %s.%s";
private static String getInsertWithoutPartitionSql(
String dbName, String tableName, String values) {
@@ -161,8 +161,7 @@ public class CatalogHiveIT extends AbstractIT {
STRING_TYPE_NAME,
"'gravitino_it_test'");
- @BeforeAll
- public static void startup() throws Exception {
+ protected void startNecessaryContainer() {
containerSuite.startHiveContainer();
HIVE_METASTORE_URIS =
@@ -170,15 +169,9 @@ public class CatalogHiveIT extends AbstractIT {
"thrift://%s:%d",
containerSuite.getHiveContainer().getContainerIpAddress(),
HiveContainer.HIVE_METASTORE_PORT);
+ }
- HiveConf hiveConf = new HiveConf();
- hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, HIVE_METASTORE_URIS);
-
- // Check if Hive client can connect to Hive metastore
- hiveClientPool = new HiveClientPool(1, hiveConf);
- List<String> dbs = hiveClientPool.run(client -> client.getAllDatabases());
- Assertions.assertFalse(dbs.isEmpty());
-
+ protected void initSparkSession() {
sparkSession =
SparkSession.builder()
.master("local[1]")
@@ -194,7 +187,9 @@ public class CatalogHiveIT extends AbstractIT {
.config("mapreduce.input.fileinputformat.input.dir.recursive",
"true")
.enableHiveSupport()
.getOrCreate();
+ }
+ protected void initFileSystem() throws IOException {
Configuration conf = new Configuration();
conf.set(
"fs.defaultFS",
@@ -202,7 +197,23 @@ public class CatalogHiveIT extends AbstractIT {
"hdfs://%s:%d",
containerSuite.getHiveContainer().getContainerIpAddress(),
HiveContainer.HDFS_DEFAULTFS_PORT));
- hdfs = FileSystem.get(conf);
+ fileSystem = FileSystem.get(conf);
+ }
+
+ @BeforeAll
+ public void startup() throws Exception {
+ startNecessaryContainer();
+
+ HiveConf hiveConf = new HiveConf();
+ hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, HIVE_METASTORE_URIS);
+
+ // Check if Hive client can connect to Hive metastore
+ hiveClientPool = new HiveClientPool(1, hiveConf);
+ List<String> dbs = hiveClientPool.run(client -> client.getAllDatabases());
+ Assertions.assertFalse(dbs.isEmpty());
+
+ initSparkSession();
+ initFileSystem();
createMetalake();
createCatalog();
@@ -210,7 +221,7 @@ public class CatalogHiveIT extends AbstractIT {
}
@AfterAll
- public static void stop() throws IOException {
+ public void stop() throws IOException {
if (client != null) {
Arrays.stream(catalog.asSchemas().listSchemas())
.filter(schema -> !schema.equals("default"))
@@ -233,8 +244,8 @@ public class CatalogHiveIT extends AbstractIT {
sparkSession.close();
}
- if (hdfs != null) {
- hdfs.close();
+ if (fileSystem != null) {
+ fileSystem.close();
}
try {
closer.close();
@@ -254,7 +265,7 @@ public class CatalogHiveIT extends AbstractIT {
createSchema();
}
- private static void createMetalake() {
+ private void createMetalake() {
GravitinoMetalake[] gravitinoMetalakes = client.listMetalakes();
Assertions.assertEquals(0, gravitinoMetalakes.length);
@@ -266,7 +277,7 @@ public class CatalogHiveIT extends AbstractIT {
metalake = loadMetalake;
}
- private static void createCatalog() {
+ protected void createCatalog() {
Map<String, String> properties = Maps.newHashMap();
properties.put(METASTORE_URIS, HIVE_METASTORE_URIS);
@@ -275,20 +286,10 @@ public class CatalogHiveIT extends AbstractIT {
catalog = metalake.loadCatalog(catalogName);
}
- private static void createSchema() throws TException, InterruptedException {
- Map<String, String> properties = Maps.newHashMap();
- properties.put("key1", "val1");
- properties.put("key2", "val2");
- properties.put(
- "location",
- String.format(
- "hdfs://%s:%d/user/hive/warehouse/%s.db",
- containerSuite.getHiveContainer().getContainerIpAddress(),
- HiveContainer.HDFS_DEFAULTFS_PORT,
- schemaName.toLowerCase()));
+ private void createSchema() throws TException, InterruptedException {
+ Map<String, String> schemaProperties = createSchemaProperties();
String comment = "comment";
-
- catalog.asSchemas().createSchema(schemaName, comment, properties);
+ catalog.asSchemas().createSchema(schemaName, comment, schemaProperties);
Schema loadSchema = catalog.asSchemas().loadSchema(schemaName);
Assertions.assertEquals(schemaName.toLowerCase(), loadSchema.name());
Assertions.assertEquals(comment, loadSchema.comment());
@@ -335,7 +336,7 @@ public class CatalogHiveIT extends AbstractIT {
Path tableDirectory = new Path(table.getSd().getLocation());
FileStatus[] fileStatuses;
try {
- fileStatuses = hdfs.listStatus(tableDirectory);
+ fileStatuses = fileSystem.listStatus(tableDirectory);
} catch (IOException e) {
LOG.warn("Failed to list status of table directory", e);
throw new RuntimeException(e);
@@ -346,7 +347,7 @@ public class CatalogHiveIT extends AbstractIT {
}
}
- private Map<String, String> createProperties() {
+ protected Map<String, String> createProperties() {
Map<String, String> properties = Maps.newHashMap();
properties.put("key1", "val1");
properties.put("key2", "val2");
@@ -560,7 +561,7 @@ public class CatalogHiveIT extends AbstractIT {
catalog
.asTableCatalog()
.createTable(
- nameIdentifier, columns, TABLE_COMMENT, ImmutableMap.of(),
Transforms.EMPTY_TRANSFORM);
+ nameIdentifier, columns, TABLE_COMMENT, createProperties(),
Transforms.EMPTY_TRANSFORM);
Table loadedTable1 = catalog.asTableCatalog().loadTable(nameIdentifier);
HiveTablePropertiesMetadata tablePropertiesMetadata = new
HiveTablePropertiesMetadata();
org.apache.hadoop.hive.metastore.api.Table actualTable =
@@ -569,6 +570,10 @@ public class CatalogHiveIT extends AbstractIT {
checkTableReadWrite(actualTable);
// test set properties
+ Map<String, String> properties = createProperties();
+ properties.put(FORMAT, "textfile");
+ properties.put(SERDE_LIB, HiveStorageConstants.OPENCSV_SERDE_CLASS);
+ properties.put(TABLE_TYPE, "external_table");
String table2 = GravitinoITUtils.genRandomName(TABLE_PREFIX);
catalog
.asTableCatalog()
@@ -576,18 +581,7 @@ public class CatalogHiveIT extends AbstractIT {
NameIdentifier.of(schemaName, table2),
columns,
TABLE_COMMENT,
- ImmutableMap.of(
- TABLE_TYPE,
- "external_table",
- LOCATION,
- String.format(
- "hdfs://%s:%d/tmp",
- containerSuite.getHiveContainer().getContainerIpAddress(),
- HiveContainer.HDFS_DEFAULTFS_PORT),
- FORMAT,
- "textfile",
- SERDE_LIB,
- HiveStorageConstants.OPENCSV_SERDE_CLASS),
+ properties,
Transforms.EMPTY_TRANSFORM);
Table loadedTable2 =
catalog.asTableCatalog().loadTable(NameIdentifier.of(schemaName, table2));
org.apache.hadoop.hive.metastore.api.Table actualTable2 =
@@ -607,10 +601,9 @@ public class CatalogHiveIT extends AbstractIT {
Assertions.assertEquals(
((Boolean)
tablePropertiesMetadata.getDefaultValue(EXTERNAL)).toString().toUpperCase(),
actualTable.getParameters().get(EXTERNAL));
- Assertions.assertTrue(actualTable2.getSd().getLocation().endsWith("/tmp"));
Assertions.assertNotNull(loadedTable2.properties().get(TRANSIENT_LAST_DDL_TIME));
- Assertions.assertNotNull(loadedTable2.properties().get(NUM_FILES));
- Assertions.assertNotNull(loadedTable2.properties().get(TOTAL_SIZE));
+
+ // S3 doesn't support NUM_FILES and TOTAL_SIZE
checkTableReadWrite(actualTable2);
// test alter properties exception
@@ -630,36 +623,42 @@ public class CatalogHiveIT extends AbstractIT {
public void testHiveSchemaProperties() throws TException,
InterruptedException {
// test LOCATION property
String schemaName = GravitinoITUtils.genRandomName(SCHEMA_PREFIX);
- Map<String, String> properties = Maps.newHashMap();
- String expectedSchemaLocation =
- String.format(
- "hdfs://%s:%d/tmp",
- containerSuite.getHiveContainer().getContainerIpAddress(),
- HiveContainer.HDFS_DEFAULTFS_PORT);
- properties.put(HiveSchemaPropertiesMetadata.LOCATION,
expectedSchemaLocation);
- catalog.asSchemas().createSchema(schemaName, "comment", properties);
+ Map<String, String> schemaProperties = createSchemaProperties();
+ String expectedHDFSSchemaLocation =
schemaProperties.get(HiveSchemaPropertiesMetadata.LOCATION);
+
+ catalog.asSchemas().createSchema(schemaName, "comment", schemaProperties);
Database actualSchema = hiveClientPool.run(client ->
client.getDatabase(schemaName));
String actualSchemaLocation = actualSchema.getLocationUri();
-
Assertions.assertTrue(actualSchemaLocation.endsWith(expectedSchemaLocation));
+
Assertions.assertTrue(actualSchemaLocation.endsWith(expectedHDFSSchemaLocation));
NameIdentifier tableIdent =
NameIdentifier.of(schemaName,
GravitinoITUtils.genRandomName(TABLE_PREFIX));
+
+ Map<String, String> tableProperties = createProperties();
+ String expectedSchemaLocation =
+ tableProperties.getOrDefault(
+ HiveSchemaPropertiesMetadata.LOCATION, expectedHDFSSchemaLocation);
+
catalog
.asTableCatalog()
.createTable(
tableIdent,
createColumns(),
TABLE_COMMENT,
- ImmutableMap.of(),
+ tableProperties,
Transforms.EMPTY_TRANSFORM);
org.apache.hadoop.hive.metastore.api.Table actualTable =
hiveClientPool.run(client -> client.getTable(schemaName,
tableIdent.name()));
String actualTableLocation = actualTable.getSd().getLocation();
// use `tableIdent.name().toLowerCase()` because HMS will convert table
name to lower
- String expectedTableLocation = expectedSchemaLocation + "/" +
tableIdent.name().toLowerCase();
- Assertions.assertTrue(actualTableLocation.endsWith(expectedTableLocation));
+
+ // actualTable.getSd().getLocation() is null for S3
+ if (!tableProperties.containsKey(HiveTablePropertiesMetadata.LOCATION)) {
+ String expectedTableLocation = expectedSchemaLocation + "/" +
tableIdent.name().toLowerCase();
+
Assertions.assertTrue(actualTableLocation.endsWith(expectedTableLocation));
+ }
checkTableReadWrite(actualTable);
}
@@ -901,7 +900,7 @@ public class CatalogHiveIT extends AbstractIT {
hiveClientPool.run(client -> client.getTable(schemaName,
createdTable.name()));
Path partitionDirectory = new Path(hiveTab.getSd().getLocation() +
identity.name());
Assertions.assertFalse(
- hdfs.exists(partitionDirectory), "The partition directory should not
exist");
+ fileSystem.exists(partitionDirectory), "The partition directory should
not exist");
// add partition
"hive_col_name2=2024-01-02/hive_col_name3=gravitino_it_test2"
String[] field3 = new String[] {"hive_col_name2"};
@@ -953,7 +952,7 @@ public class CatalogHiveIT extends AbstractIT {
client.getPartition(schemaName, createdTable.name(),
partitionAdded1.name())));
Path partitionDirectory1 = new Path(hiveTab.getSd().getLocation() +
identity1.name());
Assertions.assertFalse(
- hdfs.exists(partitionDirectory1), "The partition directory should not
exist");
+ fileSystem.exists(partitionDirectory1), "The partition directory
should not exist");
Assertions.assertThrows(
NoSuchObjectException.class,
() ->
@@ -962,7 +961,7 @@ public class CatalogHiveIT extends AbstractIT {
client.getPartition(schemaName, createdTable.name(),
partitionAdded2.name())));
Path partitionDirectory2 = new Path(hiveTab.getSd().getLocation() +
identity2.name());
Assertions.assertFalse(
- hdfs.exists(partitionDirectory2), "The partition directory should not
exist");
+ fileSystem.exists(partitionDirectory2), "The partition directory
should not exist");
// test no-exist partition with ifExist=false
Assertions.assertFalse(createdTable.supportPartitions().dropPartition(partitionAdded.name()));
@@ -1388,7 +1387,7 @@ public class CatalogHiveIT extends AbstractIT {
// Schema does not have the rename operation.
final String schemaName =
GravitinoITUtils.genRandomName("CatalogHiveIT_schema");
- catalog.asSchemas().createSchema(schemaName, "", ImmutableMap.of());
+ catalog.asSchemas().createSchema(schemaName, "", createSchemaProperties());
final Catalog cata = catalog;
// Now try to rename table
@@ -1472,19 +1471,23 @@ public class CatalogHiveIT extends AbstractIT {
catalog.asTableCatalog().dropTable(NameIdentifier.of(schemaName,
tableName));
Boolean existed = hiveClientPool.run(client ->
client.tableExists(schemaName, tableName));
Assertions.assertFalse(existed, "The Hive table should not exist");
- Assertions.assertFalse(hdfs.exists(tableDirectory), "The table directory
should not exist");
+ Assertions.assertFalse(
+ fileSystem.exists(tableDirectory), "The table directory should not
exist");
}
@Test
public void testDropHiveExternalTable() throws TException,
InterruptedException, IOException {
Column[] columns = createColumns();
+ Map<String, String> properties = createProperties();
+ properties.put(TABLE_TYPE, EXTERNAL_TABLE.name().toLowerCase(Locale.ROOT));
+
catalog
.asTableCatalog()
.createTable(
NameIdentifier.of(schemaName, tableName),
columns,
TABLE_COMMENT,
- ImmutableMap.of(TABLE_TYPE,
EXTERNAL_TABLE.name().toLowerCase(Locale.ROOT)),
+ properties,
new Transform[] {Transforms.identity(columns[2].name())});
// Directly get table from Hive metastore to check if the table is created
successfully.
org.apache.hadoop.hive.metastore.api.Table hiveTab =
@@ -1497,7 +1500,7 @@ public class CatalogHiveIT extends AbstractIT {
Assertions.assertFalse(existed, "The table should be not exist");
Path tableDirectory = new Path(hiveTab.getSd().getLocation());
Assertions.assertTrue(
- hdfs.listStatus(tableDirectory).length > 0, "The table should not be
empty");
+ fileSystem.listStatus(tableDirectory).length > 0, "The table should
not be empty");
}
@Test
@@ -1525,21 +1528,25 @@ public class CatalogHiveIT extends AbstractIT {
catalog.asTableCatalog().purgeTable(NameIdentifier.of(schemaName,
tableName)),
"The table should not be found in the catalog");
Path tableDirectory = new Path(hiveTab.getSd().getLocation());
- Assertions.assertFalse(hdfs.exists(tableDirectory), "The table directory
should not exist");
- Path trashDirectory = hdfs.getTrashRoot(tableDirectory);
- Assertions.assertFalse(hdfs.exists(trashDirectory), "The trash should not
exist");
+ Assertions.assertFalse(
+ fileSystem.exists(tableDirectory), "The table directory should not
exist");
+ Path trashDirectory = fileSystem.getTrashRoot(tableDirectory);
+ Assertions.assertFalse(fileSystem.exists(trashDirectory), "The trash
should not exist");
}
@Test
public void testPurgeHiveExternalTable() throws TException,
InterruptedException, IOException {
Column[] columns = createColumns();
+ Map<String, String> properties = createProperties();
+ properties.put(TABLE_TYPE, EXTERNAL_TABLE.name().toLowerCase(Locale.ROOT));
+
catalog
.asTableCatalog()
.createTable(
NameIdentifier.of(schemaName, tableName),
columns,
TABLE_COMMENT,
- ImmutableMap.of(TABLE_TYPE,
EXTERNAL_TABLE.name().toLowerCase(Locale.ROOT)),
+ properties,
new Transform[] {Transforms.identity(columns[2].name())});
// Directly get table from Hive metastore to check if the table is created
successfully.
@@ -1560,7 +1567,7 @@ public class CatalogHiveIT extends AbstractIT {
Assertions.assertTrue(existed, "The table should be still exist");
Path tableDirectory = new Path(hiveTab.getSd().getLocation());
Assertions.assertTrue(
- hdfs.listStatus(tableDirectory).length > 0, "The table should not be
empty");
+ fileSystem.listStatus(tableDirectory).length > 0, "The table should
not be empty");
}
@Test
@@ -1649,7 +1656,10 @@ public class CatalogHiveIT extends AbstractIT {
Exception exception =
Assertions.assertThrows(
Exception.class,
- () -> createdCatalog.asSchemas().createSchema("schema", "comment",
ImmutableMap.of()));
+ () ->
+ createdCatalog
+ .asSchemas()
+ .createSchema("schema", "comment",
createSchemaProperties()));
Assertions.assertTrue(exception.getMessage().contains("Failed to connect
to Hive Metastore"));
Catalog newCatalog =
@@ -1660,14 +1670,14 @@ public class CatalogHiveIT extends AbstractIT {
// The URI has restored, so it should not throw exception.
Assertions.assertDoesNotThrow(
() -> {
- newCatalog.asSchemas().createSchema("schema", "comment",
ImmutableMap.of());
+ newCatalog.asSchemas().createSchema("schema", "comment",
createSchemaProperties());
});
newCatalog.asSchemas().dropSchema("schema", true);
metalake.dropCatalog(nameOfCatalog);
}
- private static void createCatalogWithCustomOperation(String catalogName,
String customImpl) {
+ private void createCatalogWithCustomOperation(String catalogName, String
customImpl) {
Map<String, String> properties = Maps.newHashMap();
properties.put(METASTORE_URIS, HIVE_METASTORE_URIS);
properties.put(BaseCatalog.CATALOG_OPERATION_IMPL, customImpl);
@@ -1677,4 +1687,18 @@ public class CatalogHiveIT extends AbstractIT {
catalogName, Catalog.Type.RELATIONAL, provider, "comment",
properties);
catalog.asSchemas().listSchemas();
}
+
+ protected Map<String, String> createSchemaProperties() {
+ Map<String, String> properties = new HashMap<>();
+ properties.put("key1", "val1");
+ properties.put("key2", "val2");
+ properties.put(
+ "location",
+ String.format(
+ "hdfs://%s:%d/user/hive/warehouse/%s.db",
+ containerSuite.getHiveContainer().getContainerIpAddress(),
+ HiveContainer.HDFS_DEFAULTFS_PORT,
+ schemaName.toLowerCase()));
+ return properties;
+ }
}
diff --git
a/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/integration/test/CatalogHiveS3IT.java
b/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/integration/test/CatalogHiveS3IT.java
new file mode 100644
index 000000000..83d52d241
--- /dev/null
+++
b/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/integration/test/CatalogHiveS3IT.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.hive.integration.test;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import
org.apache.gravitino.integration.test.container.GravitinoLocalStackContainer;
+import org.apache.gravitino.integration.test.container.HiveContainer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+
+public class CatalogHiveS3IT extends CatalogHiveIT {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(CatalogHiveS3IT.class);
+
+ private static final String S3_BUCKET_NAME = "my-test-bucket";
+ private GravitinoLocalStackContainer gravitinoLocalStackContainer;
+
+ private static final String S3_ACCESS_KEY = "S3_ACCESS_KEY";
+ private static final String S3_SECRET_KEY = "S3_SECRET_KEY";
+ private static final String S3_ENDPOINT = "S3_ENDPOINT";
+
+ private String getS3Endpoint;
+ private String accessKey;
+ private String secretKey;
+
+ @Override
+ protected void startNecessaryContainer() {
+ containerSuite.startLocalStackContainer();
+ gravitinoLocalStackContainer = containerSuite.getLocalStackContainer();
+
+ Awaitility.await()
+ .atMost(60, TimeUnit.SECONDS)
+ .pollInterval(1, TimeUnit.SECONDS)
+ .until(
+ () -> {
+ try {
+ Container.ExecResult result =
+ gravitinoLocalStackContainer.executeInContainer(
+ "awslocal", "iam", "create-user", "--user-name",
"anonymous");
+ return result.getExitCode() == 0;
+ } catch (Exception e) {
+ LOGGER.info("LocalStack is not ready yet for: ", e);
+ return false;
+ }
+ });
+
+ gravitinoLocalStackContainer.executeInContainer(
+ "awslocal", "s3", "mb", "s3://" + S3_BUCKET_NAME);
+
+ Container.ExecResult result =
+ gravitinoLocalStackContainer.executeInContainer(
+ "awslocal", "iam", "create-access-key", "--user-name",
"anonymous");
+
+ // Get access key and secret key from result
+ String[] lines = result.getStdout().split("\n");
+ accessKey = lines[3].split(":")[1].trim().substring(1, 21);
+ secretKey = lines[5].split(":")[1].trim().substring(1, 41);
+
+ LOGGER.info("Access key: " + accessKey);
+ LOGGER.info("Secret key: " + secretKey);
+
+ getS3Endpoint =
+ String.format("http://%s:%d",
gravitinoLocalStackContainer.getContainerIpAddress(), 4566);
+
+ gravitinoLocalStackContainer.executeInContainer(
+ "awslocal",
+ "s3api",
+ "put-bucket-acl",
+ "--bucket",
+ "my-test-bucket",
+ "--acl",
+ "public-read-write");
+
+ Map<String, String> hiveContainerEnv =
+ ImmutableMap.of(
+ S3_ACCESS_KEY,
+ accessKey,
+ S3_SECRET_KEY,
+ secretKey,
+ S3_ENDPOINT,
+ getS3Endpoint,
+ HiveContainer.HIVE_RUNTIME_VERSION,
+ HiveContainer.HIVE3);
+
+ containerSuite.startHiveContainerWithS3(hiveContainerEnv);
+
+ HIVE_METASTORE_URIS =
+ String.format(
+ "thrift://%s:%d",
+ containerSuite.getHiveContainerWithS3().getContainerIpAddress(),
+ HiveContainer.HIVE_METASTORE_PORT);
+ }
+
+ @Override
+ protected void initFileSystem() throws IOException {
+ // Use S3a file system
+ Configuration conf = new Configuration();
+ conf.set("fs.s3a.access.key", accessKey);
+ conf.set("fs.s3a.secret.key", secretKey);
+ conf.set("fs.s3a.endpoint", getS3Endpoint);
+ conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
+ conf.set(
+ "fs.s3a.aws.credentials.provider",
"org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider");
+ conf.set("fs.s3a.path.style.access", "true");
+ conf.set("fs.s3a.connection.ssl.enabled", "false");
+ fileSystem = FileSystem.get(URI.create("s3a://" + S3_BUCKET_NAME), conf);
+ }
+
+ @Override
+ protected void initSparkSession() {
+ sparkSession =
+ SparkSession.builder()
+ .master("local[1]")
+ .appName("Hive Catalog integration test")
+ .config("hive.metastore.uris", HIVE_METASTORE_URIS)
+ .config(
+ "spark.sql.warehouse.dir",
+ String.format(
+ "hdfs://%s:%d/user/hive/warehouse",
+
containerSuite.getHiveContainerWithS3().getContainerIpAddress(),
+ HiveContainer.HDFS_DEFAULTFS_PORT))
+ .config("spark.hadoop.fs.s3a.access.key", accessKey)
+ .config("spark.hadoop.fs.s3a.secret.key", secretKey)
+ .config("spark.hadoop.fs.s3a.endpoint", getS3Endpoint)
+ .config("spark.hadoop.fs.s3a.impl",
"org.apache.hadoop.fs.s3a.S3AFileSystem")
+ .config("spark.hadoop.fs.s3a.path.style.access", "true")
+ .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
+ .config(
+ "spark.hadoop.fs.s3a.aws.credentials.provider",
+ "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
+ .config("spark.sql.storeAssignmentPolicy", "LEGACY")
+ .config("mapreduce.input.fileinputformat.input.dir.recursive",
"true")
+ .enableHiveSupport()
+ .getOrCreate();
+ }
+
+ @Override
+ protected Map<String, String> createSchemaProperties() {
+ Map<String, String> properties = new HashMap<>();
+ properties.put("key1", "val1");
+ properties.put("key2", "val2");
+ properties.put("location", "s3a://" + S3_BUCKET_NAME + "/test-" +
System.currentTimeMillis());
+ return properties;
+ }
+}
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 6790962cf..255306c98 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -146,6 +146,7 @@ hadoop2-hdfs = { group = "org.apache.hadoop", name =
"hadoop-hdfs", version.ref
hadoop2-hdfs-client = { group = "org.apache.hadoop", name =
"hadoop-hdfs-client", version.ref = "hadoop2" }
hadoop2-common = { group = "org.apache.hadoop", name = "hadoop-common",
version.ref = "hadoop2"}
hadoop2-mapreduce-client-core = { group = "org.apache.hadoop", name =
"hadoop-mapreduce-client-core", version.ref = "hadoop2"}
+hadoop2-s3 = { group = "org.apache.hadoop", name = "hadoop-aws", version.ref =
"hadoop2"}
hadoop3-hdfs = { group = "org.apache.hadoop", name = "hadoop-hdfs",
version.ref = "hadoop3" }
hadoop3-common = { group = "org.apache.hadoop", name = "hadoop-common",
version.ref = "hadoop3"}
hadoop3-client = { group = "org.apache.hadoop", name = "hadoop-client",
version.ref = "hadoop3"}
@@ -183,6 +184,7 @@ testcontainers = { group = "org.testcontainers", name =
"testcontainers", versio
testcontainers-mysql = { group = "org.testcontainers", name = "mysql",
version.ref = "testcontainers" }
testcontainers-postgresql = { group = "org.testcontainers", name =
"postgresql", version.ref = "testcontainers" }
testcontainers-junit-jupiter = { group = "org.testcontainers", name =
"junit-jupiter", version.ref = "testcontainers" }
+testcontainers-localstack = { group = "org.testcontainers", name =
"localstack", version.ref = "testcontainers" }
trino-jdbc = { group = "io.trino", name = "trino-jdbc", version.ref = "trino" }
jwt-api = { group = "io.jsonwebtoken", name = "jjwt-api", version.ref = "jwt"}
jwt-impl = { group = "io.jsonwebtoken", name = "jjwt-impl", version.ref =
"jwt"}
diff --git
a/integration-test-common/src/test/java/org/apache/gravitino/integration/test/container/ContainerSuite.java
b/integration-test-common/src/test/java/org/apache/gravitino/integration/test/container/ContainerSuite.java
index bc311c4bc..12a88bbd9 100644
---
a/integration-test-common/src/test/java/org/apache/gravitino/integration/test/container/ContainerSuite.java
+++
b/integration-test-common/src/test/java/org/apache/gravitino/integration/test/container/ContainerSuite.java
@@ -71,6 +71,16 @@ public class ContainerSuite implements Closeable {
private static volatile Map<PGImageName, PostgreSQLContainer> pgContainerMap
=
new EnumMap<>(PGImageName.class);
+ private static volatile GravitinoLocalStackContainer
gravitinoLocalStackContainer;
+
+  /**
+   * We cannot reuse the plain Hive container for the Hive-with-S3 case because of the
+   * following differences: 1. The S3 configuration and its corresponding environment
+   * variables. 2. The Hive container with S3 is a Hive3 container, while the plain Hive
+   * container is a Hive2 container, and the Hive2 container has problems accessing S3.
+   */
+ private static volatile HiveContainer hiveContainerWithS3;
+
protected static final CloseableGroup closer = CloseableGroup.create();
private static void initIfNecessary() {
@@ -112,7 +122,11 @@ public class ContainerSuite implements Closeable {
return network;
}
- public void startHiveContainer() {
+ public void startHiveContainer(Map<String, String> env) {
+ ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
+ builder.putAll(env);
+ builder.put("HADOOP_USER_NAME", "anonymous");
+
if (hiveContainer == null) {
synchronized (ContainerSuite.class) {
if (hiveContainer == null) {
@@ -121,10 +135,7 @@ public class ContainerSuite implements Closeable {
HiveContainer.Builder hiveBuilder =
HiveContainer.builder()
.withHostName("gravitino-ci-hive")
- .withEnvVars(
- ImmutableMap.<String, String>builder()
- .put("HADOOP_USER_NAME", "anonymous")
- .build())
+ .withEnvVars(builder.build())
.withNetwork(network);
HiveContainer container = closer.register(hiveBuilder.build());
container.start();
@@ -134,6 +145,33 @@ public class ContainerSuite implements Closeable {
}
}
+ public void startHiveContainerWithS3(Map<String, String> env) {
+ ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
+ builder.putAll(env);
+ builder.put("HADOOP_USER_NAME", "anonymous");
+
+ if (hiveContainerWithS3 == null) {
+ synchronized (ContainerSuite.class) {
+ if (hiveContainerWithS3 == null) {
+ initIfNecessary();
+ // Start Hive container
+ HiveContainer.Builder hiveBuilder =
+ HiveContainer.builder()
+ .withHostName("gravitino-ci-hive")
+ .withEnvVars(builder.build())
+ .withNetwork(network);
+ HiveContainer container = closer.register(hiveBuilder.build());
+ container.start();
+ hiveContainerWithS3 = container;
+ }
+ }
+ }
+ }
+
+ public void startHiveContainer() {
+ startHiveContainer(ImmutableMap.of());
+ }
+
/**
* To start and enable Ranger plugin in Hive container, <br>
* you can specify environment variables: <br>
@@ -361,6 +399,33 @@ public class ContainerSuite implements Closeable {
}
}
+ public void startLocalStackContainer() {
+ if (gravitinoLocalStackContainer == null) {
+ synchronized (ContainerSuite.class) {
+ if (gravitinoLocalStackContainer == null) {
+ GravitinoLocalStackContainer.Builder builder =
+ GravitinoLocalStackContainer.builder().withNetwork(network);
+ GravitinoLocalStackContainer container =
closer.register(builder.build());
+ try {
+ container.start();
+ } catch (Exception e) {
+ LOG.error("Failed to start LocalStack container", e);
+ throw new RuntimeException("Failed to start LocalStack container",
e);
+ }
+ gravitinoLocalStackContainer = container;
+ }
+ }
+ }
+ }
+
+ public GravitinoLocalStackContainer getLocalStackContainer() {
+ return gravitinoLocalStackContainer;
+ }
+
+ public HiveContainer getHiveContainerWithS3() {
+ return hiveContainerWithS3;
+ }
+
public KafkaContainer getKafkaContainer() {
return kafkaContainer;
}
diff --git
a/integration-test-common/src/test/java/org/apache/gravitino/integration/test/container/GravitinoLocalStackContainer.java
b/integration-test-common/src/test/java/org/apache/gravitino/integration/test/container/GravitinoLocalStackContainer.java
new file mode 100644
index 000000000..11eae3525
--- /dev/null
+++
b/integration-test-common/src/test/java/org/apache/gravitino/integration/test/container/GravitinoLocalStackContainer.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.integration.test.container;
+
+import com.google.common.collect.ImmutableSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.testcontainers.containers.Network;
+
+public class GravitinoLocalStackContainer extends BaseContainer {
+
+ public static final String DEFAULT_IMAGE =
System.getenv("GRAVITINO_CI_LOCALSTACK_DOCKER_IMAGE");
+ public static final String HOST_NAME = "gravitino-ci-localstack";
+ public static final int PORT = 4566;
+
+ public GravitinoLocalStackContainer(
+ String image,
+ String hostName,
+ Set<Integer> ports,
+ Map<String, String> extraHosts,
+ Map<String, String> filesToMount,
+ Map<String, String> envVars,
+ Optional<Network> network) {
+ super(image, hostName, ports, extraHosts, filesToMount, envVars, network);
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ @Override
+ protected boolean checkContainerStatus(int retryLimit) {
+ return true;
+ }
+
+ public static class Builder
+ extends BaseContainer.Builder<
+ GravitinoLocalStackContainer.Builder, GravitinoLocalStackContainer> {
+ public Builder() {
+ super();
+ this.image = DEFAULT_IMAGE;
+ this.hostName = HOST_NAME;
+ this.exposePorts = ImmutableSet.of(PORT);
+ }
+
+ @Override
+ public GravitinoLocalStackContainer build() {
+ return new GravitinoLocalStackContainer(
+ image, hostName, exposePorts, extraHosts, filesToMount, envVars,
network);
+ }
+ }
+}