This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 93cdbc259 [#5074] feat(hadoop-catalog): Support GCS fileset. (#5079)
93cdbc259 is described below
commit 93cdbc259b1b4bcd4bda2d1b34d011f19368d2dd
Author: Qi Yu <[email protected]>
AuthorDate: Thu Oct 17 20:06:35 2024 +0800
[#5074] feat(hadoop-catalog): Support GCS fileset. (#5079)
### What changes were proposed in this pull request?
1. Add a bundle jar for the Hadoop GCS connector.
2. Support GCS in Hadoop catalog.
### Why are the changes needed?
There is strong user demand for fileset support on GCS storage.
Fix: #5074
### Does this PR introduce _any_ user-facing change?
N/A
### How was this patch tested?
Manually, please see: HadoopGCSCatalogIT
---
build.gradle.kts | 4 +-
bundles/build.gradle.kts | 22 +++
bundles/gcp-bundle/build.gradle.kts | 46 ++++++
.../gravitino/gcs/fs/GCSFileSystemProvider.java | 19 +--
....gravitino.catalog.hadoop.fs.FileSystemProvider | 20 +++
catalogs/catalog-hadoop/build.gradle.kts | 2 +
.../catalog/hadoop/fs/HDFSFileSystemProvider.java | 3 +-
.../catalog/hadoop/fs/LocalFileSystemProvider.java | 3 +-
.../hadoop/integration/test/HadoopCatalogIT.java | 57 ++++---
.../integration/test/HadoopGCSCatalogIT.java | 97 ++++++++++++
clients/filesystem-hadoop3/build.gradle.kts | 1 +
.../GravitinoVirtualFileSystemConfiguration.java | 23 ++-
.../test/GravitinoVirtualFileSystemGCSIT.java | 170 +++++++++++++++++++++
.../test/GravitinoVirtualFileSystemIT.java | 86 +++++++----
gradle/libs.versions.toml | 2 +
.../gravitino/integration/test/util/ITUtils.java | 1 +
settings.gradle.kts | 1 +
17 files changed, 479 insertions(+), 78 deletions(-)
diff --git a/build.gradle.kts b/build.gradle.kts
index 6db5f00cc..9733a1791 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -745,7 +745,7 @@ tasks {
if (!it.name.startsWith("catalog") &&
!it.name.startsWith("authorization") &&
!it.name.startsWith("client") && !it.name.startsWith("filesystem") &&
!it.name.startsWith("spark") && !it.name.startsWith("iceberg") && it.name !=
"trino-connector" &&
- it.name != "integration-test" && it.name != "hive-metastore-common" &&
!it.name.startsWith("flink")
+ it.name != "integration-test" && it.name != "hive-metastore-common" &&
!it.name.startsWith("flink") && it.name != "gcp-bundle"
) {
from(it.configurations.runtimeClasspath)
into("distribution/package/libs")
@@ -764,7 +764,7 @@ tasks {
!it.name.startsWith("integration-test") &&
!it.name.startsWith("flink") &&
!it.name.startsWith("trino-connector") &&
- it.name != "hive-metastore-common"
+ it.name != "hive-metastore-common" && it.name != "gcp-bundle"
) {
dependsOn("${it.name}:build")
from("${it.name}/build/libs")
diff --git a/bundles/build.gradle.kts b/bundles/build.gradle.kts
new file mode 100644
index 000000000..043fbfec6
--- /dev/null
+++ b/bundles/build.gradle.kts
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+tasks.all {
+ enabled = false
+}
\ No newline at end of file
diff --git a/bundles/gcp-bundle/build.gradle.kts
b/bundles/gcp-bundle/build.gradle.kts
new file mode 100644
index 000000000..9433a6004
--- /dev/null
+++ b/bundles/gcp-bundle/build.gradle.kts
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar
+
+plugins {
+ `maven-publish`
+ id("java")
+ alias(libs.plugins.shadow)
+}
+
+dependencies {
+ compileOnly(project(":catalogs:catalog-hadoop"))
+ compileOnly(libs.hadoop3.common)
+ implementation(libs.hadoop3.gcs)
+}
+
+tasks.withType(ShadowJar::class.java) {
+ isZip64 = true
+ configurations = listOf(project.configurations.runtimeClasspath.get())
+ archiveClassifier.set("")
+}
+
+tasks.jar {
+ dependsOn(tasks.named("shadowJar"))
+ archiveClassifier.set("empty")
+}
+
+tasks.compileJava {
+ dependsOn(":catalogs:catalog-hadoop:runtimeJars")
+}
diff --git
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/LocalFileSystemProvider.java
b/bundles/gcp-bundle/src/main/java/org/apache/gravitino/gcs/fs/GCSFileSystemProvider.java
similarity index 70%
copy from
catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/LocalFileSystemProvider.java
copy to
bundles/gcp-bundle/src/main/java/org/apache/gravitino/gcs/fs/GCSFileSystemProvider.java
index 70e44c76f..919baa03b 100644
---
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/LocalFileSystemProvider.java
+++
b/bundles/gcp-bundle/src/main/java/org/apache/gravitino/gcs/fs/GCSFileSystemProvider.java
@@ -16,38 +16,35 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.gravitino.catalog.hadoop.fs;
-
-import static
org.apache.gravitino.catalog.hadoop.HadoopCatalogPropertiesMetadata.BUILTIN_LOCAL_FS_PROVIDER;
-import static org.apache.gravitino.connector.BaseCatalog.CATALOG_BYPASS_PREFIX;
+package org.apache.gravitino.gcs.fs;
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
import java.io.IOException;
import java.util.Map;
+import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
-public class LocalFileSystemProvider implements FileSystemProvider {
-
+public class GCSFileSystemProvider implements FileSystemProvider {
@Override
public FileSystem getFileSystem(Path path, Map<String, String> config)
throws IOException {
Configuration configuration = new Configuration();
config.forEach(
(k, v) -> {
- configuration.set(k.replace(CATALOG_BYPASS_PREFIX, ""), v);
+ configuration.set(k.replace("gravitino.bypass.", ""), v);
});
- return LocalFileSystem.newInstance(path.toUri(), configuration);
+ return GoogleHadoopFileSystem.newInstance(path.toUri(), configuration);
}
@Override
public String scheme() {
- return "file";
+ return "gs";
}
@Override
public String name() {
- return BUILTIN_LOCAL_FS_PROVIDER;
+ return "gcs";
}
}
diff --git
a/bundles/gcp-bundle/src/main/resources/META-INF/services/org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider
b/bundles/gcp-bundle/src/main/resources/META-INF/services/org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider
new file mode 100644
index 000000000..8a65be70f
--- /dev/null
+++
b/bundles/gcp-bundle/src/main/resources/META-INF/services/org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+org.apache.gravitino.gcs.fs.GCSFileSystemProvider
\ No newline at end of file
diff --git a/catalogs/catalog-hadoop/build.gradle.kts
b/catalogs/catalog-hadoop/build.gradle.kts
index 940289347..9ff3cc0e3 100644
--- a/catalogs/catalog-hadoop/build.gradle.kts
+++ b/catalogs/catalog-hadoop/build.gradle.kts
@@ -73,6 +73,7 @@ dependencies {
testImplementation(project(":integration-test-common", "testArtifacts"))
testImplementation(project(":server"))
testImplementation(project(":server-common"))
+ testImplementation(project(":bundles:gcp-bundle"))
testImplementation(libs.minikdc)
testImplementation(libs.hadoop3.minicluster)
@@ -86,6 +87,7 @@ dependencies {
testImplementation(libs.junit.jupiter.params)
testImplementation(libs.testcontainers)
testImplementation(libs.testcontainers.mysql)
+ testImplementation(libs.hadoop3.gcs)
testRuntimeOnly(libs.junit.jupiter.engine)
}
diff --git
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/HDFSFileSystemProvider.java
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/HDFSFileSystemProvider.java
index 7c9ceebdd..c7c2fd393 100644
---
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/HDFSFileSystemProvider.java
+++
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/HDFSFileSystemProvider.java
@@ -27,7 +27,6 @@ import javax.annotation.Nonnull;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
public class HDFSFileSystemProvider implements FileSystemProvider {
@@ -39,7 +38,7 @@ public class HDFSFileSystemProvider implements
FileSystemProvider {
(k, v) -> {
configuration.set(k.replace(CATALOG_BYPASS_PREFIX, ""), v);
});
- return DistributedFileSystem.newInstance(path.toUri(), configuration);
+ return FileSystem.newInstance(path.toUri(), configuration);
}
@Override
diff --git
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/LocalFileSystemProvider.java
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/LocalFileSystemProvider.java
index 70e44c76f..e940e2bb6 100644
---
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/LocalFileSystemProvider.java
+++
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/LocalFileSystemProvider.java
@@ -25,7 +25,6 @@ import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
public class LocalFileSystemProvider implements FileSystemProvider {
@@ -38,7 +37,7 @@ public class LocalFileSystemProvider implements
FileSystemProvider {
configuration.set(k.replace(CATALOG_BYPASS_PREFIX, ""), v);
});
- return LocalFileSystem.newInstance(path.toUri(), configuration);
+ return FileSystem.newInstance(path.toUri(), configuration);
}
@Override
diff --git
a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopCatalogIT.java
b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopCatalogIT.java
index 644b98cb9..76d17ff01 100644
---
a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopCatalogIT.java
+++
b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopCatalogIT.java
@@ -59,26 +59,24 @@ import org.slf4j.LoggerFactory;
@Tag("gravitino-docker-test")
public class HadoopCatalogIT extends BaseIT {
private static final Logger LOG =
LoggerFactory.getLogger(HadoopCatalogIT.class);
- private static final ContainerSuite containerSuite =
ContainerSuite.getInstance();
+ protected static final ContainerSuite containerSuite =
ContainerSuite.getInstance();
- public static final String metalakeName =
- GravitinoITUtils.genRandomName("CatalogFilesetIT_metalake");
- public static final String catalogName =
- GravitinoITUtils.genRandomName("CatalogFilesetIT_catalog");
+ public String metalakeName =
GravitinoITUtils.genRandomName("CatalogFilesetIT_metalake");
+ public String catalogName =
GravitinoITUtils.genRandomName("CatalogFilesetIT_catalog");
public static final String SCHEMA_PREFIX = "CatalogFilesetIT_schema";
- public static final String schemaName =
GravitinoITUtils.genRandomName(SCHEMA_PREFIX);
- private static final String provider = "hadoop";
- private static GravitinoMetalake metalake;
- private static Catalog catalog;
- private static FileSystem hdfs;
- private static String defaultBaseLocation;
+ public String schemaName = GravitinoITUtils.genRandomName(SCHEMA_PREFIX);
+ protected static final String provider = "hadoop";
+ protected static GravitinoMetalake metalake;
+ protected static Catalog catalog;
+ protected static FileSystem fileSystem;
+ protected static String defaultBaseLocation;
@BeforeAll
public void setup() throws IOException {
containerSuite.startHiveContainer();
Configuration conf = new Configuration();
conf.set("fs.defaultFS", defaultBaseLocation());
- hdfs = FileSystem.get(conf);
+ fileSystem = FileSystem.get(conf);
createMetalake();
createCatalog();
@@ -91,8 +89,8 @@ public class HadoopCatalogIT extends BaseIT {
catalog.asSchemas().dropSchema(schemaName, true);
metalake.dropCatalog(catalogName);
client.dropMetalake(metalakeName);
- if (hdfs != null) {
- hdfs.close();
+ if (fileSystem != null) {
+ fileSystem.close();
}
try {
@@ -102,7 +100,7 @@ public class HadoopCatalogIT extends BaseIT {
}
}
- private void createMetalake() {
+ protected void createMetalake() {
GravitinoMetalake[] gravitinoMetalakes = client.listMetalakes();
Assertions.assertEquals(0, gravitinoMetalakes.length);
@@ -114,14 +112,14 @@ public class HadoopCatalogIT extends BaseIT {
metalake = loadMetalake;
}
- private void createCatalog() {
+ protected void createCatalog() {
metalake.createCatalog(
catalogName, Catalog.Type.FILESET, provider, "comment",
ImmutableMap.of());
catalog = metalake.loadCatalog(catalogName);
}
- private void createSchema() {
+ protected void createSchema() {
Map<String, String> properties = Maps.newHashMap();
properties.put("key1", "val1");
properties.put("key2", "val2");
@@ -137,7 +135,7 @@ public class HadoopCatalogIT extends BaseIT {
Assertions.assertNotNull(loadSchema.properties().get("location"));
}
- private static void dropSchema() {
+ private void dropSchema() {
catalog.asSchemas().dropSchema(schemaName, true);
Assertions.assertFalse(catalog.asSchemas().schemaExists(schemaName));
}
@@ -171,7 +169,7 @@ public class HadoopCatalogIT extends BaseIT {
String filesetName = "test_create_fileset";
String storageLocation = storageLocation(filesetName);
Assertions.assertFalse(
- hdfs.exists(new Path(storageLocation)), "storage location should not
exists");
+ fileSystem.exists(new Path(storageLocation)), "storage location should
not exists");
Fileset fileset =
createFileset(
filesetName,
@@ -242,7 +240,7 @@ public class HadoopCatalogIT extends BaseIT {
String filesetName = "test_create_fileset_with_chinese";
String storageLocation = storageLocation(filesetName) + "/中文目录test";
Assertions.assertFalse(
- hdfs.exists(new Path(storageLocation)), "storage location should not
exists");
+ fileSystem.exists(new Path(storageLocation)), "storage location should
not exists");
Fileset fileset =
createFileset(
filesetName,
@@ -285,7 +283,7 @@ public class HadoopCatalogIT extends BaseIT {
Assertions.assertEquals(1, fileset.properties().size());
Assertions.assertEquals("v1", fileset.properties().get("k1"));
Assertions.assertTrue(
- hdfs.exists(new Path(storageLocation)), "storage location should be
created");
+ fileSystem.exists(new Path(storageLocation)), "storage location should
be created");
// create fileset with storage location that not exist
String filesetName2 = "test_external_fileset_no_exist";
@@ -349,7 +347,7 @@ public class HadoopCatalogIT extends BaseIT {
String storageLocation = storageLocation(filesetName);
Assertions.assertFalse(
- hdfs.exists(new Path(storageLocation)), "storage location should not
exists");
+ fileSystem.exists(new Path(storageLocation)), "storage location should
not exists");
createFileset(
filesetName, "comment", Fileset.Type.MANAGED, storageLocation,
ImmutableMap.of("k1", "v1"));
@@ -365,7 +363,7 @@ public class HadoopCatalogIT extends BaseIT {
catalog.asFilesetCatalog().filesetExists(NameIdentifier.of(schemaName,
filesetName)),
"fileset should not be exists");
Assertions.assertFalse(
- hdfs.exists(new Path(storageLocation)), "storage location should be
dropped");
+ fileSystem.exists(new Path(storageLocation)), "storage location should
be dropped");
}
@Test
@@ -392,7 +390,7 @@ public class HadoopCatalogIT extends BaseIT {
catalog.asFilesetCatalog().filesetExists(NameIdentifier.of(schemaName,
filesetName)),
"fileset should not be exists");
Assertions.assertTrue(
- hdfs.exists(new Path(storageLocation)), "storage location should not
be dropped");
+ fileSystem.exists(new Path(storageLocation)), "storage location should
not be dropped");
}
@Test
@@ -688,7 +686,7 @@ public class HadoopCatalogIT extends BaseIT {
}
}
- private static String generateLocation(String filesetName) {
+ protected String generateLocation(String filesetName) {
return String.format(
"hdfs://%s:%d/user/hadoop/%s/%s/%s",
containerSuite.getHiveContainer().getContainerIpAddress(),
@@ -707,7 +705,7 @@ public class HadoopCatalogIT extends BaseIT {
if (storageLocation != null) {
Path location = new Path(storageLocation);
try {
- hdfs.deleteOnExit(location);
+ fileSystem.deleteOnExit(location);
} catch (IOException e) {
LOG.warn("Failed to delete location: {}", location, e);
}
@@ -724,10 +722,11 @@ public class HadoopCatalogIT extends BaseIT {
catalog.asFilesetCatalog().filesetExists(NameIdentifier.of(schemaName,
filesetName)),
"fileset should be exists");
Assertions.assertTrue(
- hdfs.exists(new Path(storageLocation(filesetName))), "storage location
should be exists");
+ fileSystem.exists(new Path(storageLocation(filesetName))),
+ "storage location should be exists");
}
- private static String defaultBaseLocation() {
+ protected String defaultBaseLocation() {
if (defaultBaseLocation == null) {
defaultBaseLocation =
String.format(
@@ -739,7 +738,7 @@ public class HadoopCatalogIT extends BaseIT {
return defaultBaseLocation;
}
- private static String storageLocation(String filesetName) {
+ private String storageLocation(String filesetName) {
return defaultBaseLocation() + "/" + filesetName;
}
}
diff --git
a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopGCSCatalogIT.java
b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopGCSCatalogIT.java
new file mode 100644
index 000000000..74ae2a77c
--- /dev/null
+++
b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopGCSCatalogIT.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.hadoop.integration.test;
+
+import static
org.apache.gravitino.catalog.hadoop.HadoopCatalogPropertiesMetadata.FILESYSTEM_PROVIDERS;
+
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Map;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.integration.test.util.GravitinoITUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Tag;
+
+@Tag("gravitino-docker-test")
+@Disabled(
+ "Disabled due to we don't have a real GCP account to test. If you have a
GCP account,"
+ + "please change the configuration(YOUR_KEY_FILE, YOUR_BUCKET) and
enable this test.")
+public class HadoopGCSCatalogIT extends HadoopCatalogIT {
+
+ public static final String BUCKET_NAME = "YOUR_BUCKET";
+ public static final String SERVICE_ACCOUNT_FILE = "YOUR_KEY_FILE";
+
+ @BeforeAll
+ public void setup() throws IOException {
+ metalakeName = GravitinoITUtils.genRandomName("CatalogFilesetIT_metalake");
+ catalogName = GravitinoITUtils.genRandomName("CatalogFilesetIT_catalog");
+ schemaName = GravitinoITUtils.genRandomName("CatalogFilesetIT_schema");
+
+ schemaName = GravitinoITUtils.genRandomName(SCHEMA_PREFIX);
+ Configuration conf = new Configuration();
+
+ conf.set("fs.gs.auth.service.account.enable", "true");
+ conf.set("fs.gs.auth.service.account.json.keyfile", SERVICE_ACCOUNT_FILE);
+ fileSystem = FileSystem.get(URI.create(String.format("gs://%s",
BUCKET_NAME)), conf);
+
+ createMetalake();
+ createCatalog();
+ createSchema();
+ }
+
+ protected String defaultBaseLocation() {
+ if (defaultBaseLocation == null) {
+ try {
+ Path bucket =
+ new Path(
+ String.format(
+ "gs://%s/%s", BUCKET_NAME,
GravitinoITUtils.genRandomName("CatalogFilesetIT")));
+ if (!fileSystem.exists(bucket)) {
+ fileSystem.mkdirs(bucket);
+ }
+
+ defaultBaseLocation = bucket.toString();
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to create default base location",
e);
+ }
+ }
+
+ return defaultBaseLocation;
+ }
+
+ protected void createCatalog() {
+ Map<String, String> map = Maps.newHashMap();
+ map.put("gravitino.bypass.fs.gs.auth.service.account.enable", "true");
+ map.put("gravitino.bypass.fs.gs.auth.service.account.json.keyfile",
SERVICE_ACCOUNT_FILE);
+ map.put(FILESYSTEM_PROVIDERS, "gcs");
+
+ metalake.createCatalog(catalogName, Catalog.Type.FILESET, provider,
"comment", map);
+
+ catalog = metalake.loadCatalog(catalogName);
+ }
+
+ protected String generateLocation(String filesetName) {
+ return String.format("%s/%s", defaultBaseLocation, filesetName);
+ }
+}
diff --git a/clients/filesystem-hadoop3/build.gradle.kts
b/clients/filesystem-hadoop3/build.gradle.kts
index aefac5f28..cae188818 100644
--- a/clients/filesystem-hadoop3/build.gradle.kts
+++ b/clients/filesystem-hadoop3/build.gradle.kts
@@ -39,6 +39,7 @@ dependencies {
testImplementation(project(":server-common"))
testImplementation(project(":clients:client-java"))
testImplementation(project(":integration-test-common", "testArtifacts"))
+ testImplementation(project(":bundles:gcp-bundle"))
testImplementation(libs.awaitility)
testImplementation(libs.bundles.jetty)
testImplementation(libs.bundles.jersey)
diff --git
a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystemConfiguration.java
b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystemConfiguration.java
index cd1ecb92f..95ce4df2a 100644
---
a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystemConfiguration.java
+++
b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystemConfiguration.java
@@ -21,9 +21,18 @@ package org.apache.gravitino.filesystem.hadoop;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
/** Configuration class for Gravitino Virtual File System. */
-class GravitinoVirtualFileSystemConfiguration {
+public class GravitinoVirtualFileSystemConfiguration {
+
+ /**
+ * The prefix of the Gravitino fileset URI. The URI of the Gravitino fileset
should start with
+ * this prefix.
+ */
public static final String GVFS_FILESET_PREFIX = "gvfs://fileset";
+
+ /** The scheme of the Gravitino Virtual File System. */
public static final String GVFS_SCHEME = "gvfs";
+
+ /** The prefix of the Gravitino Virtual File System. */
public static final String GVFS_CONFIG_PREFIX = "fs.gvfs.";
/** The configuration key for the Gravitino server URI. */
@@ -42,8 +51,12 @@ class GravitinoVirtualFileSystemConfiguration {
*/
public static final String FS_FILESYSTEM_PROVIDERS =
"fs.gvfs.filesystem.providers";
+ /** The authentication type for simple authentication. */
public static final String SIMPLE_AUTH_TYPE = "simple";
+ /** The authentication type for oauth2 authentication. */
public static final String OAUTH2_AUTH_TYPE = "oauth2";
+
+ /** The authentication type for kerberos authentication. */
public static final String KERBEROS_AUTH_TYPE = "kerberos";
// oauth2
/** The configuration key for the URI of the default OAuth server. */
@@ -74,6 +87,10 @@ class GravitinoVirtualFileSystemConfiguration {
public static final String FS_GRAVITINO_FILESET_CACHE_MAX_CAPACITY_KEY =
"fs.gravitino.fileset.cache.maxCapacity";
+ /**
+ * The default value for the maximum capacity of the Gravitino fileset
cache. The default value is
+ * 20.
+ */
public static final int FS_GRAVITINO_FILESET_CACHE_MAX_CAPACITY_DEFAULT = 20;
/**
@@ -83,6 +100,10 @@ class GravitinoVirtualFileSystemConfiguration {
public static final String
FS_GRAVITINO_FILESET_CACHE_EVICTION_MILLS_AFTER_ACCESS_KEY =
"fs.gravitino.fileset.cache.evictionMillsAfterAccess";
+ /**
+ * The default value for the eviction time of the Gravitino fileset cache,
measured in mills after
+ * access.
+ */
public static final long
FS_GRAVITINO_FILESET_CACHE_EVICTION_MILLS_AFTER_ACCESS_DEFAULT =
1000L * 60 * 60;
diff --git
a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemGCSIT.java
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemGCSIT.java
new file mode 100644
index 000000000..a42d1c4b7
--- /dev/null
+++
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemGCSIT.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.filesystem.hadoop.integration.test;
+
+import static
org.apache.gravitino.catalog.hadoop.HadoopCatalogPropertiesMetadata.FILESYSTEM_PROVIDERS;
+import static
org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystemConfiguration.FS_FILESYSTEM_PROVIDERS;
+
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.integration.test.util.DownloaderUtils;
+import org.apache.gravitino.integration.test.util.GravitinoITUtils;
+import org.apache.gravitino.integration.test.util.ITUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Disabled(
+ "Disabled due to we don't have a real GCP account to test. If you have a
GCP account,"
+ + "please change the configuration(YOUR_KEY_FILE, YOUR_BUCKET) and
enable this test.")
+public class GravitinoVirtualFileSystemGCSIT extends
GravitinoVirtualFileSystemIT {
+ private static final Logger LOG =
LoggerFactory.getLogger(GravitinoVirtualFileSystemGCSIT.class);
+
+ public static final String BUCKET_NAME = "YOUR_BUCKET";
+ public static final String SERVICE_ACCOUNT_FILE = "YOUR_KEY_FILE";
+
+ @BeforeAll
+ public void startIntegrationTest() {
+ // Do nothing
+ }
+
+ @BeforeAll
+ public void startUp() throws Exception {
+ copyGCPJars();
+ // Need to download jars to gravitino server
+ super.startIntegrationTest();
+
+ // This value can be by tune by the user, please change it accordingly.
+ defaultBockSize = 64 * 1024 * 1024;
+
+ metalakeName = GravitinoITUtils.genRandomName("gvfs_it_metalake");
+ catalogName = GravitinoITUtils.genRandomName("catalog");
+ schemaName = GravitinoITUtils.genRandomName("schema");
+
+ Assertions.assertFalse(client.metalakeExists(metalakeName));
+ metalake = client.createMetalake(metalakeName, "metalake comment",
Collections.emptyMap());
+ Assertions.assertTrue(client.metalakeExists(metalakeName));
+
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put(FILESYSTEM_PROVIDERS, "gcs");
+ properties.put(
+ "gravitino.bypass.fs.gs.auth.service.account.json.keyfile",
SERVICE_ACCOUNT_FILE);
+
+ Catalog catalog =
+ metalake.createCatalog(
+ catalogName, Catalog.Type.FILESET, "hadoop", "catalog comment",
properties);
+ Assertions.assertTrue(metalake.catalogExists(catalogName));
+
+ catalog.asSchemas().createSchema(schemaName, "schema comment", properties);
+ Assertions.assertTrue(catalog.asSchemas().schemaExists(schemaName));
+
+ conf.set("fs.gvfs.impl",
"org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystem");
+ conf.set("fs.AbstractFileSystem.gvfs.impl",
"org.apache.gravitino.filesystem.hadoop.Gvfs");
+ conf.set("fs.gvfs.impl.disable.cache", "true");
+ conf.set("fs.gravitino.server.uri", serverUri);
+ conf.set("fs.gravitino.client.metalake", metalakeName);
+
+ // Pass this configuration to the real file system
+ conf.set("gravitino.bypass.fs.gs.auth.service.account.enable", "true");
+ conf.set("gravitino.bypass.fs.gs.auth.service.account.json.keyfile",
SERVICE_ACCOUNT_FILE);
+ conf.set(FS_FILESYSTEM_PROVIDERS, "gcs");
+ }
+
+ @AfterAll
+ public void tearDown() throws IOException {
+ Catalog catalog = metalake.loadCatalog(catalogName);
+ catalog.asSchemas().dropSchema(schemaName, true);
+ metalake.dropCatalog(catalogName);
+ client.dropMetalake(metalakeName);
+
+ if (client != null) {
+ client.close();
+ client = null;
+ }
+
+ try {
+ closer.close();
+ } catch (Exception e) {
+ LOG.error("Exception in closing CloseableGroup", e);
+ }
+ }
+
+ /**
+ * Remove the `gravitino.bypass` prefix from the configuration and pass it
to the real file system
+ * This method corresponds to the method
org.apache.gravitino.filesystem.hadoop
+ * .GravitinoVirtualFileSystem#getConfigMap(Configuration) in the original
code.
+ */
+ protected Configuration
convertGvfsConfigToRealFileSystemConfig(Configuration gvfsConf) {
+ Configuration gcsConf = new Configuration();
+ gvfsConf.forEach(
+ entry -> {
+ gcsConf.set(entry.getKey().replace("gravitino.bypass.", ""),
entry.getValue());
+ });
+
+ return gcsConf;
+ }
+
+ protected String genStorageLocation(String fileset) {
+ return String.format("gs://%s/%s", BUCKET_NAME, fileset);
+ }
+
+ private static boolean isDeploy() {
+ String mode =
+ System.getProperty(ITUtils.TEST_MODE) == null
+ ? ITUtils.EMBEDDED_TEST_MODE
+ : System.getProperty(ITUtils.TEST_MODE);
+
+ return Objects.equals(mode, ITUtils.DEPLOY_TEST_MODE);
+ }
+
+ private void copyGCPJars() {
+ if (!isDeploy()) {
+ return;
+ }
+
+ String gravitinoHome = System.getenv("GRAVITINO_HOME");
+ String jarName = String.format("gravitino-gcp-bundle-%s.jar",
System.getenv("PROJECT_VERSION"));
+ String gcsJars =
+ ITUtils.joinPath(
+ gravitinoHome, "..", "..", "bundles", "gcp-bundle", "build",
"libs", jarName);
+ gcsJars = "file://" + gcsJars;
+ try {
+ if (!ITUtils.EMBEDDED_TEST_MODE.equals(testMode)) {
+ String hadoopLibDirs = ITUtils.joinPath(gravitinoHome, "catalogs",
"hadoop", "libs");
+ DownloaderUtils.downloadFile(gcsJars, hadoopLibDirs);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(
+ String.format("Failed to copy the gcs dependency jars: %s",
gcsJars), e);
+ }
+ }
+
+ @Disabled(
+ "GCS does not support append, java.io.IOException: The append operation
is not supported")
+ public void testAppend() throws IOException {}
+}
diff --git
a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemIT.java
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemIT.java
index 9b6334e09..ced9a0b8b 100644
---
a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemIT.java
+++
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemIT.java
@@ -57,14 +57,15 @@ import org.slf4j.LoggerFactory;
public class GravitinoVirtualFileSystemIT extends BaseIT {
private static final Logger LOG =
LoggerFactory.getLogger(GravitinoVirtualFileSystemIT.class);
private static final ContainerSuite containerSuite =
ContainerSuite.getInstance();
- private static final String metalakeName =
GravitinoITUtils.genRandomName("gvfs_it_metalake");
- private static final String catalogName =
GravitinoITUtils.genRandomName("catalog");
- private static final String schemaName =
GravitinoITUtils.genRandomName("schema");
- private static GravitinoMetalake metalake;
- private static Configuration conf = new Configuration();
+ protected String metalakeName =
GravitinoITUtils.genRandomName("gvfs_it_metalake");
+ protected String catalogName = GravitinoITUtils.genRandomName("catalog");
+ protected String schemaName = GravitinoITUtils.genRandomName("schema");
+ protected GravitinoMetalake metalake;
+ protected Configuration conf = new Configuration();
+ protected int defaultBockSize = 128 * 1024 * 1024;
@BeforeAll
- public void startUp() {
+ public void startUp() throws Exception {
containerSuite.startHiveContainer();
Assertions.assertFalse(client.metalakeExists(metalakeName));
metalake = client.createMetalake(metalakeName, "metalake comment",
Collections.emptyMap());
@@ -112,10 +113,14 @@ public class GravitinoVirtualFileSystemIT extends BaseIT {
}
}
+ protected Configuration
convertGvfsConfigToRealFileSystemConfig(Configuration gvfsConf) {
+ return gvfsConf;
+ }
+
@Test
public void testCreate() throws IOException {
// create fileset
- String filesetName = "test_fileset_create";
+ String filesetName = GravitinoITUtils.genRandomName("test_fileset_create");
NameIdentifier filesetIdent = NameIdentifier.of(schemaName, filesetName);
Catalog catalog = metalake.loadCatalog(catalogName);
String storageLocation = genStorageLocation(filesetName);
@@ -131,25 +136,28 @@ public class GravitinoVirtualFileSystemIT extends BaseIT {
// test gvfs create
Path hdfsPath = new Path(storageLocation);
- try (FileSystem fs = hdfsPath.getFileSystem(conf)) {
+ try (FileSystem fs =
hdfsPath.getFileSystem(convertGvfsConfigToRealFileSystemConfig(conf))) {
Assertions.assertTrue(fs.exists(hdfsPath));
Path gvfsPath = genGvfsPath(filesetName);
try (FileSystem gvfs = gvfsPath.getFileSystem(conf)) {
Assertions.assertTrue(gvfs.exists(gvfsPath));
String fileName = "test.txt";
Path createPath = new Path(gvfsPath + "/" + fileName);
- gvfs.create(createPath);
+ // GCS need to close the stream to create the file manually.
+ gvfs.create(createPath).close();
Assertions.assertTrue(gvfs.exists(createPath));
Assertions.assertTrue(gvfs.getFileStatus(createPath).isFile());
Assertions.assertTrue(fs.exists(new Path(storageLocation + "/" +
fileName)));
}
}
+
+ catalog.asFilesetCatalog().dropFileset(filesetIdent);
}
@Test
public void testAppend() throws IOException {
// create fileset
- String filesetName = "test_fileset_append";
+ String filesetName = GravitinoITUtils.genRandomName("test_fileset_append");
NameIdentifier filesetIdent = NameIdentifier.of(schemaName, filesetName);
Catalog catalog = metalake.loadCatalog(catalogName);
String storageLocation = genStorageLocation(filesetName);
@@ -165,7 +173,7 @@ public class GravitinoVirtualFileSystemIT extends BaseIT {
// test gvfs append
Path hdfsPath = new Path(storageLocation);
- try (FileSystem fs = hdfsPath.getFileSystem(conf)) {
+ try (FileSystem fs =
hdfsPath.getFileSystem(convertGvfsConfigToRealFileSystemConfig(conf))) {
Assertions.assertTrue(fs.exists(hdfsPath));
Path gvfsPath = genGvfsPath(filesetName);
String fileName = "test.txt";
@@ -173,7 +181,7 @@ public class GravitinoVirtualFileSystemIT extends BaseIT {
try (FileSystem gvfs = gvfsPath.getFileSystem(conf)) {
Assertions.assertTrue(gvfs.exists(gvfsPath));
- gvfs.create(appendPath);
+ gvfs.create(appendPath).close();
Assertions.assertTrue(gvfs.exists(appendPath));
Assertions.assertTrue(gvfs.getFileStatus(appendPath).isFile());
Assertions.assertTrue(fs.exists(new Path(storageLocation + "/" +
fileName)));
@@ -203,12 +211,14 @@ public class GravitinoVirtualFileSystemIT extends BaseIT {
}
}
}
+
+ catalog.asFilesetCatalog().dropFileset(filesetIdent);
}
@Test
public void testDelete() throws IOException {
// create fileset
- String filesetName = "test_fileset_delete";
+ String filesetName = GravitinoITUtils.genRandomName("test_fileset_delete");
NameIdentifier filesetIdent = NameIdentifier.of(schemaName, filesetName);
Catalog catalog = metalake.loadCatalog(catalogName);
String storageLocation = genStorageLocation(filesetName);
@@ -224,14 +234,14 @@ public class GravitinoVirtualFileSystemIT extends BaseIT {
// test gvfs delete
Path hdfsPath = new Path(storageLocation);
- try (FileSystem fs = hdfsPath.getFileSystem(conf)) {
+ try (FileSystem fs =
hdfsPath.getFileSystem(convertGvfsConfigToRealFileSystemConfig(conf))) {
Assertions.assertTrue(fs.exists(hdfsPath));
Path gvfsPath = genGvfsPath(filesetName);
String fileName = "test.txt";
Path deletePath = new Path(gvfsPath + "/" + fileName);
try (FileSystem gvfs = gvfsPath.getFileSystem(conf)) {
Assertions.assertTrue(gvfs.exists(gvfsPath));
- gvfs.create(deletePath);
+ gvfs.create(deletePath).close();
Assertions.assertTrue(gvfs.exists(deletePath));
Assertions.assertTrue(gvfs.getFileStatus(deletePath).isFile());
Assertions.assertTrue(fs.exists(new Path(storageLocation + "/" +
fileName)));
@@ -242,12 +252,14 @@ public class GravitinoVirtualFileSystemIT extends BaseIT {
Assertions.assertFalse(fs.exists(new Path(storageLocation + "/" +
fileName)));
}
}
+
+ catalog.asFilesetCatalog().dropFileset(filesetIdent);
}
@Test
public void testGetStatus() throws IOException {
// create fileset
- String filesetName = "test_fileset_get_status";
+ String filesetName =
GravitinoITUtils.genRandomName("test_fileset_get_status");
NameIdentifier filesetIdent = NameIdentifier.of(schemaName, filesetName);
Catalog catalog = metalake.loadCatalog(catalogName);
String storageLocation = genStorageLocation(filesetName);
@@ -263,14 +275,14 @@ public class GravitinoVirtualFileSystemIT extends BaseIT {
// test gvfs get status
Path hdfsPath = new Path(storageLocation);
- try (FileSystem fs = hdfsPath.getFileSystem(conf)) {
+ try (FileSystem fs =
hdfsPath.getFileSystem(convertGvfsConfigToRealFileSystemConfig(conf))) {
Assertions.assertTrue(fs.exists(hdfsPath));
Path gvfsPath = genGvfsPath(filesetName);
String fileName = "test.txt";
Path statusPath = new Path(gvfsPath + "/" + fileName);
try (FileSystem gvfs = gvfsPath.getFileSystem(conf)) {
Assertions.assertTrue(gvfs.exists(gvfsPath));
- gvfs.create(statusPath);
+ gvfs.create(statusPath).close();
Assertions.assertTrue(gvfs.exists(statusPath));
Assertions.assertTrue(gvfs.getFileStatus(statusPath).isFile());
Assertions.assertTrue(fs.exists(new Path(storageLocation + "/" +
fileName)));
@@ -284,12 +296,14 @@ public class GravitinoVirtualFileSystemIT extends BaseIT {
.replaceFirst(genGvfsPath(filesetName).toString(),
storageLocation));
}
}
+
+ catalog.asFilesetCatalog().dropFileset(filesetIdent);
}
@Test
public void testListStatus() throws IOException {
// create fileset
- String filesetName = "test_fileset_list_status";
+ String filesetName =
GravitinoITUtils.genRandomName("test_fileset_list_status");
NameIdentifier filesetIdent = NameIdentifier.of(schemaName, filesetName);
Catalog catalog = metalake.loadCatalog(catalogName);
String storageLocation = genStorageLocation(filesetName);
@@ -305,7 +319,7 @@ public class GravitinoVirtualFileSystemIT extends BaseIT {
// test gvfs list status
Path hdfsPath = new Path(storageLocation);
- try (FileSystem fs = hdfsPath.getFileSystem(conf)) {
+ try (FileSystem fs =
hdfsPath.getFileSystem(convertGvfsConfigToRealFileSystemConfig(conf))) {
Assertions.assertTrue(fs.exists(hdfsPath));
Path gvfsPath = genGvfsPath(filesetName);
for (int i = 0; i < 10; i++) {
@@ -313,7 +327,7 @@ public class GravitinoVirtualFileSystemIT extends BaseIT {
Path statusPath = new Path(gvfsPath + "/" + fileName);
try (FileSystem gvfs = gvfsPath.getFileSystem(conf)) {
Assertions.assertTrue(gvfs.exists(gvfsPath));
- gvfs.create(statusPath);
+ gvfs.create(statusPath).close();
Assertions.assertTrue(gvfs.exists(statusPath));
Assertions.assertTrue(gvfs.getFileStatus(statusPath).isFile());
Assertions.assertTrue(fs.exists(new Path(storageLocation + "/" +
fileName)));
@@ -340,12 +354,14 @@ public class GravitinoVirtualFileSystemIT extends BaseIT {
}
}
}
+
+ catalog.asFilesetCatalog().dropFileset(filesetIdent);
}
@Test
public void testMkdirs() throws IOException {
// create fileset
- String filesetName = "test_fileset_mkdirs";
+ String filesetName = GravitinoITUtils.genRandomName("test_fileset_mkdirs");
NameIdentifier filesetIdent = NameIdentifier.of(schemaName, filesetName);
Catalog catalog = metalake.loadCatalog(catalogName);
String storageLocation = genStorageLocation(filesetName);
@@ -361,7 +377,7 @@ public class GravitinoVirtualFileSystemIT extends BaseIT {
// test gvfs mkdirs
Path hdfsPath = new Path(storageLocation);
- try (FileSystem fs = hdfsPath.getFileSystem(conf)) {
+ try (FileSystem fs =
hdfsPath.getFileSystem(convertGvfsConfigToRealFileSystemConfig(conf))) {
Assertions.assertTrue(fs.exists(hdfsPath));
Path gvfsPath = genGvfsPath(filesetName);
try (FileSystem gvfs = gvfsPath.getFileSystem(conf)) {
@@ -374,12 +390,14 @@ public class GravitinoVirtualFileSystemIT extends BaseIT {
Assertions.assertTrue(fs.exists(new Path(storageLocation + "/" +
dirName)));
}
}
+
+ catalog.asFilesetCatalog().dropFileset(filesetIdent);
}
@Test
public void testRename() throws IOException {
// create fileset
- String filesetName = "test_fileset_rename";
+ String filesetName = GravitinoITUtils.genRandomName("test_fileset_rename");
NameIdentifier filesetIdent = NameIdentifier.of(schemaName, filesetName);
Catalog catalog = metalake.loadCatalog(catalogName);
String storageLocation = genStorageLocation(filesetName);
@@ -395,7 +413,7 @@ public class GravitinoVirtualFileSystemIT extends BaseIT {
// test gvfs rename
Path hdfsPath = new Path(storageLocation);
- try (FileSystem fs = hdfsPath.getFileSystem(conf)) {
+ try (FileSystem fs =
hdfsPath.getFileSystem(convertGvfsConfigToRealFileSystemConfig(conf))) {
Assertions.assertTrue(fs.exists(hdfsPath));
Path gvfsPath = genGvfsPath(filesetName);
String srcName = "test_src";
@@ -420,11 +438,13 @@ public class GravitinoVirtualFileSystemIT extends BaseIT {
Assertions.assertFalse(fs.exists(new Path(storageLocation + "/" +
srcName)));
}
}
+
+ catalog.asFilesetCatalog().dropFileset(filesetIdent);
}
@Test
public void testGetDefaultReplications() throws IOException {
- String filesetName = "test_get_default_replications";
+ String filesetName =
GravitinoITUtils.genRandomName("test_get_default_replications");
NameIdentifier filesetIdent = NameIdentifier.of(schemaName, filesetName);
Catalog catalog = metalake.loadCatalog(catalogName);
String storageLocation = genStorageLocation(filesetName);
@@ -441,11 +461,13 @@ public class GravitinoVirtualFileSystemIT extends BaseIT {
try (FileSystem gvfs = gvfsPath.getFileSystem(conf)) {
assertEquals(3, gvfs.getDefaultReplication(gvfsPath));
}
+
+ catalog.asFilesetCatalog().dropFileset(filesetIdent);
}
@Test
public void testGetDefaultBlockSizes() throws IOException {
- String filesetName = "test_get_default_block_sizes";
+ String filesetName =
GravitinoITUtils.genRandomName("test_get_default_block_sizes");
NameIdentifier filesetIdent = NameIdentifier.of(schemaName, filesetName);
Catalog catalog = metalake.loadCatalog(catalogName);
String storageLocation = genStorageLocation(filesetName);
@@ -460,15 +482,17 @@ public class GravitinoVirtualFileSystemIT extends BaseIT {
Assertions.assertTrue(catalog.asFilesetCatalog().filesetExists(filesetIdent));
Path gvfsPath = genGvfsPath(filesetName);
try (FileSystem gvfs = gvfsPath.getFileSystem(conf)) {
- assertEquals(128 * 1024 * 1024, gvfs.getDefaultBlockSize(gvfsPath));
+ assertEquals(defaultBockSize, gvfs.getDefaultBlockSize(gvfsPath));
}
+
+ catalog.asFilesetCatalog().dropFileset(filesetIdent);
}
- private String genStorageLocation(String fileset) {
+ protected String genStorageLocation(String fileset) {
return String.format("%s/%s", baseHdfsPath(), fileset);
}
- private static String baseHdfsPath() {
+ private String baseHdfsPath() {
return String.format(
"hdfs://%s:%d/%s/%s",
containerSuite.getHiveContainer().getContainerIpAddress(),
@@ -477,7 +501,7 @@ public class GravitinoVirtualFileSystemIT extends BaseIT {
schemaName);
}
- private Path genGvfsPath(String fileset) {
+ protected Path genGvfsPath(String fileset) {
return new Path(String.format("gvfs://fileset/%s/%s/%s", catalogName,
schemaName, fileset));
}
}
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 09cfbea2c..af04c6314 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -32,6 +32,7 @@ airlift-resolver = "1.6"
hive2 = "2.3.9"
hadoop2 = "2.10.2"
hadoop3 = "3.1.0"
+hadoop3-gcs = "1.9.4-hadoop3"
hadoop-minikdc = "3.3.6"
htrace-core4 = "4.1.0-incubating"
httpclient5 = "5.2.1"
@@ -152,6 +153,7 @@ hadoop3-hdfs = { group = "org.apache.hadoop", name =
"hadoop-hdfs", version.ref
hadoop3-common = { group = "org.apache.hadoop", name = "hadoop-common",
version.ref = "hadoop3"}
hadoop3-client = { group = "org.apache.hadoop", name = "hadoop-client",
version.ref = "hadoop3"}
hadoop3-minicluster = { group = "org.apache.hadoop", name =
"hadoop-minicluster", version.ref = "hadoop-minikdc"}
+hadoop3-gcs = { group = "com.google.cloud.bigdataoss", name = "gcs-connector",
version.ref = "hadoop3-gcs"}
htrace-core4 = { group = "org.apache.htrace", name = "htrace-core4",
version.ref = "htrace-core4" }
airlift-json = { group = "io.airlift", name = "json", version.ref =
"airlift-json"}
airlift-resolver = { group = "io.airlift.resolver", name = "resolver",
version.ref = "airlift-resolver"}
diff --git
a/integration-test-common/src/test/java/org/apache/gravitino/integration/test/util/ITUtils.java
b/integration-test-common/src/test/java/org/apache/gravitino/integration/test/util/ITUtils.java
index e5454199f..9a6d7b130 100644
---
a/integration-test-common/src/test/java/org/apache/gravitino/integration/test/util/ITUtils.java
+++
b/integration-test-common/src/test/java/org/apache/gravitino/integration/test/util/ITUtils.java
@@ -50,6 +50,7 @@ import org.junit.jupiter.api.Assertions;
public class ITUtils {
public static final String TEST_MODE = "testMode";
public static final String EMBEDDED_TEST_MODE = "embedded";
+ public static final String DEPLOY_TEST_MODE = "deploy";
public static String joinPath(String... dirs) {
return String.join(File.separator, dirs);
diff --git a/settings.gradle.kts b/settings.gradle.kts
index e98f81d39..36d66504f 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -70,3 +70,4 @@ project(":spark-connector:spark-runtime-3.5").projectDir =
file("spark-connector
include("web:web", "web:integration-test")
include("docs")
include("integration-test-common")
+include(":bundles:gcp-bundle")