This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new f69bdaf15 [#3379] feat(catalog-hadoop): Add S3 support for Fileset
Hadoop catalog (#4232)
f69bdaf15 is described below
commit f69bdaf155ac82fdccbc443fb8d07ffd5d8b81e7
Author: XiaoZ <[email protected]>
AuthorDate: Mon Oct 21 19:15:09 2024 +0800
[#3379] feat(catalog-hadoop): Add S3 support for Fileset Hadoop catalog
(#4232)
### What changes were proposed in this pull request?
Add S3 support for the Fileset Hadoop catalog. Functionally this only adds the
hadoop-aws dependency; most of the work is in the integration tests.
### Why are the changes needed?
Fix: #3379
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
IT.
---------
Co-authored-by: zhanghan18 <[email protected]>
Co-authored-by: yuqi <[email protected]>
---
LICENSE.bin | 1 +
build.gradle.kts | 7 +-
bundles/aws-bundle/build.gradle.kts | 46 +++++
.../gravitino/s3/fs/S3FileSystemProvider.java | 51 ++++++
....gravitino.catalog.hadoop.fs.FileSystemProvider | 20 +++
catalogs/catalog-hadoop/build.gradle.kts | 6 +
.../hadoop/integration/test/HadoopCatalogIT.java | 23 ++-
.../hadoop/integration/test/HadoopS3CatalogIT.java | 189 +++++++++++++++++++++
catalogs/catalog-hive/build.gradle.kts | 2 +-
clients/filesystem-hadoop3/build.gradle.kts | 6 +
.../test/GravitinoVirtualFileSystemIT.java | 5 +-
.../test/GravitinoVirtualFileSystemOSSIT.java | 36 +---
...IT.java => GravitinoVirtualFileSystemS3IT.java} | 140 +++++++++------
gradle/libs.versions.toml | 3 +-
.../gravitino/integration/test/util/HttpUtils.java | 2 +-
settings.gradle.kts | 1 +
16 files changed, 434 insertions(+), 104 deletions(-)
diff --git a/LICENSE.bin b/LICENSE.bin
index ee65d4d69..e922f9367 100644
--- a/LICENSE.bin
+++ b/LICENSE.bin
@@ -284,6 +284,7 @@
Apache Hadoop
Apache Hadoop Aliyun connector
Apache Hadoop GCS connector
+ Apache Hadoop AWS connector
Apache Hadoop Annotatations
Apache Hadoop Auth
Apache Hadoop Client Aggregator
diff --git a/build.gradle.kts b/build.gradle.kts
index 61ce87857..b954aaf10 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -746,7 +746,9 @@ tasks {
if (!it.name.startsWith("catalog") &&
!it.name.startsWith("authorization") &&
!it.name.startsWith("client") && !it.name.startsWith("filesystem") &&
!it.name.startsWith("spark") && !it.name.startsWith("iceberg") && it.name !=
"trino-connector" &&
- it.name != "integration-test" && it.name != "hive-metastore-common" &&
!it.name.startsWith("flink") && it.name != "gcp-bundle" && it.name !=
"aliyun-bundle"
+ it.name != "integration-test" && it.name != "bundled-catalog" &&
!it.name.startsWith("flink") &&
+ it.name != "integration-test" && it.name != "hive-metastore-common" &&
!it.name.startsWith("flink") &&
+ it.name != "gcp-bundle" && it.name != "aliyun-bundle" && it.name !=
"aws-bundle"
) {
from(it.configurations.runtimeClasspath)
into("distribution/package/libs")
@@ -765,8 +767,9 @@ tasks {
!it.name.startsWith("integration-test") &&
!it.name.startsWith("flink") &&
!it.name.startsWith("trino-connector") &&
+ it.name != "bundled-catalog" &&
it.name != "hive-metastore-common" && it.name != "gcp-bundle" &&
- it.name != "aliyun-bundle"
+ it.name != "aliyun-bundle" && it.name != "aws-bundle"
) {
dependsOn("${it.name}:build")
from("${it.name}/build/libs")
diff --git a/bundles/aws-bundle/build.gradle.kts
b/bundles/aws-bundle/build.gradle.kts
new file mode 100644
index 000000000..741bdc414
--- /dev/null
+++ b/bundles/aws-bundle/build.gradle.kts
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar
+
+plugins {
+ `maven-publish`
+ id("java")
+ alias(libs.plugins.shadow)
+}
+
+dependencies {
+ compileOnly(project(":catalogs:catalog-hadoop"))
+ compileOnly(libs.hadoop3.common)
+ implementation(libs.hadoop3.aws)
+}
+
+tasks.withType(ShadowJar::class.java) {
+ isZip64 = true
+ configurations = listOf(project.configurations.runtimeClasspath.get())
+ archiveClassifier.set("")
+}
+
+tasks.jar {
+ dependsOn(tasks.named("shadowJar"))
+ archiveClassifier.set("empty")
+}
+
+tasks.compileJava {
+ dependsOn(":catalogs:catalog-hadoop:runtimeJars")
+}
diff --git
a/bundles/aws-bundle/src/main/java/org/apache/gravitino/s3/fs/S3FileSystemProvider.java
b/bundles/aws-bundle/src/main/java/org/apache/gravitino/s3/fs/S3FileSystemProvider.java
new file mode 100644
index 000000000..4ab1ca242
--- /dev/null
+++
b/bundles/aws-bundle/src/main/java/org/apache/gravitino/s3/fs/S3FileSystemProvider.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.s3.fs;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+
+public class S3FileSystemProvider implements FileSystemProvider {
+ @Override
+ public FileSystem getFileSystem(Path path, Map<String, String> config)
throws IOException {
+ Configuration configuration = new Configuration();
+ config.forEach(
+ (k, v) -> {
+ configuration.set(k.replace("gravitino.bypass.", ""), v);
+ });
+
+ return S3AFileSystem.newInstance(path.toUri(), configuration);
+ }
+
+ @Override
+ public String scheme() {
+ return "s3a";
+ }
+
+ @Override
+ public String name() {
+ return "s3";
+ }
+}
diff --git
a/bundles/aws-bundle/src/main/resources/META-INF/services/org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider
b/bundles/aws-bundle/src/main/resources/META-INF/services/org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider
new file mode 100644
index 000000000..37a1a84c7
--- /dev/null
+++
b/bundles/aws-bundle/src/main/resources/META-INF/services/org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+org.apache.gravitino.s3.fs.S3FileSystemProvider
\ No newline at end of file
diff --git a/catalogs/catalog-hadoop/build.gradle.kts
b/catalogs/catalog-hadoop/build.gradle.kts
index 4c091b149..62a48656c 100644
--- a/catalogs/catalog-hadoop/build.gradle.kts
+++ b/catalogs/catalog-hadoop/build.gradle.kts
@@ -73,6 +73,7 @@ dependencies {
testImplementation(project(":integration-test-common", "testArtifacts"))
testImplementation(project(":server"))
testImplementation(project(":server-common"))
+ testImplementation(project(":bundles:aws-bundle"))
testImplementation(project(":bundles:gcp-bundle"))
testImplementation(project(":bundles:aliyun-bundle"))
@@ -161,6 +162,11 @@ tasks.test {
} else {
dependsOn(tasks.jar)
}
+
+ // this task depends on :bundles:aws-bundle:jar
+ dependsOn(":bundles:aws-bundle:jar")
+ dependsOn(":bundles:aliyun-bundle:jar")
+ dependsOn(":bundles:gcp-bundle:jar")
}
tasks.getByName("generateMetadataFileForMavenJavaPublication") {
diff --git
a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopCatalogIT.java
b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopCatalogIT.java
index 49bd29b2e..b272bd7a8 100644
---
a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopCatalogIT.java
+++
b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopCatalogIT.java
@@ -61,19 +61,24 @@ public class HadoopCatalogIT extends BaseIT {
private static final Logger LOG =
LoggerFactory.getLogger(HadoopCatalogIT.class);
protected static final ContainerSuite containerSuite =
ContainerSuite.getInstance();
- public String metalakeName =
GravitinoITUtils.genRandomName("CatalogFilesetIT_metalake");
- public String catalogName =
GravitinoITUtils.genRandomName("CatalogFilesetIT_catalog");
- public static final String SCHEMA_PREFIX = "CatalogFilesetIT_schema";
- public String schemaName = GravitinoITUtils.genRandomName(SCHEMA_PREFIX);
+ protected String metalakeName =
GravitinoITUtils.genRandomName("CatalogFilesetIT_metalake");
+ protected String catalogName =
GravitinoITUtils.genRandomName("CatalogFilesetIT_catalog");
+ public final String SCHEMA_PREFIX = "CatalogFilesetIT_schema";
+ protected String schemaName = GravitinoITUtils.genRandomName(SCHEMA_PREFIX);
protected static final String provider = "hadoop";
- protected static GravitinoMetalake metalake;
- protected static Catalog catalog;
- protected static FileSystem fileSystem;
- protected static String defaultBaseLocation;
+ protected GravitinoMetalake metalake;
+ protected Catalog catalog;
+ protected FileSystem fileSystem;
+ protected String defaultBaseLocation;
+
+ protected void startNecessaryContainer() {
+ containerSuite.startHiveContainer();
+ }
@BeforeAll
public void setup() throws IOException {
- containerSuite.startHiveContainer();
+ startNecessaryContainer();
+
Configuration conf = new Configuration();
conf.set("fs.defaultFS", defaultBaseLocation());
fileSystem = FileSystem.get(conf);
diff --git
a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopS3CatalogIT.java
b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopS3CatalogIT.java
new file mode 100644
index 000000000..bac39b7b8
--- /dev/null
+++
b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopS3CatalogIT.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.hadoop.integration.test;
+
+import static
org.apache.gravitino.catalog.hadoop.HadoopCatalogPropertiesMetadata.FILESYSTEM_PROVIDERS;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import org.apache.gravitino.Catalog;
+import
org.apache.gravitino.integration.test.container.GravitinoLocalStackContainer;
+import org.apache.gravitino.integration.test.util.GravitinoITUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+
+@Tag("gravitino-docker-test")
+public class HadoopS3CatalogIT extends HadoopCatalogIT {
+ private static final Logger LOG =
LoggerFactory.getLogger(HadoopOSSCatalogIT.class);
+ private String bucketName = "s3-bucket-" +
UUID.randomUUID().toString().replace("-", "");
+ private String accessKey;
+ private String secretKey;
+ private String s3Endpoint;
+
+ private GravitinoLocalStackContainer gravitinoLocalStackContainer;
+
+ @VisibleForTesting
+ public void startIntegrationTest() throws Exception {}
+
+ @Override
+ protected void startNecessaryContainer() {
+
+ containerSuite.startLocalStackContainer();
+ gravitinoLocalStackContainer = containerSuite.getLocalStackContainer();
+
+ Awaitility.await()
+ .atMost(60, TimeUnit.SECONDS)
+ .pollInterval(1, TimeUnit.SECONDS)
+ .until(
+ () -> {
+ try {
+ Container.ExecResult result =
+ gravitinoLocalStackContainer.executeInContainer(
+ "awslocal", "iam", "create-user", "--user-name",
"anonymous");
+ return result.getExitCode() == 0;
+ } catch (Exception e) {
+ LOG.info("LocalStack is not ready yet for: ", e);
+ return false;
+ }
+ });
+
+ gravitinoLocalStackContainer.executeInContainer("awslocal", "s3", "mb",
"s3://" + bucketName);
+
+ Container.ExecResult result =
+ gravitinoLocalStackContainer.executeInContainer(
+ "awslocal", "iam", "create-access-key", "--user-name",
"anonymous");
+
+ gravitinoLocalStackContainer.executeInContainer(
+ "awslocal",
+ "s3api",
+ "put-bucket-acl",
+ "--bucket",
+ "my-test-bucket",
+ "--acl",
+ "public-read-write");
+
+ // Get access key and secret key from result
+ String[] lines = result.getStdout().split("\n");
+ accessKey = lines[3].split(":")[1].trim().substring(1, 21);
+ secretKey = lines[5].split(":")[1].trim().substring(1, 41);
+
+ LOG.info("Access key: " + accessKey);
+ LOG.info("Secret key: " + secretKey);
+
+ s3Endpoint =
+ String.format("http://%s:%d",
gravitinoLocalStackContainer.getContainerIpAddress(), 4566);
+ }
+
+ @BeforeAll
+ public void setup() throws IOException {
+ copyBundleJarsToHadoop("aws-bundle");
+
+ try {
+ super.startIntegrationTest();
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to start integration test", e);
+ }
+
+ startNecessaryContainer();
+
+ metalakeName = GravitinoITUtils.genRandomName("CatalogFilesetIT_metalake");
+ catalogName = GravitinoITUtils.genRandomName("CatalogFilesetIT_catalog");
+ schemaName = GravitinoITUtils.genRandomName("CatalogFilesetIT_schema");
+
+ schemaName = GravitinoITUtils.genRandomName(SCHEMA_PREFIX);
+ Configuration conf = new Configuration();
+
+ conf.set("fs.s3a.access.key", accessKey);
+ conf.set("fs.s3a.secret.key", secretKey);
+ conf.set("fs.s3a.endpoint", s3Endpoint);
+ conf.set(
+ "fs.s3a.aws.credentials.provider",
"org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider");
+ fileSystem = FileSystem.get(URI.create(String.format("s3a://%s",
bucketName)), conf);
+
+ createMetalake();
+ createCatalog();
+ createSchema();
+ }
+
+ @AfterAll
+ public void stop() throws IOException {
+ Catalog catalog = metalake.loadCatalog(catalogName);
+ catalog.asSchemas().dropSchema(schemaName, true);
+ metalake.dropCatalog(catalogName, true);
+ client.dropMetalake(metalakeName);
+
+ try {
+ closer.close();
+ } catch (Exception e) {
+ LOG.error("Failed to close CloseableGroup", e);
+ }
+ }
+
+ protected String defaultBaseLocation() {
+ if (defaultBaseLocation == null) {
+ try {
+ Path bucket =
+ new Path(
+ String.format(
+ "s3a://%s/%s", bucketName,
GravitinoITUtils.genRandomName("CatalogFilesetIT")));
+ if (!fileSystem.exists(bucket)) {
+ fileSystem.mkdirs(bucket);
+ }
+
+ defaultBaseLocation = bucket.toString();
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to create default base location",
e);
+ }
+ }
+
+ return defaultBaseLocation;
+ }
+
+ protected void createCatalog() {
+ Map<String, String> map = Maps.newHashMap();
+ map.put("gravitino.bypass.fs.s3a.access.key", accessKey);
+ map.put("gravitino.bypass.fs.s3a.secret.key", secretKey);
+ map.put("gravitino.bypass.fs.s3a.endpoint", s3Endpoint);
+ map.put(
+ "gravitino.bypass.fs.s3a.aws.credentials.provider",
+ "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider");
+ map.put(FILESYSTEM_PROVIDERS, "s3");
+
+ metalake.createCatalog(catalogName, Catalog.Type.FILESET, provider,
"comment", map);
+
+ catalog = metalake.loadCatalog(catalogName);
+ }
+
+ protected String generateLocation(String filesetName) {
+ return String.format("%s/%s", defaultBaseLocation, filesetName);
+ }
+}
diff --git a/catalogs/catalog-hive/build.gradle.kts
b/catalogs/catalog-hive/build.gradle.kts
index aca8959df..f7d6e60c1 100644
--- a/catalogs/catalog-hive/build.gradle.kts
+++ b/catalogs/catalog-hive/build.gradle.kts
@@ -128,7 +128,7 @@ dependencies {
testImplementation(libs.testcontainers)
testImplementation(libs.testcontainers.mysql)
testImplementation(libs.testcontainers.localstack)
- testImplementation(libs.hadoop2.s3)
+ testImplementation(libs.hadoop2.aws)
testRuntimeOnly(libs.junit.jupiter.engine)
}
diff --git a/clients/filesystem-hadoop3/build.gradle.kts
b/clients/filesystem-hadoop3/build.gradle.kts
index c3f8c6d7b..7f21c700d 100644
--- a/clients/filesystem-hadoop3/build.gradle.kts
+++ b/clients/filesystem-hadoop3/build.gradle.kts
@@ -41,6 +41,7 @@ dependencies {
testImplementation(project(":integration-test-common", "testArtifacts"))
testImplementation(project(":bundles:gcp-bundle"))
testImplementation(project(":bundles:aliyun-bundle"))
+ testImplementation(project(":bundles:aws-bundle"))
testImplementation(libs.awaitility)
testImplementation(libs.bundles.jetty)
testImplementation(libs.bundles.jersey)
@@ -89,6 +90,11 @@ tasks.test {
} else {
dependsOn(":catalogs:catalog-hadoop:jar",
":catalogs:catalog-hadoop:runtimeJars")
}
+
+ // this task depends on :bundles:aws-bundle:shadowJar
+ dependsOn(":bundles:aws-bundle:jar")
+ dependsOn(":bundles:aliyun-bundle:jar")
+ dependsOn(":bundles:gcp-bundle:jar")
}
tasks.javadoc {
diff --git
a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemIT.java
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemIT.java
index 064643b79..b971ab918 100644
---
a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemIT.java
+++
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemIT.java
@@ -56,13 +56,14 @@ import org.slf4j.LoggerFactory;
@Tag("gravitino-docker-test")
public class GravitinoVirtualFileSystemIT extends BaseIT {
private static final Logger LOG =
LoggerFactory.getLogger(GravitinoVirtualFileSystemIT.class);
- private static final ContainerSuite containerSuite =
ContainerSuite.getInstance();
+ protected static final ContainerSuite containerSuite =
ContainerSuite.getInstance();
protected String metalakeName =
GravitinoITUtils.genRandomName("gvfs_it_metalake");
protected String catalogName = GravitinoITUtils.genRandomName("catalog");
protected String schemaName = GravitinoITUtils.genRandomName("schema");
protected GravitinoMetalake metalake;
protected Configuration conf = new Configuration();
protected int defaultBockSize = 128 * 1024 * 1024;
+ protected int defaultReplication = 3;
@BeforeAll
public void startUp() throws Exception {
@@ -459,7 +460,7 @@ public class GravitinoVirtualFileSystemIT extends BaseIT {
Assertions.assertTrue(catalog.asFilesetCatalog().filesetExists(filesetIdent));
Path gvfsPath = genGvfsPath(filesetName);
try (FileSystem gvfs = gvfsPath.getFileSystem(conf)) {
- assertEquals(3, gvfs.getDefaultReplication(gvfsPath));
+ assertEquals(defaultReplication, gvfs.getDefaultReplication(gvfsPath));
}
catalog.asFilesetCatalog().dropFileset(filesetIdent);
diff --git
a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemOSSIT.java
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemOSSIT.java
index 67c76be3d..6a6557c6c 100644
---
a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemOSSIT.java
+++
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemOSSIT.java
@@ -21,25 +21,18 @@ package
org.apache.gravitino.filesystem.hadoop.integration.test;
import static
org.apache.gravitino.catalog.hadoop.HadoopCatalogPropertiesMetadata.FILESYSTEM_PROVIDERS;
import static
org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystemConfiguration.FS_FILESYSTEM_PROVIDERS;
-import static org.junit.jupiter.api.Assertions.assertEquals;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.util.Collections;
-import java.util.HashMap;
import java.util.Map;
import org.apache.gravitino.Catalog;
-import org.apache.gravitino.NameIdentifier;
-import org.apache.gravitino.file.Fileset;
import org.apache.gravitino.integration.test.util.GravitinoITUtils;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -68,6 +61,9 @@ public class GravitinoVirtualFileSystemOSSIT extends
GravitinoVirtualFileSystemI
// This value can be by tune by the user, please change it accordingly.
defaultBockSize = 64 * 1024 * 1024;
+ // The default replication factor is 1.
+ defaultReplication = 1;
+
metalakeName = GravitinoITUtils.genRandomName("gvfs_it_metalake");
catalogName = GravitinoITUtils.genRandomName("catalog");
schemaName = GravitinoITUtils.genRandomName("schema");
@@ -111,7 +107,7 @@ public class GravitinoVirtualFileSystemOSSIT extends
GravitinoVirtualFileSystemI
public void tearDown() throws IOException {
Catalog catalog = metalake.loadCatalog(catalogName);
catalog.asSchemas().dropSchema(schemaName, true);
- metalake.dropCatalog(catalogName);
+ metalake.dropCatalog(catalogName, true);
client.dropMetalake(metalakeName);
if (client != null) {
@@ -145,30 +141,6 @@ public class GravitinoVirtualFileSystemOSSIT extends
GravitinoVirtualFileSystemI
return String.format("oss://%s/%s", BUCKET_NAME, fileset);
}
- @Test
- public void testGetDefaultReplications() throws IOException {
- String filesetName =
GravitinoITUtils.genRandomName("test_get_default_replications");
- NameIdentifier filesetIdent = NameIdentifier.of(schemaName, filesetName);
- Catalog catalog = metalake.loadCatalog(catalogName);
- String storageLocation = genStorageLocation(filesetName);
- catalog
- .asFilesetCatalog()
- .createFileset(
- filesetIdent,
- "fileset comment",
- Fileset.Type.MANAGED,
- storageLocation,
- new HashMap<>());
-
Assertions.assertTrue(catalog.asFilesetCatalog().filesetExists(filesetIdent));
- Path gvfsPath = genGvfsPath(filesetName);
- try (FileSystem gvfs = gvfsPath.getFileSystem(conf)) {
- // Here HDFS is 3, but for oss is 1.
- assertEquals(1, gvfs.getDefaultReplication(gvfsPath));
- }
-
- catalog.asFilesetCatalog().dropFileset(filesetIdent);
- }
-
@Disabled(
"OSS does not support append, java.io.IOException: The append operation
is not supported")
public void testAppend() throws IOException {}
diff --git
a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemOSSIT.java
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemS3IT.java
similarity index 57%
copy from
clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemOSSIT.java
copy to
clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemS3IT.java
index 67c76be3d..22951da3a 100644
---
a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemOSSIT.java
+++
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemS3IT.java
@@ -21,52 +21,103 @@ package
org.apache.gravitino.filesystem.hadoop.integration.test;
import static
org.apache.gravitino.catalog.hadoop.HadoopCatalogPropertiesMetadata.FILESYSTEM_PROVIDERS;
import static
org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystemConfiguration.FS_FILESYSTEM_PROVIDERS;
-import static org.junit.jupiter.api.Assertions.assertEquals;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.util.Collections;
-import java.util.HashMap;
import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
import org.apache.gravitino.Catalog;
-import org.apache.gravitino.NameIdentifier;
-import org.apache.gravitino.file.Fileset;
+import
org.apache.gravitino.integration.test.container.GravitinoLocalStackContainer;
import org.apache.gravitino.integration.test.util.GravitinoITUtils;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
-@Disabled(
- "Disabled due to we don't have a real OSS account to test. If you have a
GCP account,"
- + "please change the configuration(BUCKET_NAME, OSS_ACCESS_KEY,
OSS_SECRET_KEY, OSS_ENDPOINT) and enable this test.")
-public class GravitinoVirtualFileSystemOSSIT extends
GravitinoVirtualFileSystemIT {
- private static final Logger LOG =
LoggerFactory.getLogger(GravitinoVirtualFileSystemOSSIT.class);
+public class GravitinoVirtualFileSystemS3IT extends
GravitinoVirtualFileSystemIT {
+ private static final Logger LOG =
LoggerFactory.getLogger(GravitinoVirtualFileSystemS3IT.class);
- public static final String BUCKET_NAME = "YOUR_BUCKET";
- public static final String OSS_ACCESS_KEY = "YOUR_OSS_ACCESS_KEY";
- public static final String OSS_SECRET_KEY = "YOUR_OSS_SECRET_KEY";
- public static final String OSS_ENDPOINT = "YOUR_OSS_ENDPOINT";
+ private String bucketName = "s3-bucket-" +
UUID.randomUUID().toString().replace("-", "");
+ private String accessKey;
+ private String secretKey;
+ private String s3Endpoint;
+
+ private GravitinoLocalStackContainer gravitinoLocalStackContainer;
@BeforeAll
public void startIntegrationTest() {
// Do nothing
}
+ private void startS3Mocker() {
+ containerSuite.startLocalStackContainer();
+ gravitinoLocalStackContainer = containerSuite.getLocalStackContainer();
+
+ Awaitility.await()
+ .atMost(60, TimeUnit.SECONDS)
+ .pollInterval(1, TimeUnit.SECONDS)
+ .until(
+ () -> {
+ try {
+ Container.ExecResult result =
+ gravitinoLocalStackContainer.executeInContainer(
+ "awslocal", "iam", "create-user", "--user-name",
"anonymous");
+ return result.getExitCode() == 0;
+ } catch (Exception e) {
+ LOG.info("LocalStack is not ready yet for: ", e);
+ return false;
+ }
+ });
+
+ gravitinoLocalStackContainer.executeInContainer("awslocal", "s3", "mb",
"s3://" + bucketName);
+
+ Container.ExecResult result =
+ gravitinoLocalStackContainer.executeInContainer(
+ "awslocal", "iam", "create-access-key", "--user-name",
"anonymous");
+
+ gravitinoLocalStackContainer.executeInContainer(
+ "awslocal",
+ "s3api",
+ "put-bucket-acl",
+ "--bucket",
+ "my-test-bucket",
+ "--acl",
+ "public-read-write");
+
+ // Get access key and secret key from result
+ String[] lines = result.getStdout().split("\n");
+ accessKey = lines[3].split(":")[1].trim().substring(1, 21);
+ secretKey = lines[5].split(":")[1].trim().substring(1, 41);
+
+ LOG.info("Access key: " + accessKey);
+ LOG.info("Secret key: " + secretKey);
+
+ s3Endpoint =
+ String.format("http://%s:%d",
gravitinoLocalStackContainer.getContainerIpAddress(), 4566);
+ }
+
@BeforeAll
public void startUp() throws Exception {
- copyBundleJarsToHadoop("aliyun-bundle");
+ copyBundleJarsToHadoop("aws-bundle");
+
+ // Start s3 simulator
+ startS3Mocker();
+
// Need to download jars to gravitino server
super.startIntegrationTest();
// This value can be by tune by the user, please change it accordingly.
- defaultBockSize = 64 * 1024 * 1024;
+ defaultBockSize = 32 * 1024 * 1024;
+
+ // The value is 1 for S3
+ defaultReplication = 1;
metalakeName = GravitinoITUtils.genRandomName("gvfs_it_metalake");
catalogName = GravitinoITUtils.genRandomName("catalog");
@@ -77,12 +128,13 @@ public class GravitinoVirtualFileSystemOSSIT extends
GravitinoVirtualFileSystemI
Assertions.assertTrue(client.metalakeExists(metalakeName));
Map<String, String> properties = Maps.newHashMap();
- properties.put(FILESYSTEM_PROVIDERS, "oss");
- properties.put("gravitino.bypass.fs.oss.accessKeyId", OSS_ACCESS_KEY);
- properties.put("gravitino.bypass.fs.oss.accessKeySecret", OSS_SECRET_KEY);
- properties.put("gravitino.bypass.fs.oss.endpoint", OSS_ENDPOINT);
+ properties.put("gravitino.bypass.fs.s3a.access.key", accessKey);
+ properties.put("gravitino.bypass.fs.s3a.secret.key", secretKey);
+ properties.put("gravitino.bypass.fs.s3a.endpoint", s3Endpoint);
properties.put(
- "gravitino.bypass.fs.oss.impl",
"org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem");
+ "gravitino.bypass.fs.s3a.aws.credentials.provider",
+ "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider");
+ properties.put(FILESYSTEM_PROVIDERS, "s3");
Catalog catalog =
metalake.createCatalog(
@@ -99,19 +151,19 @@ public class GravitinoVirtualFileSystemOSSIT extends
GravitinoVirtualFileSystemI
conf.set("fs.gravitino.client.metalake", metalakeName);
// Pass this configuration to the real file system
- conf.set("gravitino.bypass.fs.oss.accessKeyId", OSS_ACCESS_KEY);
- conf.set("gravitino.bypass.fs.oss.accessKeySecret", OSS_SECRET_KEY);
- conf.set("gravitino.bypass.fs.oss.endpoint", OSS_ENDPOINT);
- conf.set("gravitino.bypass.fs.oss.impl",
"org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem");
-
- conf.set(FS_FILESYSTEM_PROVIDERS, "oss");
+ conf.set("fs.s3a.access.key", accessKey);
+ conf.set("fs.s3a.secret.key", secretKey);
+ conf.set("fs.s3a.endpoint", s3Endpoint);
+ conf.set(
+ "fs.s3a.aws.credentials.provider",
"org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider");
+ conf.set(FS_FILESYSTEM_PROVIDERS, "s3");
}
@AfterAll
public void tearDown() throws IOException {
Catalog catalog = metalake.loadCatalog(catalogName);
catalog.asSchemas().dropSchema(schemaName, true);
- metalake.dropCatalog(catalogName);
+ metalake.dropCatalog(catalogName, true);
client.dropMetalake(metalakeName);
if (client != null) {
@@ -142,34 +194,10 @@ public class GravitinoVirtualFileSystemOSSIT extends
GravitinoVirtualFileSystemI
}
protected String genStorageLocation(String fileset) {
- return String.format("oss://%s/%s", BUCKET_NAME, fileset);
- }
-
- @Test
- public void testGetDefaultReplications() throws IOException {
- String filesetName =
GravitinoITUtils.genRandomName("test_get_default_replications");
- NameIdentifier filesetIdent = NameIdentifier.of(schemaName, filesetName);
- Catalog catalog = metalake.loadCatalog(catalogName);
- String storageLocation = genStorageLocation(filesetName);
- catalog
- .asFilesetCatalog()
- .createFileset(
- filesetIdent,
- "fileset comment",
- Fileset.Type.MANAGED,
- storageLocation,
- new HashMap<>());
-
Assertions.assertTrue(catalog.asFilesetCatalog().filesetExists(filesetIdent));
- Path gvfsPath = genGvfsPath(filesetName);
- try (FileSystem gvfs = gvfsPath.getFileSystem(conf)) {
- // Here HDFS is 3, but for oss is 1.
- assertEquals(1, gvfs.getDefaultReplication(gvfsPath));
- }
-
- catalog.asFilesetCatalog().dropFileset(filesetIdent);
+ return String.format("s3a://%s/%s", bucketName, fileset);
}
@Disabled(
- "OSS does not support append, java.io.IOException: The append operation
is not supported")
+     "S3 does not support append, java.io.IOException: The append operation
is not supported")
public void testAppend() throws IOException {}
}
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index fc06975c1..6a50fc2b4 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -150,7 +150,8 @@ hadoop2-hdfs = { group = "org.apache.hadoop", name =
"hadoop-hdfs", version.ref
hadoop2-hdfs-client = { group = "org.apache.hadoop", name =
"hadoop-hdfs-client", version.ref = "hadoop2" }
hadoop2-common = { group = "org.apache.hadoop", name = "hadoop-common",
version.ref = "hadoop2"}
hadoop2-mapreduce-client-core = { group = "org.apache.hadoop", name =
"hadoop-mapreduce-client-core", version.ref = "hadoop2"}
-hadoop2-s3 = { group = "org.apache.hadoop", name = "hadoop-aws", version.ref =
"hadoop2"}
+hadoop2-aws = { group = "org.apache.hadoop", name = "hadoop-aws", version.ref
= "hadoop2"}
+hadoop3-aws = { group = "org.apache.hadoop", name = "hadoop-aws", version.ref
= "hadoop3"}
hadoop3-hdfs = { group = "org.apache.hadoop", name = "hadoop-hdfs",
version.ref = "hadoop3" }
hadoop3-common = { group = "org.apache.hadoop", name = "hadoop-common",
version.ref = "hadoop3"}
hadoop3-client = { group = "org.apache.hadoop", name = "hadoop-client",
version.ref = "hadoop3"}
diff --git
a/integration-test-common/src/test/java/org/apache/gravitino/integration/test/util/HttpUtils.java
b/integration-test-common/src/test/java/org/apache/gravitino/integration/test/util/HttpUtils.java
index 6ccac7dd7..0fe4d728c 100644
---
a/integration-test-common/src/test/java/org/apache/gravitino/integration/test/util/HttpUtils.java
+++
b/integration-test-common/src/test/java/org/apache/gravitino/integration/test/util/HttpUtils.java
@@ -29,7 +29,7 @@ public class HttpUtils {
private static final Logger LOG = LoggerFactory.getLogger(HttpUtils.class);
/**
- * Check if the http server is up, If http response status code is 200, then
we're assuming the
+ * Check if the http server is up. If http response status code is 200, then
we're assuming the
* server is up. Or else we assume the server is not ready.
*
* <p>Note: The method will ignore the response body and only check the
status code.
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 10cf10749..6d08431f0 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -70,6 +70,7 @@ project(":spark-connector:spark-runtime-3.5").projectDir =
file("spark-connector
include("web:web", "web:integration-test")
include("docs")
include("integration-test-common")
+include(":bundles:aws-bundle")
include(":bundles:gcp-bundle")
include("bundles:aliyun-bundle")
findProject(":bundles:aliyun-bundle")?.name = "aliyun-bundle"