This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 870c569fa [#4981] Improvement(test): Add integration tests for Hive 
catalog with S3 location. (#4982)
870c569fa is described below

commit 870c569fa2209aa68036ecb94bb400ea1bdce7c9
Author: Qi Yu <[email protected]>
AuthorDate: Tue Sep 24 09:54:59 2024 +0800

    [#4981] Improvement(test): Add integration tests for Hive catalog with S3 
location. (#4982)
    
    ### What changes were proposed in this pull request?
    
    Add integration tests to test hive catalog with S3 location.
    
    ### Why are the changes needed?
    
    To make code more robust.
    
    Fix: #4981
    
    ### Does this PR introduce _any_ user-facing change?
    
    N/A.
    
    ### How was this patch tested?
    
    The changes themselves are tested.
---
 build.gradle.kts                                   |   3 +-
 catalogs/catalog-hive/build.gradle.kts             |   2 +
 .../hive/integration/test/CatalogHiveIT.java       | 206 ++++++++++++---------
 .../hive/integration/test/CatalogHiveS3IT.java     | 171 +++++++++++++++++
 gradle/libs.versions.toml                          |   2 +
 .../integration/test/container/ContainerSuite.java |  75 +++++++-
 .../container/GravitinoLocalStackContainer.java    |  69 +++++++
 7 files changed, 431 insertions(+), 97 deletions(-)

diff --git a/build.gradle.kts b/build.gradle.kts
index e18b5c56c..c6ea7f13c 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -168,12 +168,13 @@ allprojects {
       param.environment("PROJECT_VERSION", project.version)
 
       // Gravitino CI Docker image
-      param.environment("GRAVITINO_CI_HIVE_DOCKER_IMAGE", 
"apache/gravitino-ci:hive-0.1.13")
+      param.environment("GRAVITINO_CI_HIVE_DOCKER_IMAGE", 
"apache/gravitino-ci:hive-0.1.14")
       param.environment("GRAVITINO_CI_KERBEROS_HIVE_DOCKER_IMAGE", 
"apache/gravitino-ci:kerberos-hive-0.1.5")
       param.environment("GRAVITINO_CI_DORIS_DOCKER_IMAGE", 
"apache/gravitino-ci:doris-0.1.5")
       param.environment("GRAVITINO_CI_TRINO_DOCKER_IMAGE", 
"apache/gravitino-ci:trino-0.1.6")
       param.environment("GRAVITINO_CI_RANGER_DOCKER_IMAGE", 
"apache/gravitino-ci:ranger-0.1.1")
       param.environment("GRAVITINO_CI_KAFKA_DOCKER_IMAGE", 
"apache/kafka:3.7.0")
+      param.environment("GRAVITINO_CI_LOCALSTACK_DOCKER_IMAGE", 
"localstack/localstack:latest")
 
       val dockerRunning = project.rootProject.extra["dockerRunning"] as? 
Boolean ?: false
       val macDockerConnector = project.rootProject.extra["macDockerConnector"] 
as? Boolean ?: false
diff --git a/catalogs/catalog-hive/build.gradle.kts 
b/catalogs/catalog-hive/build.gradle.kts
index 82b213a2a..2afb48f9a 100644
--- a/catalogs/catalog-hive/build.gradle.kts
+++ b/catalogs/catalog-hive/build.gradle.kts
@@ -127,6 +127,8 @@ dependencies {
   testImplementation(libs.slf4j.api)
   testImplementation(libs.testcontainers)
   testImplementation(libs.testcontainers.mysql)
+  testImplementation(libs.testcontainers.localstack)
+  testImplementation(libs.hadoop2.s3)
 
   testRuntimeOnly(libs.junit.jupiter.engine)
 }
diff --git 
a/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/integration/test/CatalogHiveIT.java
 
b/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/integration/test/CatalogHiveIT.java
index c9ef0db1f..081aad480 100644
--- 
a/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/integration/test/CatalogHiveIT.java
+++ 
b/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/integration/test/CatalogHiveIT.java
@@ -23,12 +23,9 @@ import static 
org.apache.gravitino.catalog.hive.HiveTablePropertiesMetadata.COMM
 import static 
org.apache.gravitino.catalog.hive.HiveTablePropertiesMetadata.EXTERNAL;
 import static 
org.apache.gravitino.catalog.hive.HiveTablePropertiesMetadata.FORMAT;
 import static 
org.apache.gravitino.catalog.hive.HiveTablePropertiesMetadata.INPUT_FORMAT;
-import static 
org.apache.gravitino.catalog.hive.HiveTablePropertiesMetadata.LOCATION;
-import static 
org.apache.gravitino.catalog.hive.HiveTablePropertiesMetadata.NUM_FILES;
 import static 
org.apache.gravitino.catalog.hive.HiveTablePropertiesMetadata.OUTPUT_FORMAT;
 import static 
org.apache.gravitino.catalog.hive.HiveTablePropertiesMetadata.SERDE_LIB;
 import static 
org.apache.gravitino.catalog.hive.HiveTablePropertiesMetadata.TABLE_TYPE;
-import static 
org.apache.gravitino.catalog.hive.HiveTablePropertiesMetadata.TOTAL_SIZE;
 import static 
org.apache.gravitino.catalog.hive.HiveTablePropertiesMetadata.TRANSIENT_LAST_DDL_TIME;
 import static org.apache.gravitino.catalog.hive.TableType.EXTERNAL_TABLE;
 import static org.apache.gravitino.catalog.hive.TableType.MANAGED_TABLE;
@@ -45,6 +42,7 @@ import java.time.LocalDate;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -110,33 +108,35 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @Tag("gravitino-docker-test")
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
 public class CatalogHiveIT extends AbstractIT {
   private static final Logger LOG = 
LoggerFactory.getLogger(CatalogHiveIT.class);
   public static final String metalakeName =
       GravitinoITUtils.genRandomName("CatalogHiveIT_metalake");
-  public static final String catalogName = 
GravitinoITUtils.genRandomName("CatalogHiveIT_catalog");
-  public static final String SCHEMA_PREFIX = "CatalogHiveIT_schema";
-  public static final String schemaName = 
GravitinoITUtils.genRandomName(SCHEMA_PREFIX);
-  public static final String TABLE_PREFIX = "CatalogHiveIT_table";
-  public static final String tableName = 
GravitinoITUtils.genRandomName(TABLE_PREFIX);
+  public String catalogName = 
GravitinoITUtils.genRandomName("CatalogHiveIT_catalog");
+  public String SCHEMA_PREFIX = "CatalogHiveIT_schema";
+  public String schemaName = GravitinoITUtils.genRandomName(SCHEMA_PREFIX);
+  public String TABLE_PREFIX = "CatalogHiveIT_table";
+  public String tableName = GravitinoITUtils.genRandomName(TABLE_PREFIX);
   public static final String ALTER_TABLE_NAME = "alert_table_name";
   public static final String TABLE_COMMENT = "table_comment";
   public static final String HIVE_COL_NAME1 = "hive_col_name1";
   public static final String HIVE_COL_NAME2 = "hive_col_name2";
   public static final String HIVE_COL_NAME3 = "hive_col_name3";
-  private static String HIVE_METASTORE_URIS;
-  private static final String provider = "hive";
-  private static final ContainerSuite containerSuite = 
ContainerSuite.getInstance();
-  private static HiveClientPool hiveClientPool;
-  private static GravitinoMetalake metalake;
-  private static Catalog catalog;
-  private static SparkSession sparkSession;
-  private static FileSystem hdfs;
-  private static final String SELECT_ALL_TEMPLATE = "SELECT * FROM %s.%s";
+  protected String HIVE_METASTORE_URIS;
+  protected final String provider = "hive";
+  protected final ContainerSuite containerSuite = ContainerSuite.getInstance();
+  private HiveClientPool hiveClientPool;
+  protected GravitinoMetalake metalake;
+  protected Catalog catalog;
+  protected SparkSession sparkSession;
+  protected FileSystem fileSystem;
+  private final String SELECT_ALL_TEMPLATE = "SELECT * FROM %s.%s";
 
   private static String getInsertWithoutPartitionSql(
       String dbName, String tableName, String values) {
@@ -161,8 +161,7 @@ public class CatalogHiveIT extends AbstractIT {
           STRING_TYPE_NAME,
           "'gravitino_it_test'");
 
-  @BeforeAll
-  public static void startup() throws Exception {
+  protected void startNecessaryContainer() {
     containerSuite.startHiveContainer();
 
     HIVE_METASTORE_URIS =
@@ -170,15 +169,9 @@ public class CatalogHiveIT extends AbstractIT {
             "thrift://%s:%d",
             containerSuite.getHiveContainer().getContainerIpAddress(),
             HiveContainer.HIVE_METASTORE_PORT);
+  }
 
-    HiveConf hiveConf = new HiveConf();
-    hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, HIVE_METASTORE_URIS);
-
-    // Check if Hive client can connect to Hive metastore
-    hiveClientPool = new HiveClientPool(1, hiveConf);
-    List<String> dbs = hiveClientPool.run(client -> client.getAllDatabases());
-    Assertions.assertFalse(dbs.isEmpty());
-
+  protected void initSparkSession() {
     sparkSession =
         SparkSession.builder()
             .master("local[1]")
@@ -194,7 +187,9 @@ public class CatalogHiveIT extends AbstractIT {
             .config("mapreduce.input.fileinputformat.input.dir.recursive", 
"true")
             .enableHiveSupport()
             .getOrCreate();
+  }
 
+  protected void initFileSystem() throws IOException {
     Configuration conf = new Configuration();
     conf.set(
         "fs.defaultFS",
@@ -202,7 +197,23 @@ public class CatalogHiveIT extends AbstractIT {
             "hdfs://%s:%d",
             containerSuite.getHiveContainer().getContainerIpAddress(),
             HiveContainer.HDFS_DEFAULTFS_PORT));
-    hdfs = FileSystem.get(conf);
+    fileSystem = FileSystem.get(conf);
+  }
+
+  @BeforeAll
+  public void startup() throws Exception {
+    startNecessaryContainer();
+
+    HiveConf hiveConf = new HiveConf();
+    hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, HIVE_METASTORE_URIS);
+
+    // Check if Hive client can connect to Hive metastore
+    hiveClientPool = new HiveClientPool(1, hiveConf);
+    List<String> dbs = hiveClientPool.run(client -> client.getAllDatabases());
+    Assertions.assertFalse(dbs.isEmpty());
+
+    initSparkSession();
+    initFileSystem();
 
     createMetalake();
     createCatalog();
@@ -210,7 +221,7 @@ public class CatalogHiveIT extends AbstractIT {
   }
 
   @AfterAll
-  public static void stop() throws IOException {
+  public void stop() throws IOException {
     if (client != null) {
       Arrays.stream(catalog.asSchemas().listSchemas())
           .filter(schema -> !schema.equals("default"))
@@ -233,8 +244,8 @@ public class CatalogHiveIT extends AbstractIT {
       sparkSession.close();
     }
 
-    if (hdfs != null) {
-      hdfs.close();
+    if (fileSystem != null) {
+      fileSystem.close();
     }
     try {
       closer.close();
@@ -254,7 +265,7 @@ public class CatalogHiveIT extends AbstractIT {
     createSchema();
   }
 
-  private static void createMetalake() {
+  private void createMetalake() {
     GravitinoMetalake[] gravitinoMetalakes = client.listMetalakes();
     Assertions.assertEquals(0, gravitinoMetalakes.length);
 
@@ -266,7 +277,7 @@ public class CatalogHiveIT extends AbstractIT {
     metalake = loadMetalake;
   }
 
-  private static void createCatalog() {
+  protected void createCatalog() {
     Map<String, String> properties = Maps.newHashMap();
     properties.put(METASTORE_URIS, HIVE_METASTORE_URIS);
 
@@ -275,20 +286,10 @@ public class CatalogHiveIT extends AbstractIT {
     catalog = metalake.loadCatalog(catalogName);
   }
 
-  private static void createSchema() throws TException, InterruptedException {
-    Map<String, String> properties = Maps.newHashMap();
-    properties.put("key1", "val1");
-    properties.put("key2", "val2");
-    properties.put(
-        "location",
-        String.format(
-            "hdfs://%s:%d/user/hive/warehouse/%s.db",
-            containerSuite.getHiveContainer().getContainerIpAddress(),
-            HiveContainer.HDFS_DEFAULTFS_PORT,
-            schemaName.toLowerCase()));
+  private void createSchema() throws TException, InterruptedException {
+    Map<String, String> schemaProperties = createSchemaProperties();
     String comment = "comment";
-
-    catalog.asSchemas().createSchema(schemaName, comment, properties);
+    catalog.asSchemas().createSchema(schemaName, comment, schemaProperties);
     Schema loadSchema = catalog.asSchemas().loadSchema(schemaName);
     Assertions.assertEquals(schemaName.toLowerCase(), loadSchema.name());
     Assertions.assertEquals(comment, loadSchema.comment());
@@ -335,7 +336,7 @@ public class CatalogHiveIT extends AbstractIT {
     Path tableDirectory = new Path(table.getSd().getLocation());
     FileStatus[] fileStatuses;
     try {
-      fileStatuses = hdfs.listStatus(tableDirectory);
+      fileStatuses = fileSystem.listStatus(tableDirectory);
     } catch (IOException e) {
       LOG.warn("Failed to list status of table directory", e);
       throw new RuntimeException(e);
@@ -346,7 +347,7 @@ public class CatalogHiveIT extends AbstractIT {
     }
   }
 
-  private Map<String, String> createProperties() {
+  protected Map<String, String> createProperties() {
     Map<String, String> properties = Maps.newHashMap();
     properties.put("key1", "val1");
     properties.put("key2", "val2");
@@ -560,7 +561,7 @@ public class CatalogHiveIT extends AbstractIT {
     catalog
         .asTableCatalog()
         .createTable(
-            nameIdentifier, columns, TABLE_COMMENT, ImmutableMap.of(), 
Transforms.EMPTY_TRANSFORM);
+            nameIdentifier, columns, TABLE_COMMENT, createProperties(), 
Transforms.EMPTY_TRANSFORM);
     Table loadedTable1 = catalog.asTableCatalog().loadTable(nameIdentifier);
     HiveTablePropertiesMetadata tablePropertiesMetadata = new 
HiveTablePropertiesMetadata();
     org.apache.hadoop.hive.metastore.api.Table actualTable =
@@ -569,6 +570,10 @@ public class CatalogHiveIT extends AbstractIT {
     checkTableReadWrite(actualTable);
 
     // test set properties
+    Map<String, String> properties = createProperties();
+    properties.put(FORMAT, "textfile");
+    properties.put(SERDE_LIB, HiveStorageConstants.OPENCSV_SERDE_CLASS);
+    properties.put(TABLE_TYPE, "external_table");
     String table2 = GravitinoITUtils.genRandomName(TABLE_PREFIX);
     catalog
         .asTableCatalog()
@@ -576,18 +581,7 @@ public class CatalogHiveIT extends AbstractIT {
             NameIdentifier.of(schemaName, table2),
             columns,
             TABLE_COMMENT,
-            ImmutableMap.of(
-                TABLE_TYPE,
-                "external_table",
-                LOCATION,
-                String.format(
-                    "hdfs://%s:%d/tmp",
-                    containerSuite.getHiveContainer().getContainerIpAddress(),
-                    HiveContainer.HDFS_DEFAULTFS_PORT),
-                FORMAT,
-                "textfile",
-                SERDE_LIB,
-                HiveStorageConstants.OPENCSV_SERDE_CLASS),
+            properties,
             Transforms.EMPTY_TRANSFORM);
     Table loadedTable2 = 
catalog.asTableCatalog().loadTable(NameIdentifier.of(schemaName, table2));
     org.apache.hadoop.hive.metastore.api.Table actualTable2 =
@@ -607,10 +601,9 @@ public class CatalogHiveIT extends AbstractIT {
     Assertions.assertEquals(
         ((Boolean) 
tablePropertiesMetadata.getDefaultValue(EXTERNAL)).toString().toUpperCase(),
         actualTable.getParameters().get(EXTERNAL));
-    Assertions.assertTrue(actualTable2.getSd().getLocation().endsWith("/tmp"));
     
Assertions.assertNotNull(loadedTable2.properties().get(TRANSIENT_LAST_DDL_TIME));
-    Assertions.assertNotNull(loadedTable2.properties().get(NUM_FILES));
-    Assertions.assertNotNull(loadedTable2.properties().get(TOTAL_SIZE));
+
+    // S3 doesn't support NUM_FILES and TOTAL_SIZE
     checkTableReadWrite(actualTable2);
 
     // test alter properties exception
@@ -630,36 +623,42 @@ public class CatalogHiveIT extends AbstractIT {
   public void testHiveSchemaProperties() throws TException, 
InterruptedException {
     // test LOCATION property
     String schemaName = GravitinoITUtils.genRandomName(SCHEMA_PREFIX);
-    Map<String, String> properties = Maps.newHashMap();
-    String expectedSchemaLocation =
-        String.format(
-            "hdfs://%s:%d/tmp",
-            containerSuite.getHiveContainer().getContainerIpAddress(),
-            HiveContainer.HDFS_DEFAULTFS_PORT);
 
-    properties.put(HiveSchemaPropertiesMetadata.LOCATION, 
expectedSchemaLocation);
-    catalog.asSchemas().createSchema(schemaName, "comment", properties);
+    Map<String, String> schemaProperties = createSchemaProperties();
+    String expectedHDFSSchemaLocation = 
schemaProperties.get(HiveSchemaPropertiesMetadata.LOCATION);
+
+    catalog.asSchemas().createSchema(schemaName, "comment", schemaProperties);
 
     Database actualSchema = hiveClientPool.run(client -> 
client.getDatabase(schemaName));
     String actualSchemaLocation = actualSchema.getLocationUri();
-    
Assertions.assertTrue(actualSchemaLocation.endsWith(expectedSchemaLocation));
+    
Assertions.assertTrue(actualSchemaLocation.endsWith(expectedHDFSSchemaLocation));
 
     NameIdentifier tableIdent =
         NameIdentifier.of(schemaName, 
GravitinoITUtils.genRandomName(TABLE_PREFIX));
+
+    Map<String, String> tableProperties = createProperties();
+    String expectedSchemaLocation =
+        tableProperties.getOrDefault(
+            HiveSchemaPropertiesMetadata.LOCATION, expectedHDFSSchemaLocation);
+
     catalog
         .asTableCatalog()
         .createTable(
             tableIdent,
             createColumns(),
             TABLE_COMMENT,
-            ImmutableMap.of(),
+            tableProperties,
             Transforms.EMPTY_TRANSFORM);
     org.apache.hadoop.hive.metastore.api.Table actualTable =
         hiveClientPool.run(client -> client.getTable(schemaName, 
tableIdent.name()));
     String actualTableLocation = actualTable.getSd().getLocation();
     // use `tableIdent.name().toLowerCase()` because HMS will convert table 
name to lower
-    String expectedTableLocation = expectedSchemaLocation + "/" + 
tableIdent.name().toLowerCase();
-    Assertions.assertTrue(actualTableLocation.endsWith(expectedTableLocation));
+
+    // actualTable.getSd().getLocation() is null for S3
+    if (!tableProperties.containsKey(HiveTablePropertiesMetadata.LOCATION)) {
+      String expectedTableLocation = expectedSchemaLocation + "/" + 
tableIdent.name().toLowerCase();
+      
Assertions.assertTrue(actualTableLocation.endsWith(expectedTableLocation));
+    }
     checkTableReadWrite(actualTable);
   }
 
@@ -901,7 +900,7 @@ public class CatalogHiveIT extends AbstractIT {
         hiveClientPool.run(client -> client.getTable(schemaName, 
createdTable.name()));
     Path partitionDirectory = new Path(hiveTab.getSd().getLocation() + 
identity.name());
     Assertions.assertFalse(
-        hdfs.exists(partitionDirectory), "The partition directory should not 
exist");
+        fileSystem.exists(partitionDirectory), "The partition directory should 
not exist");
 
     // add partition 
"hive_col_name2=2024-01-02/hive_col_name3=gravitino_it_test2"
     String[] field3 = new String[] {"hive_col_name2"};
@@ -953,7 +952,7 @@ public class CatalogHiveIT extends AbstractIT {
                     client.getPartition(schemaName, createdTable.name(), 
partitionAdded1.name())));
     Path partitionDirectory1 = new Path(hiveTab.getSd().getLocation() + 
identity1.name());
     Assertions.assertFalse(
-        hdfs.exists(partitionDirectory1), "The partition directory should not 
exist");
+        fileSystem.exists(partitionDirectory1), "The partition directory 
should not exist");
     Assertions.assertThrows(
         NoSuchObjectException.class,
         () ->
@@ -962,7 +961,7 @@ public class CatalogHiveIT extends AbstractIT {
                     client.getPartition(schemaName, createdTable.name(), 
partitionAdded2.name())));
     Path partitionDirectory2 = new Path(hiveTab.getSd().getLocation() + 
identity2.name());
     Assertions.assertFalse(
-        hdfs.exists(partitionDirectory2), "The partition directory should not 
exist");
+        fileSystem.exists(partitionDirectory2), "The partition directory 
should not exist");
 
     // test no-exist partition with ifExist=false
     
Assertions.assertFalse(createdTable.supportPartitions().dropPartition(partitionAdded.name()));
@@ -1388,7 +1387,7 @@ public class CatalogHiveIT extends AbstractIT {
 
     // Schema does not have the rename operation.
     final String schemaName = 
GravitinoITUtils.genRandomName("CatalogHiveIT_schema");
-    catalog.asSchemas().createSchema(schemaName, "", ImmutableMap.of());
+    catalog.asSchemas().createSchema(schemaName, "", createSchemaProperties());
 
     final Catalog cata = catalog;
     // Now try to rename table
@@ -1472,19 +1471,23 @@ public class CatalogHiveIT extends AbstractIT {
     catalog.asTableCatalog().dropTable(NameIdentifier.of(schemaName, 
tableName));
     Boolean existed = hiveClientPool.run(client -> 
client.tableExists(schemaName, tableName));
     Assertions.assertFalse(existed, "The Hive table should not exist");
-    Assertions.assertFalse(hdfs.exists(tableDirectory), "The table directory 
should not exist");
+    Assertions.assertFalse(
+        fileSystem.exists(tableDirectory), "The table directory should not 
exist");
   }
 
   @Test
   public void testDropHiveExternalTable() throws TException, 
InterruptedException, IOException {
     Column[] columns = createColumns();
+    Map<String, String> properties = createProperties();
+    properties.put(TABLE_TYPE, EXTERNAL_TABLE.name().toLowerCase(Locale.ROOT));
+
     catalog
         .asTableCatalog()
         .createTable(
             NameIdentifier.of(schemaName, tableName),
             columns,
             TABLE_COMMENT,
-            ImmutableMap.of(TABLE_TYPE, 
EXTERNAL_TABLE.name().toLowerCase(Locale.ROOT)),
+            properties,
             new Transform[] {Transforms.identity(columns[2].name())});
     // Directly get table from Hive metastore to check if the table is created 
successfully.
     org.apache.hadoop.hive.metastore.api.Table hiveTab =
@@ -1497,7 +1500,7 @@ public class CatalogHiveIT extends AbstractIT {
     Assertions.assertFalse(existed, "The table should be not exist");
     Path tableDirectory = new Path(hiveTab.getSd().getLocation());
     Assertions.assertTrue(
-        hdfs.listStatus(tableDirectory).length > 0, "The table should not be 
empty");
+        fileSystem.listStatus(tableDirectory).length > 0, "The table should 
not be empty");
   }
 
   @Test
@@ -1525,21 +1528,25 @@ public class CatalogHiveIT extends AbstractIT {
         catalog.asTableCatalog().purgeTable(NameIdentifier.of(schemaName, 
tableName)),
         "The table should not be found in the catalog");
     Path tableDirectory = new Path(hiveTab.getSd().getLocation());
-    Assertions.assertFalse(hdfs.exists(tableDirectory), "The table directory 
should not exist");
-    Path trashDirectory = hdfs.getTrashRoot(tableDirectory);
-    Assertions.assertFalse(hdfs.exists(trashDirectory), "The trash should not 
exist");
+    Assertions.assertFalse(
+        fileSystem.exists(tableDirectory), "The table directory should not 
exist");
+    Path trashDirectory = fileSystem.getTrashRoot(tableDirectory);
+    Assertions.assertFalse(fileSystem.exists(trashDirectory), "The trash 
should not exist");
   }
 
   @Test
   public void testPurgeHiveExternalTable() throws TException, 
InterruptedException, IOException {
     Column[] columns = createColumns();
+    Map<String, String> properties = createProperties();
+    properties.put(TABLE_TYPE, EXTERNAL_TABLE.name().toLowerCase(Locale.ROOT));
+
     catalog
         .asTableCatalog()
         .createTable(
             NameIdentifier.of(schemaName, tableName),
             columns,
             TABLE_COMMENT,
-            ImmutableMap.of(TABLE_TYPE, 
EXTERNAL_TABLE.name().toLowerCase(Locale.ROOT)),
+            properties,
             new Transform[] {Transforms.identity(columns[2].name())});
 
     // Directly get table from Hive metastore to check if the table is created 
successfully.
@@ -1560,7 +1567,7 @@ public class CatalogHiveIT extends AbstractIT {
     Assertions.assertTrue(existed, "The table should be still exist");
     Path tableDirectory = new Path(hiveTab.getSd().getLocation());
     Assertions.assertTrue(
-        hdfs.listStatus(tableDirectory).length > 0, "The table should not be 
empty");
+        fileSystem.listStatus(tableDirectory).length > 0, "The table should 
not be empty");
   }
 
   @Test
@@ -1649,7 +1656,10 @@ public class CatalogHiveIT extends AbstractIT {
     Exception exception =
         Assertions.assertThrows(
             Exception.class,
-            () -> createdCatalog.asSchemas().createSchema("schema", "comment", 
ImmutableMap.of()));
+            () ->
+                createdCatalog
+                    .asSchemas()
+                    .createSchema("schema", "comment", 
createSchemaProperties()));
     Assertions.assertTrue(exception.getMessage().contains("Failed to connect 
to Hive Metastore"));
 
     Catalog newCatalog =
@@ -1660,14 +1670,14 @@ public class CatalogHiveIT extends AbstractIT {
     // The URI has restored, so it should not throw exception.
     Assertions.assertDoesNotThrow(
         () -> {
-          newCatalog.asSchemas().createSchema("schema", "comment", 
ImmutableMap.of());
+          newCatalog.asSchemas().createSchema("schema", "comment", 
createSchemaProperties());
         });
 
     newCatalog.asSchemas().dropSchema("schema", true);
     metalake.dropCatalog(nameOfCatalog);
   }
 
-  private static void createCatalogWithCustomOperation(String catalogName, 
String customImpl) {
+  private void createCatalogWithCustomOperation(String catalogName, String 
customImpl) {
     Map<String, String> properties = Maps.newHashMap();
     properties.put(METASTORE_URIS, HIVE_METASTORE_URIS);
     properties.put(BaseCatalog.CATALOG_OPERATION_IMPL, customImpl);
@@ -1677,4 +1687,18 @@ public class CatalogHiveIT extends AbstractIT {
             catalogName, Catalog.Type.RELATIONAL, provider, "comment", 
properties);
     catalog.asSchemas().listSchemas();
   }
+
+  protected Map<String, String> createSchemaProperties() {
+    Map<String, String> properties = new HashMap<>();
+    properties.put("key1", "val1");
+    properties.put("key2", "val2");
+    properties.put(
+        "location",
+        String.format(
+            "hdfs://%s:%d/user/hive/warehouse/%s.db",
+            containerSuite.getHiveContainer().getContainerIpAddress(),
+            HiveContainer.HDFS_DEFAULTFS_PORT,
+            schemaName.toLowerCase()));
+    return properties;
+  }
 }
diff --git 
a/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/integration/test/CatalogHiveS3IT.java
 
b/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/integration/test/CatalogHiveS3IT.java
new file mode 100644
index 000000000..83d52d241
--- /dev/null
+++ 
b/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/integration/test/CatalogHiveS3IT.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.hive.integration.test;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import 
org.apache.gravitino.integration.test.container.GravitinoLocalStackContainer;
+import org.apache.gravitino.integration.test.container.HiveContainer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+
+public class CatalogHiveS3IT extends CatalogHiveIT {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(CatalogHiveS3IT.class);
+
+  private static final String S3_BUCKET_NAME = "my-test-bucket";
+  private GravitinoLocalStackContainer gravitinoLocalStackContainer;
+
+  private static final String S3_ACCESS_KEY = "S3_ACCESS_KEY";
+  private static final String S3_SECRET_KEY = "S3_SECRET_KEY";
+  private static final String S3_ENDPOINT = "S3_ENDPOINT";
+
+  private String getS3Endpoint;
+  private String accessKey;
+  private String secretKey;
+
+  @Override
+  protected void startNecessaryContainer() {
+    containerSuite.startLocalStackContainer();
+    gravitinoLocalStackContainer = containerSuite.getLocalStackContainer();
+
+    Awaitility.await()
+        .atMost(60, TimeUnit.SECONDS)
+        .pollInterval(1, TimeUnit.SECONDS)
+        .until(
+            () -> {
+              try {
+                Container.ExecResult result =
+                    gravitinoLocalStackContainer.executeInContainer(
+                        "awslocal", "iam", "create-user", "--user-name", 
"anonymous");
+                return result.getExitCode() == 0;
+              } catch (Exception e) {
+                LOGGER.info("LocalStack is not ready yet for: ", e);
+                return false;
+              }
+            });
+
+    gravitinoLocalStackContainer.executeInContainer(
+        "awslocal", "s3", "mb", "s3://" + S3_BUCKET_NAME);
+
+    Container.ExecResult result =
+        gravitinoLocalStackContainer.executeInContainer(
+            "awslocal", "iam", "create-access-key", "--user-name", 
"anonymous");
+
+    // Get access key and secret key from result
+    String[] lines = result.getStdout().split("\n");
+    accessKey = lines[3].split(":")[1].trim().substring(1, 21);
+    secretKey = lines[5].split(":")[1].trim().substring(1, 41);
+
+    LOGGER.info("Access key: " + accessKey);
+    LOGGER.info("Secret key: " + secretKey);
+
+    getS3Endpoint =
+        String.format("http://%s:%d", 
gravitinoLocalStackContainer.getContainerIpAddress(), 4566);
+
+    gravitinoLocalStackContainer.executeInContainer(
+        "awslocal",
+        "s3api",
+        "put-bucket-acl",
+        "--bucket",
+        "my-test-bucket",
+        "--acl",
+        "public-read-write");
+
+    Map<String, String> hiveContainerEnv =
+        ImmutableMap.of(
+            S3_ACCESS_KEY,
+            accessKey,
+            S3_SECRET_KEY,
+            secretKey,
+            S3_ENDPOINT,
+            getS3Endpoint,
+            HiveContainer.HIVE_RUNTIME_VERSION,
+            HiveContainer.HIVE3);
+
+    containerSuite.startHiveContainerWithS3(hiveContainerEnv);
+
+    HIVE_METASTORE_URIS =
+        String.format(
+            "thrift://%s:%d",
+            containerSuite.getHiveContainerWithS3().getContainerIpAddress(),
+            HiveContainer.HIVE_METASTORE_PORT);
+  }
+
+  @Override
+  protected void initFileSystem() throws IOException {
+    // Use S3a file system
+    Configuration conf = new Configuration();
+    conf.set("fs.s3a.access.key", accessKey);
+    conf.set("fs.s3a.secret.key", secretKey);
+    conf.set("fs.s3a.endpoint", getS3Endpoint);
+    conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
+    conf.set(
+        "fs.s3a.aws.credentials.provider", 
"org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider");
+    conf.set("fs.s3a.path.style.access", "true");
+    conf.set("fs.s3a.connection.ssl.enabled", "false");
+    fileSystem = FileSystem.get(URI.create("s3a://" + S3_BUCKET_NAME), conf);
+  }
+
+  @Override
+  protected void initSparkSession() {
+    sparkSession =
+        SparkSession.builder()
+            .master("local[1]")
+            .appName("Hive Catalog integration test")
+            .config("hive.metastore.uris", HIVE_METASTORE_URIS)
+            .config(
+                "spark.sql.warehouse.dir",
+                String.format(
+                    "hdfs://%s:%d/user/hive/warehouse",
+                    
containerSuite.getHiveContainerWithS3().getContainerIpAddress(),
+                    HiveContainer.HDFS_DEFAULTFS_PORT))
+            .config("spark.hadoop.fs.s3a.access.key", accessKey)
+            .config("spark.hadoop.fs.s3a.secret.key", secretKey)
+            .config("spark.hadoop.fs.s3a.endpoint", getS3Endpoint)
+            .config("spark.hadoop.fs.s3a.impl", 
"org.apache.hadoop.fs.s3a.S3AFileSystem")
+            .config("spark.hadoop.fs.s3a.path.style.access", "true")
+            .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
+            .config(
+                "spark.hadoop.fs.s3a.aws.credentials.provider",
+                "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
+            .config("spark.sql.storeAssignmentPolicy", "LEGACY")
+            .config("mapreduce.input.fileinputformat.input.dir.recursive", 
"true")
+            .enableHiveSupport()
+            .getOrCreate();
+  }
+
+  @Override
+  protected Map<String, String> createSchemaProperties() {
+    Map<String, String> properties = new HashMap<>();
+    properties.put("key1", "val1");
+    properties.put("key2", "val2");
+    properties.put("location", "s3a://" + S3_BUCKET_NAME + "/test-" + 
System.currentTimeMillis());
+    return properties;
+  }
+}
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 6790962cf..255306c98 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -146,6 +146,7 @@ hadoop2-hdfs = { group = "org.apache.hadoop", name = 
"hadoop-hdfs", version.ref
 hadoop2-hdfs-client = { group = "org.apache.hadoop", name = 
"hadoop-hdfs-client", version.ref = "hadoop2" }
 hadoop2-common = { group = "org.apache.hadoop", name = "hadoop-common", 
version.ref = "hadoop2"}
 hadoop2-mapreduce-client-core = { group = "org.apache.hadoop", name = 
"hadoop-mapreduce-client-core", version.ref = "hadoop2"}
+hadoop2-s3 = { group = "org.apache.hadoop", name = "hadoop-aws", version.ref = 
"hadoop2"}
 hadoop3-hdfs = { group = "org.apache.hadoop", name = "hadoop-hdfs", 
version.ref = "hadoop3" }
 hadoop3-common = { group = "org.apache.hadoop", name = "hadoop-common", 
version.ref = "hadoop3"}
 hadoop3-client = { group = "org.apache.hadoop", name = "hadoop-client", 
version.ref = "hadoop3"}
@@ -183,6 +184,7 @@ testcontainers = { group = "org.testcontainers", name = 
"testcontainers", versio
 testcontainers-mysql = { group = "org.testcontainers", name = "mysql", 
version.ref = "testcontainers" }
 testcontainers-postgresql = { group = "org.testcontainers", name = 
"postgresql", version.ref = "testcontainers" }
 testcontainers-junit-jupiter = { group = "org.testcontainers", name = 
"junit-jupiter", version.ref = "testcontainers" }
+testcontainers-localstack = { group = "org.testcontainers", name = 
"localstack", version.ref = "testcontainers" }
 trino-jdbc = { group = "io.trino", name = "trino-jdbc", version.ref = "trino" }
 jwt-api = { group = "io.jsonwebtoken", name = "jjwt-api", version.ref = "jwt"}
 jwt-impl = { group = "io.jsonwebtoken", name = "jjwt-impl", version.ref = 
"jwt"}
diff --git 
a/integration-test-common/src/test/java/org/apache/gravitino/integration/test/container/ContainerSuite.java
 
b/integration-test-common/src/test/java/org/apache/gravitino/integration/test/container/ContainerSuite.java
index bc311c4bc..12a88bbd9 100644
--- 
a/integration-test-common/src/test/java/org/apache/gravitino/integration/test/container/ContainerSuite.java
+++ 
b/integration-test-common/src/test/java/org/apache/gravitino/integration/test/container/ContainerSuite.java
@@ -71,6 +71,16 @@ public class ContainerSuite implements Closeable {
   private static volatile Map<PGImageName, PostgreSQLContainer> pgContainerMap 
=
       new EnumMap<>(PGImageName.class);
 
+  private static volatile GravitinoLocalStackContainer 
gravitinoLocalStackContainer;
+
+  /**
+   * We cannot reuse the plain Hive container as the Hive-with-S3 container, because the two
+   * differ in the following ways: 1. The S3 configuration and its corresponding environment
+   * variables. 2. The Hive container with S3 is a Hive3 container, while the plain Hive
+   * container is a Hive2 container; the Hive2 container has problems accessing S3.
+   */
+  private static volatile HiveContainer hiveContainerWithS3;
+
   protected static final CloseableGroup closer = CloseableGroup.create();
 
   private static void initIfNecessary() {
@@ -112,7 +122,11 @@ public class ContainerSuite implements Closeable {
     return network;
   }
 
-  public void startHiveContainer() {
+  public void startHiveContainer(Map<String, String> env) {
+    ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
+    builder.putAll(env);
+    builder.put("HADOOP_USER_NAME", "anonymous");
+
     if (hiveContainer == null) {
       synchronized (ContainerSuite.class) {
         if (hiveContainer == null) {
@@ -121,10 +135,7 @@ public class ContainerSuite implements Closeable {
           HiveContainer.Builder hiveBuilder =
               HiveContainer.builder()
                   .withHostName("gravitino-ci-hive")
-                  .withEnvVars(
-                      ImmutableMap.<String, String>builder()
-                          .put("HADOOP_USER_NAME", "anonymous")
-                          .build())
+                  .withEnvVars(builder.build())
                   .withNetwork(network);
           HiveContainer container = closer.register(hiveBuilder.build());
           container.start();
@@ -134,6 +145,33 @@ public class ContainerSuite implements Closeable {
     }
   }
 
+  public void startHiveContainerWithS3(Map<String, String> env) {
+    ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
+    builder.putAll(env);
+    builder.put("HADOOP_USER_NAME", "anonymous");
+
+    if (hiveContainerWithS3 == null) {
+      synchronized (ContainerSuite.class) {
+        if (hiveContainerWithS3 == null) {
+          initIfNecessary();
+          // Start the S3-enabled Hive container
+          HiveContainer.Builder hiveBuilder =
+              HiveContainer.builder()
+                  .withHostName("gravitino-ci-hive")
+                  .withEnvVars(builder.build())
+                  .withNetwork(network);
+          HiveContainer container = closer.register(hiveBuilder.build());
+          container.start();
+          hiveContainerWithS3 = container;
+        }
+      }
+    }
+  }
+
+  public void startHiveContainer() {
+    startHiveContainer(ImmutableMap.of());
+  }
+
   /**
    * To start and enable Ranger plugin in Hive container, <br>
    * you can specify environment variables: <br>
@@ -361,6 +399,33 @@ public class ContainerSuite implements Closeable {
     }
   }
 
+  public void startLocalStackContainer() {
+    if (gravitinoLocalStackContainer == null) {
+      synchronized (ContainerSuite.class) {
+        if (gravitinoLocalStackContainer == null) {
+          GravitinoLocalStackContainer.Builder builder =
+              GravitinoLocalStackContainer.builder().withNetwork(network);
+          GravitinoLocalStackContainer container = 
closer.register(builder.build());
+          try {
+            container.start();
+          } catch (Exception e) {
+            LOG.error("Failed to start LocalStack container", e);
+            throw new RuntimeException("Failed to start LocalStack container", 
e);
+          }
+          gravitinoLocalStackContainer = container;
+        }
+      }
+    }
+  }
+
+  public GravitinoLocalStackContainer getLocalStackContainer() {
+    return gravitinoLocalStackContainer;
+  }
+
+  public HiveContainer getHiveContainerWithS3() {
+    return hiveContainerWithS3;
+  }
+
   public KafkaContainer getKafkaContainer() {
     return kafkaContainer;
   }
diff --git 
a/integration-test-common/src/test/java/org/apache/gravitino/integration/test/container/GravitinoLocalStackContainer.java
 
b/integration-test-common/src/test/java/org/apache/gravitino/integration/test/container/GravitinoLocalStackContainer.java
new file mode 100644
index 000000000..11eae3525
--- /dev/null
+++ 
b/integration-test-common/src/test/java/org/apache/gravitino/integration/test/container/GravitinoLocalStackContainer.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.integration.test.container;
+
+import com.google.common.collect.ImmutableSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.testcontainers.containers.Network;
+
+public class GravitinoLocalStackContainer extends BaseContainer {
+
+  public static final String DEFAULT_IMAGE = 
System.getenv("GRAVITINO_CI_LOCALSTACK_DOCKER_IMAGE");
+  public static final String HOST_NAME = "gravitino-ci-localstack";
+  public static final int PORT = 4566;
+
+  public GravitinoLocalStackContainer(
+      String image,
+      String hostName,
+      Set<Integer> ports,
+      Map<String, String> extraHosts,
+      Map<String, String> filesToMount,
+      Map<String, String> envVars,
+      Optional<Network> network) {
+    super(image, hostName, ports, extraHosts, filesToMount, envVars, network);
+  }
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  @Override
+  protected boolean checkContainerStatus(int retryLimit) {
+    return true;
+  }
+
+  public static class Builder
+      extends BaseContainer.Builder<
+          GravitinoLocalStackContainer.Builder, GravitinoLocalStackContainer> {
+    public Builder() {
+      super();
+      this.image = DEFAULT_IMAGE;
+      this.hostName = HOST_NAME;
+      this.exposePorts = ImmutableSet.of(PORT);
+    }
+
+    @Override
+    public GravitinoLocalStackContainer build() {
+      return new GravitinoLocalStackContainer(
+          image, hostName, exposePorts, extraHosts, filesToMount, envVars, 
network);
+    }
+  }
+}


Reply via email to