This is an automated email from the ASF dual-hosted git repository.
fanng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 78447ceca [#5973] feat(hadoop-catalog): Support credential when using
fileset catalog with cloud storage (#5974)
78447ceca is described below
commit 78447cecac8fafbc7f7bde8797a5e821b7375bf0
Author: Qi Yu <[email protected]>
AuthorDate: Fri Jan 10 10:21:36 2025 +0800
[#5973] feat(hadoop-catalog): Support credential when using fileset catalog
with cloud storage (#5974)
### What changes were proposed in this pull request?
Support dynamic credentials when obtaining cloud storage filesets.
### Why are the changes needed?
Static keys are not very safe; we need to optimize this.
Fix: #5973
### Does this PR introduce _any_ user-facing change?
N/A
### How was this patch tested?
ITs
---
.../gravitino/oss/fs/OSSCredentialsProvider.java | 88 +++++++++
.../gravitino/oss/fs/OSSFileSystemProvider.java | 20 ++-
.../java/org/apache/gravitino/oss/fs/OSSUtils.java | 52 ++++++
.../gravitino/s3/fs/S3CredentialsProvider.java | 88 +++++++++
.../gravitino/s3/fs/S3FileSystemProvider.java | 30 +++-
.../java/org/apache/gravitino/s3/fs/S3Utils.java | 51 ++++++
bundles/azure/build.gradle.kts | 1 -
.../gravitino/abs/fs/AzureFileSystemProvider.java | 42 ++++-
.../abs/fs/AzureSasCredentialsProvider.java | 87 +++++++++
.../apache/gravitino/abs/fs/AzureStorageUtils.java | 66 +++++++
bundles/gcp/build.gradle.kts | 1 +
.../gravitino/gcs/fs/GCSCredentialsProvider.java | 84 +++++++++
.../gravitino/gcs/fs/GCSFileSystemProvider.java | 19 +-
.../java/org/apache/gravitino/gcs/fs/GCSUtils.java | 42 +++++
catalogs/hadoop-common/build.gradle.kts | 1 +
.../catalog/hadoop/fs/FileSystemUtils.java | 23 +++
.../fs/GravitinoFileSystemCredentialsProvider.java | 38 ++++
.../hadoop/fs/SupportsCredentialVending.java | 37 ++++
...aultGravitinoFileSystemCredentialsProvider.java | 60 +++++++
.../hadoop/GravitinoVirtualFileSystem.java | 197 ++++++++-------------
.../hadoop/GravitinoVirtualFileSystemUtils.java | 151 ++++++++++++++++
.../gravitino/filesystem/hadoop/TestGvfsBase.java | 52 +++++-
.../GravitinoVirtualFileSystemABSCredentialIT.java | 180 +++++++++++++++++++
.../GravitinoVirtualFileSystemGCSCredentialIT.java | 150 ++++++++++++++++
.../GravitinoVirtualFileSystemOSSCredentialIT.java | 168 ++++++++++++++++++
.../GravitinoVirtualFileSystemS3CredentialIT.java | 173 ++++++++++++++++++
26 files changed, 1765 insertions(+), 136 deletions(-)
diff --git
a/bundles/aliyun/src/main/java/org/apache/gravitino/oss/fs/OSSCredentialsProvider.java
b/bundles/aliyun/src/main/java/org/apache/gravitino/oss/fs/OSSCredentialsProvider.java
new file mode 100644
index 000000000..ef4afe434
--- /dev/null
+++
b/bundles/aliyun/src/main/java/org/apache/gravitino/oss/fs/OSSCredentialsProvider.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.oss.fs;
+
+import com.aliyun.oss.common.auth.BasicCredentials;
+import com.aliyun.oss.common.auth.Credentials;
+import com.aliyun.oss.common.auth.CredentialsProvider;
+import com.aliyun.oss.common.auth.DefaultCredentials;
+import java.net.URI;
+import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
+import
org.apache.gravitino.catalog.hadoop.fs.GravitinoFileSystemCredentialsProvider;
+import org.apache.gravitino.credential.Credential;
+import org.apache.gravitino.credential.OSSSecretKeyCredential;
+import org.apache.gravitino.credential.OSSTokenCredential;
+import org.apache.hadoop.conf.Configuration;
+
+public class OSSCredentialsProvider implements CredentialsProvider {
+
+ private GravitinoFileSystemCredentialsProvider
gravitinoFileSystemCredentialsProvider;
+ private Credentials basicCredentials;
+ private long expirationTime = Long.MAX_VALUE;
+ private static final double EXPIRATION_TIME_FACTOR = 0.5D;
+
+ public OSSCredentialsProvider(URI uri, Configuration conf) {
+ this.gravitinoFileSystemCredentialsProvider =
FileSystemUtils.getGvfsCredentialProvider(conf);
+ }
+
+ @Override
+ public void setCredentials(Credentials credentials) {}
+
+ @Override
+ public Credentials getCredentials() {
+ if (basicCredentials == null || System.currentTimeMillis() >=
expirationTime) {
+ synchronized (this) {
+ refresh();
+ }
+ }
+
+ return basicCredentials;
+ }
+
+ private void refresh() {
+ Credential[] gravitinoCredentials =
gravitinoFileSystemCredentialsProvider.getCredentials();
+ Credential credential =
OSSUtils.getSuitableCredential(gravitinoCredentials);
+ if (credential == null) {
+ throw new RuntimeException("No suitable credential for OSS found...");
+ }
+
+ if (credential instanceof OSSSecretKeyCredential) {
+ OSSSecretKeyCredential ossSecretKeyCredential = (OSSSecretKeyCredential)
credential;
+ basicCredentials =
+ new DefaultCredentials(
+ ossSecretKeyCredential.accessKeyId(),
ossSecretKeyCredential.secretAccessKey());
+ } else if (credential instanceof OSSTokenCredential) {
+ OSSTokenCredential ossTokenCredential = (OSSTokenCredential) credential;
+ basicCredentials =
+ new BasicCredentials(
+ ossTokenCredential.accessKeyId(),
+ ossTokenCredential.secretAccessKey(),
+ ossTokenCredential.securityToken());
+ }
+
+ if (credential.expireTimeInMs() > 0) {
+ expirationTime =
+ System.currentTimeMillis()
+ + (long)
+ ((credential.expireTimeInMs() - System.currentTimeMillis())
+ * EXPIRATION_TIME_FACTOR);
+ }
+ }
+}
diff --git
a/bundles/aliyun/src/main/java/org/apache/gravitino/oss/fs/OSSFileSystemProvider.java
b/bundles/aliyun/src/main/java/org/apache/gravitino/oss/fs/OSSFileSystemProvider.java
index b47d25335..358e3a08c 100644
---
a/bundles/aliyun/src/main/java/org/apache/gravitino/oss/fs/OSSFileSystemProvider.java
+++
b/bundles/aliyun/src/main/java/org/apache/gravitino/oss/fs/OSSFileSystemProvider.java
@@ -20,10 +20,15 @@ package org.apache.gravitino.oss.fs;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
import java.io.IOException;
import java.util.Map;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
+import org.apache.gravitino.catalog.hadoop.fs.SupportsCredentialVending;
+import org.apache.gravitino.credential.Credential;
+import org.apache.gravitino.credential.OSSSecretKeyCredential;
+import org.apache.gravitino.credential.OSSTokenCredential;
import org.apache.gravitino.storage.OSSProperties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -31,7 +36,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem;
import org.apache.hadoop.fs.aliyun.oss.Constants;
-public class OSSFileSystemProvider implements FileSystemProvider {
+public class OSSFileSystemProvider implements FileSystemProvider,
SupportsCredentialVending {
private static final String OSS_FILESYSTEM_IMPL = "fs.oss.impl";
@@ -61,9 +66,22 @@ public class OSSFileSystemProvider implements
FileSystemProvider {
}
hadoopConfMap.forEach(configuration::set);
+
return AliyunOSSFileSystem.newInstance(path.toUri(), configuration);
}
+ @Override
+ public Map<String, String> getFileSystemCredentialConf(Credential[]
credentials) {
+ Credential credential = OSSUtils.getSuitableCredential(credentials);
+ Map<String, String> result = Maps.newHashMap();
+ if (credential instanceof OSSSecretKeyCredential || credential instanceof
OSSTokenCredential) {
+ result.put(
+ Constants.CREDENTIALS_PROVIDER_KEY,
OSSCredentialsProvider.class.getCanonicalName());
+ }
+
+ return result;
+ }
+
@Override
public String scheme() {
return "oss";
diff --git
a/bundles/aliyun/src/main/java/org/apache/gravitino/oss/fs/OSSUtils.java
b/bundles/aliyun/src/main/java/org/apache/gravitino/oss/fs/OSSUtils.java
new file mode 100644
index 000000000..87c71de37
--- /dev/null
+++ b/bundles/aliyun/src/main/java/org/apache/gravitino/oss/fs/OSSUtils.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.oss.fs;
+
+import org.apache.gravitino.credential.Credential;
+import org.apache.gravitino.credential.OSSSecretKeyCredential;
+import org.apache.gravitino.credential.OSSTokenCredential;
+
+public class OSSUtils {
+
+ /**
+ * Get the credential from the credential array. Using dynamic credential
first, if not found,
+ * uses static credential.
+ *
+ * @param credentials The credential array.
+ * @return A credential. Null if not found.
+ */
+ static Credential getSuitableCredential(Credential[] credentials) {
+ // Use dynamic credential if found.
+ for (Credential credential : credentials) {
+ if (credential instanceof OSSTokenCredential) {
+ return credential;
+ }
+ }
+
+ // If dynamic credential not found, use the static one
+ for (Credential credential : credentials) {
+ if (credential instanceof OSSSecretKeyCredential) {
+ return credential;
+ }
+ }
+
+ return null;
+ }
+}
diff --git
a/bundles/aws/src/main/java/org/apache/gravitino/s3/fs/S3CredentialsProvider.java
b/bundles/aws/src/main/java/org/apache/gravitino/s3/fs/S3CredentialsProvider.java
new file mode 100644
index 000000000..2fc145889
--- /dev/null
+++
b/bundles/aws/src/main/java/org/apache/gravitino/s3/fs/S3CredentialsProvider.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.s3.fs;
+
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.auth.BasicSessionCredentials;
+import java.net.URI;
+import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
+import
org.apache.gravitino.catalog.hadoop.fs.GravitinoFileSystemCredentialsProvider;
+import org.apache.gravitino.credential.Credential;
+import org.apache.gravitino.credential.S3SecretKeyCredential;
+import org.apache.gravitino.credential.S3TokenCredential;
+import org.apache.hadoop.conf.Configuration;
+
+public class S3CredentialsProvider implements AWSCredentialsProvider {
+ private GravitinoFileSystemCredentialsProvider
gravitinoFileSystemCredentialsProvider;
+
+ private AWSCredentials basicSessionCredentials;
+ private long expirationTime = Long.MAX_VALUE;
+ private static final double EXPIRATION_TIME_FACTOR = 0.5D;
+
+ public S3CredentialsProvider(final URI uri, final Configuration conf) {
+ this.gravitinoFileSystemCredentialsProvider =
FileSystemUtils.getGvfsCredentialProvider(conf);
+ }
+
+ @Override
+ public AWSCredentials getCredentials() {
+ // Refresh credentials if they are null or about to expire.
+ if (basicSessionCredentials == null || System.currentTimeMillis() >=
expirationTime) {
+ synchronized (this) {
+ refresh();
+ }
+ }
+
+ return basicSessionCredentials;
+ }
+
+ @Override
+ public void refresh() {
+ Credential[] gravitinoCredentials =
gravitinoFileSystemCredentialsProvider.getCredentials();
+ Credential credential =
S3Utils.getSuitableCredential(gravitinoCredentials);
+
+ if (credential == null) {
+ throw new RuntimeException("No suitable credential for S3 found...");
+ }
+
+ if (credential instanceof S3SecretKeyCredential) {
+ S3SecretKeyCredential s3SecretKeyCredential = (S3SecretKeyCredential)
credential;
+ basicSessionCredentials =
+ new BasicAWSCredentials(
+ s3SecretKeyCredential.accessKeyId(),
s3SecretKeyCredential.secretAccessKey());
+ } else if (credential instanceof S3TokenCredential) {
+ S3TokenCredential s3TokenCredential = (S3TokenCredential) credential;
+ basicSessionCredentials =
+ new BasicSessionCredentials(
+ s3TokenCredential.accessKeyId(),
+ s3TokenCredential.secretAccessKey(),
+ s3TokenCredential.sessionToken());
+ }
+
+ if (credential.expireTimeInMs() > 0) {
+ expirationTime =
+ System.currentTimeMillis()
+ + (long)
+ ((credential.expireTimeInMs() - System.currentTimeMillis())
+ * EXPIRATION_TIME_FACTOR);
+ }
+ }
+}
diff --git
a/bundles/aws/src/main/java/org/apache/gravitino/s3/fs/S3FileSystemProvider.java
b/bundles/aws/src/main/java/org/apache/gravitino/s3/fs/S3FileSystemProvider.java
index b7cd569bb..cbe133ed7 100644
---
a/bundles/aws/src/main/java/org/apache/gravitino/s3/fs/S3FileSystemProvider.java
+++
b/bundles/aws/src/main/java/org/apache/gravitino/s3/fs/S3FileSystemProvider.java
@@ -25,11 +25,16 @@ import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
+import org.apache.gravitino.catalog.hadoop.fs.SupportsCredentialVending;
+import org.apache.gravitino.credential.Credential;
+import org.apache.gravitino.credential.S3SecretKeyCredential;
+import org.apache.gravitino.credential.S3TokenCredential;
import org.apache.gravitino.storage.S3Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -39,9 +44,9 @@ import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class S3FileSystemProvider implements FileSystemProvider {
+public class S3FileSystemProvider implements FileSystemProvider,
SupportsCredentialVending {
- private static final Logger LOGGER =
LoggerFactory.getLogger(S3FileSystemProvider.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(S3FileSystemProvider.class);
@VisibleForTesting
public static final Map<String, String> GRAVITINO_KEY_TO_S3_HADOOP_KEY =
@@ -61,18 +66,29 @@ public class S3FileSystemProvider implements
FileSystemProvider {
Map<String, String> hadoopConfMap =
FileSystemUtils.toHadoopConfigMap(config,
GRAVITINO_KEY_TO_S3_HADOOP_KEY);
+ hadoopConfMap.forEach(configuration::set);
if (!hadoopConfMap.containsKey(S3_CREDENTIAL_KEY)) {
- hadoopConfMap.put(S3_CREDENTIAL_KEY, S3_SIMPLE_CREDENTIAL);
+ configuration.set(S3_CREDENTIAL_KEY, S3_SIMPLE_CREDENTIAL);
}
- hadoopConfMap.forEach(configuration::set);
-
// Hadoop-aws 2 does not support IAMInstanceCredentialsProvider
checkAndSetCredentialProvider(configuration);
return S3AFileSystem.newInstance(path.toUri(), configuration);
}
+ @Override
+ public Map<String, String> getFileSystemCredentialConf(Credential[]
credentials) {
+ Credential credential = S3Utils.getSuitableCredential(credentials);
+ Map<String, String> result = Maps.newHashMap();
+ if (credential instanceof S3SecretKeyCredential || credential instanceof
S3TokenCredential) {
+ result.put(
+ Constants.AWS_CREDENTIALS_PROVIDER,
S3CredentialsProvider.class.getCanonicalName());
+ }
+
+ return result;
+ }
+
private void checkAndSetCredentialProvider(Configuration configuration) {
String provides = configuration.get(S3_CREDENTIAL_KEY);
if (provides == null) {
@@ -91,12 +107,12 @@ public class S3FileSystemProvider implements
FileSystemProvider {
if (AWSCredentialsProvider.class.isAssignableFrom(c)) {
validProviders.add(provider);
} else {
- LOGGER.warn(
+ LOG.warn(
"Credential provider {} is not a subclass of
AWSCredentialsProvider, skipping",
provider);
}
} catch (Exception e) {
- LOGGER.warn(
+ LOG.warn(
"Credential provider {} not found in the Hadoop runtime, falling
back to default",
provider);
configuration.set(S3_CREDENTIAL_KEY, S3_SIMPLE_CREDENTIAL);
diff --git a/bundles/aws/src/main/java/org/apache/gravitino/s3/fs/S3Utils.java
b/bundles/aws/src/main/java/org/apache/gravitino/s3/fs/S3Utils.java
new file mode 100644
index 000000000..078a1180b
--- /dev/null
+++ b/bundles/aws/src/main/java/org/apache/gravitino/s3/fs/S3Utils.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.s3.fs;
+
+import org.apache.gravitino.credential.Credential;
+import org.apache.gravitino.credential.S3SecretKeyCredential;
+import org.apache.gravitino.credential.S3TokenCredential;
+
+public class S3Utils {
+
+ /**
+ * Get the credential from the credential array. Using dynamic credential
first, if not found,
+ * uses static credential.
+ *
+ * @param credentials The credential array.
+ * @return A credential. Null if not found.
+ */
+ static Credential getSuitableCredential(Credential[] credentials) {
+ // Use dynamic credential if found.
+ for (Credential credential : credentials) {
+ if (credential instanceof S3TokenCredential) {
+ return credential;
+ }
+ }
+
+ // If dynamic credential not found, use the static one
+ for (Credential credential : credentials) {
+ if (credential instanceof S3SecretKeyCredential) {
+ return credential;
+ }
+ }
+ return null;
+ }
+}
diff --git a/bundles/azure/build.gradle.kts b/bundles/azure/build.gradle.kts
index 8dbd6ed48..1cbe4856a 100644
--- a/bundles/azure/build.gradle.kts
+++ b/bundles/azure/build.gradle.kts
@@ -28,7 +28,6 @@ dependencies {
compileOnly(project(":api"))
compileOnly(project(":catalogs:catalog-hadoop"))
compileOnly(project(":core"))
-
compileOnly(libs.hadoop3.abs)
compileOnly(libs.hadoop3.client.api)
compileOnly(libs.hadoop3.client.runtime)
diff --git
a/bundles/azure/src/main/java/org/apache/gravitino/abs/fs/AzureFileSystemProvider.java
b/bundles/azure/src/main/java/org/apache/gravitino/abs/fs/AzureFileSystemProvider.java
index f89240441..3dcbb502f 100644
---
a/bundles/azure/src/main/java/org/apache/gravitino/abs/fs/AzureFileSystemProvider.java
+++
b/bundles/azure/src/main/java/org/apache/gravitino/abs/fs/AzureFileSystemProvider.java
@@ -19,19 +19,29 @@
package org.apache.gravitino.abs.fs;
+import static
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME;
+import static
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_IS_HNS_ENABLED;
+import static
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_SAS_TOKEN_PROVIDER_TYPE;
+
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
import java.io.IOException;
import java.util.Map;
import javax.annotation.Nonnull;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
+import org.apache.gravitino.catalog.hadoop.fs.SupportsCredentialVending;
+import org.apache.gravitino.credential.ADLSTokenCredential;
+import org.apache.gravitino.credential.AzureAccountKeyCredential;
+import org.apache.gravitino.credential.Credential;
import org.apache.gravitino.storage.AzureProperties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.services.AuthType;
-public class AzureFileSystemProvider implements FileSystemProvider {
+public class AzureFileSystemProvider implements FileSystemProvider,
SupportsCredentialVending {
@VisibleForTesting public static final String ABS_PROVIDER_SCHEME = "abfss";
@@ -58,13 +68,39 @@ public class AzureFileSystemProvider implements
FileSystemProvider {
config.get(AzureProperties.GRAVITINO_AZURE_STORAGE_ACCOUNT_KEY));
}
- if (!config.containsKey(ABFS_IMPL_KEY)) {
+ if (!hadoopConfMap.containsKey(ABFS_IMPL_KEY)) {
configuration.set(ABFS_IMPL_KEY, ABFS_IMPL);
}
hadoopConfMap.forEach(configuration::set);
- return FileSystem.get(path.toUri(), configuration);
+ return FileSystem.newInstance(path.toUri(), configuration);
+ }
+
+ @Override
+ public Map<String, String> getFileSystemCredentialConf(Credential[]
credentials) {
+ Credential credential =
AzureStorageUtils.getSuitableCredential(credentials);
+ Map<String, String> result = Maps.newHashMap();
+ if (credential instanceof ADLSTokenCredential) {
+ ADLSTokenCredential adlsTokenCredential = (ADLSTokenCredential)
credential;
+
+ String accountName =
+ String.format("%s.dfs.core.windows.net",
adlsTokenCredential.accountName());
+ result.put(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME + "." + accountName,
AuthType.SAS.name());
+ result.put(
+ FS_AZURE_SAS_TOKEN_PROVIDER_TYPE + "." + accountName,
+ AzureSasCredentialsProvider.class.getName());
+ result.put(FS_AZURE_ACCOUNT_IS_HNS_ENABLED, "true");
+ } else if (credential instanceof AzureAccountKeyCredential) {
+ AzureAccountKeyCredential azureAccountKeyCredential =
(AzureAccountKeyCredential) credential;
+ result.put(
+ String.format(
+ "fs.azure.account.key.%s.dfs.core.windows.net",
+ azureAccountKeyCredential.accountName()),
+ azureAccountKeyCredential.accountKey());
+ }
+
+ return result;
}
@Override
diff --git
a/bundles/azure/src/main/java/org/apache/gravitino/abs/fs/AzureSasCredentialsProvider.java
b/bundles/azure/src/main/java/org/apache/gravitino/abs/fs/AzureSasCredentialsProvider.java
new file mode 100644
index 000000000..85793d3d9
--- /dev/null
+++
b/bundles/azure/src/main/java/org/apache/gravitino/abs/fs/AzureSasCredentialsProvider.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.abs.fs;
+
+import java.io.IOException;
+import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
+import
org.apache.gravitino.catalog.hadoop.fs.GravitinoFileSystemCredentialsProvider;
+import org.apache.gravitino.credential.ADLSTokenCredential;
+import org.apache.gravitino.credential.Credential;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider;
+
+public class AzureSasCredentialsProvider implements SASTokenProvider,
Configurable {
+
+ private Configuration configuration;
+ private String sasToken;
+
+ private GravitinoFileSystemCredentialsProvider
gravitinoFileSystemCredentialsProvider;
+ private long expirationTime = Long.MAX_VALUE;
+ private static final double EXPIRATION_TIME_FACTOR = 0.5D;
+
+ @Override
+ public void setConf(Configuration configuration) {
+ this.configuration = configuration;
+ }
+
+ @Override
+ public Configuration getConf() {
+ return configuration;
+ }
+
+ @Override
+ public void initialize(Configuration conf, String accountName) throws
IOException {
+ this.configuration = conf;
+ this.gravitinoFileSystemCredentialsProvider =
FileSystemUtils.getGvfsCredentialProvider(conf);
+ }
+
+ @Override
+ public String getSASToken(String account, String fileSystem, String path,
String operation) {
+ // Refresh credentials if they are null or about to expire.
+ if (sasToken == null || System.currentTimeMillis() >= expirationTime) {
+ synchronized (this) {
+ refresh();
+ }
+ }
+ return sasToken;
+ }
+
+ private void refresh() {
+ Credential[] gravitinoCredentials =
gravitinoFileSystemCredentialsProvider.getCredentials();
+ Credential credential =
AzureStorageUtils.getADLSTokenCredential(gravitinoCredentials);
+ if (credential == null) {
+ throw new RuntimeException("No token credential for OSS found...");
+ }
+
+ if (credential instanceof ADLSTokenCredential) {
+ ADLSTokenCredential adlsTokenCredential = (ADLSTokenCredential)
credential;
+ sasToken = adlsTokenCredential.sasToken();
+
+ if (credential.expireTimeInMs() > 0) {
+ expirationTime =
+ System.currentTimeMillis()
+ + (long)
+ ((credential.expireTimeInMs() - System.currentTimeMillis())
+ * EXPIRATION_TIME_FACTOR);
+ }
+ }
+ }
+}
diff --git
a/bundles/azure/src/main/java/org/apache/gravitino/abs/fs/AzureStorageUtils.java
b/bundles/azure/src/main/java/org/apache/gravitino/abs/fs/AzureStorageUtils.java
new file mode 100644
index 000000000..873f61930
--- /dev/null
+++
b/bundles/azure/src/main/java/org/apache/gravitino/abs/fs/AzureStorageUtils.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.abs.fs;
+
+import org.apache.gravitino.credential.ADLSTokenCredential;
+import org.apache.gravitino.credential.AzureAccountKeyCredential;
+import org.apache.gravitino.credential.Credential;
+
+public class AzureStorageUtils {
+
+ /**
+ * Get the ADLS credential from the credential array. Use the account and
secret if dynamic token
+ * is not found, null if both are not found.
+ *
+ * @param credentials The credential array.
+ * @return A credential. Null if not found.
+ */
+ static Credential getSuitableCredential(Credential[] credentials) {
+ for (Credential credential : credentials) {
+ if (credential instanceof ADLSTokenCredential) {
+ return credential;
+ }
+ }
+
+ for (Credential credential : credentials) {
+ if (credential instanceof AzureAccountKeyCredential) {
+ return credential;
+ }
+ }
+
+ return null;
+ }
+
+ /**
+ * Get the ADLS token credential from the credential array. Null if not
found.
+ *
+ * @param credentials The credential array.
+ * @return A credential. Null if not found.
+ */
+ static Credential getADLSTokenCredential(Credential[] credentials) {
+ for (Credential credential : credentials) {
+ if (credential instanceof ADLSTokenCredential) {
+ return credential;
+ }
+ }
+
+ return null;
+ }
+}
diff --git a/bundles/gcp/build.gradle.kts b/bundles/gcp/build.gradle.kts
index 95907f8a3..7d46fde9e 100644
--- a/bundles/gcp/build.gradle.kts
+++ b/bundles/gcp/build.gradle.kts
@@ -32,6 +32,7 @@ dependencies {
compileOnly(libs.hadoop3.client.api)
compileOnly(libs.hadoop3.client.runtime)
+ compileOnly(libs.hadoop3.gcs)
implementation(project(":catalogs:catalog-common")) {
exclude("*")
diff --git
a/bundles/gcp/src/main/java/org/apache/gravitino/gcs/fs/GCSCredentialsProvider.java
b/bundles/gcp/src/main/java/org/apache/gravitino/gcs/fs/GCSCredentialsProvider.java
new file mode 100644
index 000000000..c4eefeeeb
--- /dev/null
+++
b/bundles/gcp/src/main/java/org/apache/gravitino/gcs/fs/GCSCredentialsProvider.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.gcs.fs;
+
+import com.google.cloud.hadoop.util.AccessTokenProvider;
+import java.io.IOException;
+import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
+import
org.apache.gravitino.catalog.hadoop.fs.GravitinoFileSystemCredentialsProvider;
+import org.apache.gravitino.credential.Credential;
+import org.apache.gravitino.credential.GCSTokenCredential;
+import org.apache.hadoop.conf.Configuration;
+
+public class GCSCredentialsProvider implements AccessTokenProvider {
+ private Configuration configuration;
+ private GravitinoFileSystemCredentialsProvider
gravitinoFileSystemCredentialsProvider;
+
+ private AccessToken accessToken;
+ private long expirationTime = Long.MAX_VALUE;
+ private static final double EXPIRATION_TIME_FACTOR = 0.5D;
+
+ @Override
+ public AccessToken getAccessToken() {
+ if (accessToken == null || System.currentTimeMillis() >= expirationTime) {
+ try {
+ refresh();
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to refresh access token", e);
+ }
+ }
+ return accessToken;
+ }
+
+ @Override
+ public void refresh() throws IOException {
+ Credential[] gravitinoCredentials =
gravitinoFileSystemCredentialsProvider.getCredentials();
+
+ Credential credential =
GCSUtils.getGCSTokenCredential(gravitinoCredentials);
+ if (credential == null) {
+ throw new RuntimeException("No suitable credential for GCS found...");
+ }
+
+ if (credential instanceof GCSTokenCredential) {
+ GCSTokenCredential gcsTokenCredential = (GCSTokenCredential) credential;
+ accessToken = new AccessToken(gcsTokenCredential.token(),
credential.expireTimeInMs());
+
+ if (credential.expireTimeInMs() > 0) {
+ expirationTime =
+ System.currentTimeMillis()
+ + (long)
+ ((credential.expireTimeInMs() - System.currentTimeMillis())
+ * EXPIRATION_TIME_FACTOR);
+ }
+ }
+ }
+
+ @Override
+ public void setConf(Configuration configuration) {
+ this.configuration = configuration;
+ this.gravitinoFileSystemCredentialsProvider =
+ FileSystemUtils.getGvfsCredentialProvider(configuration);
+ }
+
+ @Override
+ public Configuration getConf() {
+ return configuration;
+ }
+}
diff --git
a/bundles/gcp/src/main/java/org/apache/gravitino/gcs/fs/GCSFileSystemProvider.java
b/bundles/gcp/src/main/java/org/apache/gravitino/gcs/fs/GCSFileSystemProvider.java
index b79b58ef4..7ab38b2d7 100644
---
a/bundles/gcp/src/main/java/org/apache/gravitino/gcs/fs/GCSFileSystemProvider.java
+++
b/bundles/gcp/src/main/java/org/apache/gravitino/gcs/fs/GCSFileSystemProvider.java
@@ -20,18 +20,23 @@ package org.apache.gravitino.gcs.fs;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
import java.io.IOException;
import java.util.Map;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
+import org.apache.gravitino.catalog.hadoop.fs.SupportsCredentialVending;
+import org.apache.gravitino.credential.Credential;
+import org.apache.gravitino.credential.GCSTokenCredential;
import org.apache.gravitino.storage.GCSProperties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-public class GCSFileSystemProvider implements FileSystemProvider {
+public class GCSFileSystemProvider implements FileSystemProvider,
SupportsCredentialVending {
private static final String GCS_SERVICE_ACCOUNT_JSON_FILE =
"fs.gs.auth.service.account.json.keyfile";
+ private static final String GCS_TOKEN_PROVIDER_IMPL =
"fs.gs.auth.access.token.provider.impl";
@VisibleForTesting
public static final Map<String, String> GRAVITINO_KEY_TO_GCS_HADOOP_KEY =
@@ -43,9 +48,21 @@ public class GCSFileSystemProvider implements
FileSystemProvider {
Configuration configuration = new Configuration();
FileSystemUtils.toHadoopConfigMap(config, GRAVITINO_KEY_TO_GCS_HADOOP_KEY)
.forEach(configuration::set);
+
return FileSystem.newInstance(path.toUri(), configuration);
}
+ @Override
+ public Map<String, String> getFileSystemCredentialConf(Credential[]
credentials) {
+ Credential credential = GCSUtils.getGCSTokenCredential(credentials);
+ Map<String, String> result = Maps.newHashMap();
+ if (credential instanceof GCSTokenCredential) {
+ result.put(GCS_TOKEN_PROVIDER_IMPL,
GCSCredentialsProvider.class.getName());
+ }
+
+ return result;
+ }
+
@Override
public String scheme() {
return "gs";
diff --git
a/bundles/gcp/src/main/java/org/apache/gravitino/gcs/fs/GCSUtils.java
b/bundles/gcp/src/main/java/org/apache/gravitino/gcs/fs/GCSUtils.java
new file mode 100644
index 000000000..f8fbfd635
--- /dev/null
+++ b/bundles/gcp/src/main/java/org/apache/gravitino/gcs/fs/GCSUtils.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.gcs.fs;
+
+import org.apache.gravitino.credential.Credential;
+import org.apache.gravitino.credential.GCSTokenCredential;
+
+public class GCSUtils {
+ /**
+ * Get the credential from the credential array. If the dynamic credential
is not found, return
+ * null.
+ *
+ * @param credentials The credential array.
+ * @return A credential, or null if no GCS token credential is found.
+ */
+ static Credential getGCSTokenCredential(Credential[] credentials) {
+ for (Credential credential : credentials) {
+ if (credential instanceof GCSTokenCredential) {
+ return credential;
+ }
+ }
+
+ return null;
+ }
+}
diff --git a/catalogs/hadoop-common/build.gradle.kts
b/catalogs/hadoop-common/build.gradle.kts
index 566ce5986..09fd9f801 100644
--- a/catalogs/hadoop-common/build.gradle.kts
+++ b/catalogs/hadoop-common/build.gradle.kts
@@ -23,6 +23,7 @@ plugins {
// try to avoid adding extra dependencies because it is used by catalogs and
connectors.
dependencies {
+ implementation(project(":api"))
implementation(project(":catalogs:catalog-common"))
implementation(libs.commons.lang3)
implementation(libs.hadoop3.client.api)
diff --git
a/catalogs/hadoop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/FileSystemUtils.java
b/catalogs/hadoop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/FileSystemUtils.java
index a1434e85c..11ecd1ee9 100644
---
a/catalogs/hadoop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/FileSystemUtils.java
+++
b/catalogs/hadoop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/FileSystemUtils.java
@@ -31,6 +31,7 @@ import java.util.Map;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
public class FileSystemUtils {
@@ -160,4 +161,26 @@ public class FileSystemUtils {
return result;
}
+
+ /**
+ * Get the GravitinoFileSystemCredentialsProvider from the configuration.
+ *
+ * @param conf Configuration
+ * @return GravitinoFileSystemCredentialsProvider
+ */
+ public static GravitinoFileSystemCredentialsProvider
getGvfsCredentialProvider(
+ Configuration conf) {
+ try {
+ GravitinoFileSystemCredentialsProvider
gravitinoFileSystemCredentialsProvider =
+ (GravitinoFileSystemCredentialsProvider)
+ Class.forName(
+
conf.get(GravitinoFileSystemCredentialsProvider.GVFS_CREDENTIAL_PROVIDER))
+ .getDeclaredConstructor()
+ .newInstance();
+ gravitinoFileSystemCredentialsProvider.setConf(conf);
+ return gravitinoFileSystemCredentialsProvider;
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to create
GravitinoFileSystemCredentialsProvider", e);
+ }
+ }
}
diff --git
a/catalogs/hadoop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/GravitinoFileSystemCredentialsProvider.java
b/catalogs/hadoop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/GravitinoFileSystemCredentialsProvider.java
new file mode 100644
index 000000000..40c0492c7
--- /dev/null
+++
b/catalogs/hadoop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/GravitinoFileSystemCredentialsProvider.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.catalog.hadoop.fs;
+
+import org.apache.gravitino.credential.Credential;
+import org.apache.hadoop.conf.Configurable;
+
+/** Interface for providing credentials for Gravitino Virtual File System. */
+public interface GravitinoFileSystemCredentialsProvider extends Configurable {
+
+ String GVFS_CREDENTIAL_PROVIDER = "fs.gvfs.credential.provider";
+
+ String GVFS_NAME_IDENTIFIER = "fs.gvfs.name.identifier";
+
+ /**
+ * Get credentials for Gravitino Virtual File System.
+ *
+ * @return credentials for Gravitino Virtual File System
+ */
+ Credential[] getCredentials();
+}
diff --git
a/catalogs/hadoop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/SupportsCredentialVending.java
b/catalogs/hadoop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/SupportsCredentialVending.java
new file mode 100644
index 000000000..a9c0b688d
--- /dev/null
+++
b/catalogs/hadoop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/SupportsCredentialVending.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.catalog.hadoop.fs;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.gravitino.credential.Credential;
+
+/** Interface for file systems that support credential vending. */
+public interface SupportsCredentialVending {
+ /**
+ * Get the configuration needed for the file system credential based on the
credentials.
+ *
+ * @param credentials the credentials to be used for the file system
+ * @return the configuration for the file system credential
+ */
+ default Map<String, String> getFileSystemCredentialConf(Credential[]
credentials) {
+ return ImmutableMap.of();
+ }
+}
diff --git
a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/DefaultGravitinoFileSystemCredentialsProvider.java
b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/DefaultGravitinoFileSystemCredentialsProvider.java
new file mode 100644
index 000000000..2f3278f87
--- /dev/null
+++
b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/DefaultGravitinoFileSystemCredentialsProvider.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.filesystem.hadoop;
+
+import org.apache.gravitino.NameIdentifier;
+import
org.apache.gravitino.catalog.hadoop.fs.GravitinoFileSystemCredentialsProvider;
+import org.apache.gravitino.client.GravitinoClient;
+import org.apache.gravitino.credential.Credential;
+import org.apache.gravitino.file.Fileset;
+import org.apache.gravitino.file.FilesetCatalog;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Default implementation of {@link GravitinoFileSystemCredentialsProvider}
which provides
+ * credentials for Gravitino Virtual File System.
+ */
+public class DefaultGravitinoFileSystemCredentialsProvider
+ implements GravitinoFileSystemCredentialsProvider {
+
+ private Configuration configuration;
+
+ @Override
+ public void setConf(Configuration configuration) {
+ this.configuration = configuration;
+ }
+
+ @Override
+ public Configuration getConf() {
+ return configuration;
+ }
+
+ @Override
+ public Credential[] getCredentials() {
+ // The format of name identifier is `metalake.catalog.schema.fileset`
+ String nameIdentifier = configuration.get(GVFS_NAME_IDENTIFIER);
+ String[] idents = nameIdentifier.split("\\.");
+ try (GravitinoClient client =
GravitinoVirtualFileSystemUtils.createClient(configuration)) {
+ FilesetCatalog filesetCatalog =
client.loadCatalog(idents[1]).asFilesetCatalog();
+ Fileset fileset =
filesetCatalog.loadFileset(NameIdentifier.of(idents[2], idents[3]));
+ return fileset.supportsCredentials().getCredentials();
+ }
+ }
+}
diff --git
a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystem.java
b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystem.java
index a9c40e558..26d248736 100644
---
a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystem.java
+++
b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystem.java
@@ -23,35 +23,44 @@ import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Scheduler;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import com.google.common.collect.Streams;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
+import java.util.Set;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.gravitino.Catalog;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.audit.CallerContext;
import org.apache.gravitino.audit.FilesetAuditConstants;
import org.apache.gravitino.audit.FilesetDataOperation;
import org.apache.gravitino.audit.InternalClientType;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
-import org.apache.gravitino.client.DefaultOAuth2TokenProvider;
+import
org.apache.gravitino.catalog.hadoop.fs.GravitinoFileSystemCredentialsProvider;
+import org.apache.gravitino.catalog.hadoop.fs.SupportsCredentialVending;
import org.apache.gravitino.client.GravitinoClient;
-import org.apache.gravitino.client.KerberosTokenProvider;
+import org.apache.gravitino.credential.Credential;
import org.apache.gravitino.exceptions.GravitinoRuntimeException;
+import org.apache.gravitino.file.Fileset;
import org.apache.gravitino.file.FilesetCatalog;
+import org.apache.gravitino.storage.AzureProperties;
+import org.apache.gravitino.storage.OSSProperties;
+import org.apache.gravitino.storage.S3Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -79,7 +88,9 @@ public class GravitinoVirtualFileSystem extends FileSystem {
private String metalakeName;
private Cache<NameIdentifier, FilesetCatalog> catalogCache;
private ScheduledThreadPoolExecutor catalogCleanScheduler;
- private Cache<String, FileSystem> internalFileSystemCache;
+ // Fileset name identifier and its corresponding FileSystem cache, the name
identifier has
+ // four levels, the first level is metalake name.
+ private Cache<NameIdentifier, FileSystem> internalFileSystemCache;
private ScheduledThreadPoolExecutor internalFileSystemCleanScheduler;
// The pattern is used to match gvfs path. The scheme prefix
(gvfs://fileset) is optional.
@@ -91,6 +102,14 @@ public class GravitinoVirtualFileSystem extends FileSystem {
private static final String SLASH = "/";
private final Map<String, FileSystemProvider> fileSystemProvidersMap =
Maps.newHashMap();
+ private static final Set<String> CATALOG_NECESSARY_PROPERTIES_TO_KEEP =
+ Sets.newHashSet(
+ OSSProperties.GRAVITINO_OSS_ENDPOINT,
+ OSSProperties.GRAVITINO_OSS_REGION,
+ S3Properties.GRAVITINO_S3_ENDPOINT,
+ S3Properties.GRAVITINO_S3_REGION,
+ AzureProperties.GRAVITINO_AZURE_STORAGE_ACCOUNT_NAME);
+
@Override
public void initialize(URI name, Configuration configuration) throws
IOException {
if
(!name.toString().startsWith(GravitinoVirtualFileSystemConfiguration.GVFS_FILESET_PREFIX))
{
@@ -132,8 +151,7 @@ public class GravitinoVirtualFileSystem extends FileSystem {
"'%s' is not set in the configuration",
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_METALAKE_KEY);
- initializeClient(configuration);
-
+ this.client = GravitinoVirtualFileSystemUtils.createClient(configuration);
// Register the default local and HDFS FileSystemProvider
fileSystemProvidersMap.putAll(getFileSystemProviders());
@@ -145,7 +163,7 @@ public class GravitinoVirtualFileSystem extends FileSystem {
}
@VisibleForTesting
- Cache<String, FileSystem> internalFileSystemCache() {
+ Cache<NameIdentifier, FileSystem> internalFileSystemCache() {
return internalFileSystemCache;
}
@@ -193,116 +211,6 @@ public class GravitinoVirtualFileSystem extends
FileSystem {
return new ThreadFactoryBuilder().setDaemon(true).setNameFormat(name +
"-%d").build();
}
- private void initializeClient(Configuration configuration) {
- // initialize the Gravitino client
- String serverUri =
-
configuration.get(GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_SERVER_URI_KEY);
- Preconditions.checkArgument(
- StringUtils.isNotBlank(serverUri),
- "'%s' is not set in the configuration",
- GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_SERVER_URI_KEY);
-
- String authType =
- configuration.get(
-
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_AUTH_TYPE_KEY,
- GravitinoVirtualFileSystemConfiguration.SIMPLE_AUTH_TYPE);
- if
(authType.equalsIgnoreCase(GravitinoVirtualFileSystemConfiguration.SIMPLE_AUTH_TYPE))
{
- this.client =
-
GravitinoClient.builder(serverUri).withMetalake(metalakeName).withSimpleAuth().build();
- } else if (authType.equalsIgnoreCase(
- GravitinoVirtualFileSystemConfiguration.OAUTH2_AUTH_TYPE)) {
- String authServerUri =
- configuration.get(
-
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_OAUTH2_SERVER_URI_KEY);
- checkAuthConfig(
- GravitinoVirtualFileSystemConfiguration.OAUTH2_AUTH_TYPE,
-
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_OAUTH2_SERVER_URI_KEY,
- authServerUri);
-
- String credential =
- configuration.get(
-
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_OAUTH2_CREDENTIAL_KEY);
- checkAuthConfig(
- GravitinoVirtualFileSystemConfiguration.OAUTH2_AUTH_TYPE,
-
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_OAUTH2_CREDENTIAL_KEY,
- credential);
-
- String path =
- configuration.get(
-
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_OAUTH2_PATH_KEY);
- checkAuthConfig(
- GravitinoVirtualFileSystemConfiguration.OAUTH2_AUTH_TYPE,
-
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_OAUTH2_PATH_KEY,
- path);
-
- String scope =
- configuration.get(
-
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_OAUTH2_SCOPE_KEY);
- checkAuthConfig(
- GravitinoVirtualFileSystemConfiguration.OAUTH2_AUTH_TYPE,
-
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_OAUTH2_SCOPE_KEY,
- scope);
-
- DefaultOAuth2TokenProvider authDataProvider =
- DefaultOAuth2TokenProvider.builder()
- .withUri(authServerUri)
- .withCredential(credential)
- .withPath(path)
- .withScope(scope)
- .build();
-
- this.client =
- GravitinoClient.builder(serverUri)
- .withMetalake(metalakeName)
- .withOAuth(authDataProvider)
- .build();
- } else if (authType.equalsIgnoreCase(
- GravitinoVirtualFileSystemConfiguration.KERBEROS_AUTH_TYPE)) {
- String principal =
- configuration.get(
-
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_KERBEROS_PRINCIPAL_KEY);
- checkAuthConfig(
- GravitinoVirtualFileSystemConfiguration.KERBEROS_AUTH_TYPE,
-
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_KERBEROS_PRINCIPAL_KEY,
- principal);
- String keytabFilePath =
- configuration.get(
- GravitinoVirtualFileSystemConfiguration
- .FS_GRAVITINO_CLIENT_KERBEROS_KEYTAB_FILE_PATH_KEY);
- KerberosTokenProvider authDataProvider;
- if (StringUtils.isNotBlank(keytabFilePath)) {
- // Using principal and keytab to create auth provider
- authDataProvider =
- KerberosTokenProvider.builder()
- .withClientPrincipal(principal)
- .withKeyTabFile(new File(keytabFilePath))
- .build();
- } else {
- // Using ticket cache to create auth provider
- authDataProvider =
KerberosTokenProvider.builder().withClientPrincipal(principal).build();
- }
- this.client =
- GravitinoClient.builder(serverUri)
- .withMetalake(metalakeName)
- .withKerberosAuth(authDataProvider)
- .build();
- } else {
- throw new IllegalArgumentException(
- String.format(
- "Unsupported authentication type: %s for %s.",
- authType,
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_AUTH_TYPE_KEY));
- }
- }
-
- private void checkAuthConfig(String authType, String configKey, String
configValue) {
- Preconditions.checkArgument(
- StringUtils.isNotBlank(configValue),
- "%s should not be null if %s is set to %s.",
- configKey,
-
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_AUTH_TYPE_KEY,
- authType);
- }
-
private String getVirtualLocation(NameIdentifier identifier, boolean
withScheme) {
return String.format(
"%s/%s/%s/%s",
@@ -360,6 +268,7 @@ public class GravitinoVirtualFileSystem extends FileSystem {
FilesetCatalog filesetCatalog =
catalogCache.get(
catalogIdent, ident ->
client.loadCatalog(catalogIdent.name()).asFilesetCatalog());
+ Catalog catalog = (Catalog) filesetCatalog;
Preconditions.checkArgument(
filesetCatalog != null, String.format("Loaded fileset catalog: %s is
null.", catalogIdent));
@@ -383,8 +292,8 @@ public class GravitinoVirtualFileSystem extends FileSystem {
StringUtils.isNotBlank(scheme), "Scheme of the actual file location
cannot be null.");
FileSystem fs =
internalFileSystemCache.get(
- scheme,
- str -> {
+ identifier,
+ ident -> {
try {
FileSystemProvider provider =
fileSystemProvidersMap.get(scheme);
if (provider == null) {
@@ -398,8 +307,19 @@ public class GravitinoVirtualFileSystem extends FileSystem
{
// https://github.com/apache/gravitino/issues/5609
resetFileSystemServiceLoader(scheme);
- Map<String, String> maps = getConfigMap(getConf());
- return provider.getFileSystem(filePath, maps);
+ Map<String, String> necessaryPropertyFromCatalog =
+ catalog.properties().entrySet().stream()
+ .filter(
+ property ->
+
CATALOG_NECESSARY_PROPERTIES_TO_KEEP.contains(property.getKey()))
+ .collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue));
+
+ Map<String, String> totalProperty =
Maps.newHashMap(necessaryPropertyFromCatalog);
+ totalProperty.putAll(getConfigMap(getConf()));
+
+ totalProperty.putAll(getCredentialProperties(provider,
catalog, identifier));
+
+ return provider.getFileSystem(filePath, totalProperty);
} catch (IOException ioe) {
throw new GravitinoRuntimeException(
"Exception occurs when create new FileSystem for actual
uri: %s, msg: %s",
@@ -410,6 +330,41 @@ public class GravitinoVirtualFileSystem extends FileSystem
{
return new FilesetContextPair(new Path(actualFileLocation), fs);
}
+ private Map<String, String> getCredentialProperties(
+ FileSystemProvider fileSystemProvider, Catalog catalog, NameIdentifier
filesetIdentifier) {
+ // If the provider does not support credential vending, we do not need to add
any credential
+ // properties.
+ if (!(fileSystemProvider instanceof SupportsCredentialVending)) {
+ return ImmutableMap.of();
+ }
+
+ ImmutableMap.Builder<String, String> mapBuilder = ImmutableMap.builder();
+ try {
+ Fileset fileset =
+ catalog
+ .asFilesetCatalog()
+ .loadFileset(
+ NameIdentifier.of(
+ filesetIdentifier.namespace().level(2),
filesetIdentifier.name()));
+ Credential[] credentials =
fileset.supportsCredentials().getCredentials();
+ if (credentials.length > 0) {
+ mapBuilder.put(
+ GravitinoFileSystemCredentialsProvider.GVFS_CREDENTIAL_PROVIDER,
+
DefaultGravitinoFileSystemCredentialsProvider.class.getCanonicalName());
+ mapBuilder.put(
+ GravitinoFileSystemCredentialsProvider.GVFS_NAME_IDENTIFIER,
+ filesetIdentifier.toString());
+
+ SupportsCredentialVending supportsCredentialVending =
+ (SupportsCredentialVending) fileSystemProvider;
+
mapBuilder.putAll(supportsCredentialVending.getFileSystemCredentialConf(credentials));
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ return mapBuilder.build();
+ }
+
private void resetFileSystemServiceLoader(String fsScheme) {
try {
Map<String, Class<? extends FileSystem>> serviceFileSystems =
diff --git
a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystemUtils.java
b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystemUtils.java
new file mode 100644
index 000000000..8a0d1d874
--- /dev/null
+++
b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystemUtils.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.filesystem.hadoop;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.client.DefaultOAuth2TokenProvider;
+import org.apache.gravitino.client.GravitinoClient;
+import org.apache.gravitino.client.KerberosTokenProvider;
+import org.apache.hadoop.conf.Configuration;
+
+/** Utility class for Gravitino Virtual File System. */
+public class GravitinoVirtualFileSystemUtils {
+
+ /**
+ * Get Gravitino client by the configuration.
+ *
+ * @param configuration The configuration for the Gravitino client.
+ * @return The Gravitino client.
+ */
+ public static GravitinoClient createClient(Configuration configuration) {
+ // initialize the Gravitino client
+ String serverUri =
+
configuration.get(GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_SERVER_URI_KEY);
+ String metalakeValue =
+
configuration.get(GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_METALAKE_KEY);
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(serverUri),
+ "'%s' is not set in the configuration",
+ GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_SERVER_URI_KEY);
+
+ String authType =
+ configuration.get(
+
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_AUTH_TYPE_KEY,
+ GravitinoVirtualFileSystemConfiguration.SIMPLE_AUTH_TYPE);
+ if
(authType.equalsIgnoreCase(GravitinoVirtualFileSystemConfiguration.SIMPLE_AUTH_TYPE))
{
+ return GravitinoClient.builder(serverUri)
+ .withMetalake(metalakeValue)
+ .withSimpleAuth()
+ .build();
+ } else if (authType.equalsIgnoreCase(
+ GravitinoVirtualFileSystemConfiguration.OAUTH2_AUTH_TYPE)) {
+ String authServerUri =
+ configuration.get(
+
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_OAUTH2_SERVER_URI_KEY);
+ checkAuthConfig(
+ GravitinoVirtualFileSystemConfiguration.OAUTH2_AUTH_TYPE,
+
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_OAUTH2_SERVER_URI_KEY,
+ authServerUri);
+
+ String credential =
+ configuration.get(
+
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_OAUTH2_CREDENTIAL_KEY);
+ checkAuthConfig(
+ GravitinoVirtualFileSystemConfiguration.OAUTH2_AUTH_TYPE,
+
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_OAUTH2_CREDENTIAL_KEY,
+ credential);
+
+ String path =
+ configuration.get(
+
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_OAUTH2_PATH_KEY);
+ checkAuthConfig(
+ GravitinoVirtualFileSystemConfiguration.OAUTH2_AUTH_TYPE,
+
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_OAUTH2_PATH_KEY,
+ path);
+
+ String scope =
+ configuration.get(
+
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_OAUTH2_SCOPE_KEY);
+ checkAuthConfig(
+ GravitinoVirtualFileSystemConfiguration.OAUTH2_AUTH_TYPE,
+
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_OAUTH2_SCOPE_KEY,
+ scope);
+
+ DefaultOAuth2TokenProvider authDataProvider =
+ DefaultOAuth2TokenProvider.builder()
+ .withUri(authServerUri)
+ .withCredential(credential)
+ .withPath(path)
+ .withScope(scope)
+ .build();
+
+ return GravitinoClient.builder(serverUri)
+ .withMetalake(metalakeValue)
+ .withOAuth(authDataProvider)
+ .build();
+ } else if (authType.equalsIgnoreCase(
+ GravitinoVirtualFileSystemConfiguration.KERBEROS_AUTH_TYPE)) {
+ String principal =
+ configuration.get(
+
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_KERBEROS_PRINCIPAL_KEY);
+ checkAuthConfig(
+ GravitinoVirtualFileSystemConfiguration.KERBEROS_AUTH_TYPE,
+
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_KERBEROS_PRINCIPAL_KEY,
+ principal);
+ String keytabFilePath =
+ configuration.get(
+ GravitinoVirtualFileSystemConfiguration
+ .FS_GRAVITINO_CLIENT_KERBEROS_KEYTAB_FILE_PATH_KEY);
+ KerberosTokenProvider authDataProvider;
+ if (StringUtils.isNotBlank(keytabFilePath)) {
+ // Using principal and keytab to create auth provider
+ authDataProvider =
+ KerberosTokenProvider.builder()
+ .withClientPrincipal(principal)
+ .withKeyTabFile(new File(keytabFilePath))
+ .build();
+ } else {
+ // Using ticket cache to create auth provider
+ authDataProvider =
KerberosTokenProvider.builder().withClientPrincipal(principal).build();
+ }
+
+ return GravitinoClient.builder(serverUri)
+ .withMetalake(metalakeValue)
+ .withKerberosAuth(authDataProvider)
+ .build();
+ } else {
+ throw new IllegalArgumentException(
+ String.format(
+ "Unsupported authentication type: %s for %s.",
+ authType,
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_AUTH_TYPE_KEY));
+ }
+ }
+
+ private static void checkAuthConfig(String authType, String configKey,
String configValue) {
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(configValue),
+ "%s should not be null if %s is set to %s.",
+ configKey,
+
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_AUTH_TYPE_KEY,
+ authType);
+ }
+}
diff --git
a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/TestGvfsBase.java
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/TestGvfsBase.java
index e7e3b7857..be30e42a4 100644
---
a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/TestGvfsBase.java
+++
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/TestGvfsBase.java
@@ -27,6 +27,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
@@ -40,7 +41,13 @@ import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.dto.AuditDTO;
+import org.apache.gravitino.dto.credential.CredentialDTO;
+import org.apache.gravitino.dto.file.FilesetDTO;
+import org.apache.gravitino.dto.responses.CredentialResponse;
import org.apache.gravitino.dto.responses.FileLocationResponse;
+import org.apache.gravitino.dto.responses.FilesetResponse;
+import org.apache.gravitino.file.Fileset;
import org.apache.gravitino.rest.RESTUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -140,6 +147,7 @@ public class TestGvfsBase extends GravitinoMockServerBase {
queryParams.put("sub_path", RESTUtils.encodeString(""));
try {
buildMockResource(Method.GET, locationPath, queryParams, null,
fileLocationResponse, SC_OK);
+ buildMockResourceForCredential(filesetName, localPath.toString());
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
@@ -149,7 +157,8 @@ public class TestGvfsBase extends GravitinoMockServerBase {
Objects.requireNonNull(
((GravitinoVirtualFileSystem) gravitinoFileSystem)
.internalFileSystemCache()
- .getIfPresent("file"));
+ .getIfPresent(
+ NameIdentifier.of(metalakeName, catalogName, schemaName,
"testFSCache")));
String anotherFilesetName = "test_new_fs";
Path diffLocalPath =
@@ -184,6 +193,7 @@ public class TestGvfsBase extends GravitinoMockServerBase {
try {
buildMockResource(
Method.GET, locationPath1, queryParams, null,
fileLocationResponse, SC_OK);
+ buildMockResourceForCredential("fileset1", localPath1.toString());
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
@@ -199,7 +209,10 @@ public class TestGvfsBase extends GravitinoMockServerBase {
0,
((GravitinoVirtualFileSystem)
fs).internalFileSystemCache().asMap().size()));
- assertNull(((GravitinoVirtualFileSystem)
fs).internalFileSystemCache().getIfPresent("file"));
+ assertNull(
+ ((GravitinoVirtualFileSystem) fs)
+ .internalFileSystemCache()
+ .getIfPresent(NameIdentifier.of("file")));
}
}
@@ -224,6 +237,7 @@ public class TestGvfsBase extends GravitinoMockServerBase {
queryParams.put("sub_path", RESTUtils.encodeString("/test.txt"));
try {
buildMockResource(Method.GET, locationPath, queryParams, null,
fileLocationResponse, SC_OK);
+ buildMockResourceForCredential(filesetName, localPath + "/test.txt");
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
@@ -276,6 +290,7 @@ public class TestGvfsBase extends GravitinoMockServerBase {
queryParams.put("sub_path", RESTUtils.encodeString("/test.txt"));
try {
buildMockResource(Method.GET, locationPath, queryParams, null,
fileLocationResponse, SC_OK);
+ buildMockResourceForCredential(filesetName, localPath + "/test.txt");
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
@@ -309,6 +324,32 @@ public class TestGvfsBase extends GravitinoMockServerBase {
}
}
+ private void buildMockResourceForCredential(String filesetName, String
filesetLocation)
+ throws JsonProcessingException {
+ String filesetPath =
+ String.format(
+ "/api/metalakes/%s/catalogs/%s/schemas/%s/filesets/%s",
+ metalakeName, catalogName, schemaName, filesetName);
+ String credentialsPath =
+ String.format(
+ "/api/metalakes/%s/objects/fileset/%s.%s.%s/credentials",
+ metalakeName, catalogName, schemaName, filesetName);
+ FilesetResponse filesetResponse =
+ new FilesetResponse(
+ FilesetDTO.builder()
+ .name(filesetName)
+ .comment("comment")
+ .type(Fileset.Type.MANAGED)
+ .audit(AuditDTO.builder().build())
+ .storageLocation(filesetLocation.toString())
+ .build());
+ CredentialResponse credentialResponse = new CredentialResponse(new
CredentialDTO[] {});
+
+ buildMockResource(Method.GET, filesetPath, ImmutableMap.of(), null,
filesetResponse, SC_OK);
+ buildMockResource(
+ Method.GET, credentialsPath, ImmutableMap.of(), null,
credentialResponse, SC_OK);
+ }
+
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testRename(boolean withScheme) throws IOException {
@@ -343,6 +384,7 @@ public class TestGvfsBase extends GravitinoMockServerBase {
try {
buildMockResource(
Method.GET, locationPath, queryParams1, null,
fileLocationResponse1, SC_OK);
+ buildMockResourceForCredential(filesetName, localPath +
"/rename_dst2");
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
@@ -409,6 +451,7 @@ public class TestGvfsBase extends GravitinoMockServerBase {
queryParams.put("sub_path", RESTUtils.encodeString("/test_delete"));
try {
buildMockResource(Method.GET, locationPath, queryParams, null,
fileLocationResponse, SC_OK);
+ buildMockResourceForCredential(filesetName, localPath +
"/test_delete");
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
@@ -455,6 +498,7 @@ public class TestGvfsBase extends GravitinoMockServerBase {
queryParams.put("sub_path", RESTUtils.encodeString(""));
try {
buildMockResource(Method.GET, locationPath, queryParams, null,
fileLocationResponse, SC_OK);
+ buildMockResourceForCredential(filesetName, localPath.toString());
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
@@ -499,6 +543,7 @@ public class TestGvfsBase extends GravitinoMockServerBase {
queryParams.put("sub_path", RESTUtils.encodeString(""));
try {
buildMockResource(Method.GET, locationPath, queryParams, null,
fileLocationResponse, SC_OK);
+ buildMockResourceForCredential(filesetName, localPath.toString());
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
@@ -549,6 +594,7 @@ public class TestGvfsBase extends GravitinoMockServerBase {
queryParams.put("sub_path", RESTUtils.encodeString("/test_mkdirs"));
try {
buildMockResource(Method.GET, locationPath, queryParams, null,
fileLocationResponse, SC_OK);
+ buildMockResourceForCredential(filesetName, localPath +
"/test_mkdirs");
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
@@ -667,6 +713,7 @@ public class TestGvfsBase extends GravitinoMockServerBase {
queryParams.put("sub_path", RESTUtils.encodeString(""));
try {
buildMockResource(Method.GET, locationPath, queryParams, null,
fileLocationResponse, SC_OK);
+ buildMockResourceForCredential(filesetName, localPath.toString());
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
@@ -693,6 +740,7 @@ public class TestGvfsBase extends GravitinoMockServerBase {
queryParams.put("sub_path", RESTUtils.encodeString(""));
try {
buildMockResource(Method.GET, locationPath, queryParams, null,
fileLocationResponse, SC_OK);
+ buildMockResourceForCredential(filesetName, localPath.toString());
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
diff --git
a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemABSCredentialIT.java
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemABSCredentialIT.java
new file mode 100644
index 000000000..2f79332e8
--- /dev/null
+++
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemABSCredentialIT.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.filesystem.hadoop.integration.test;
+
+import static
org.apache.gravitino.catalog.hadoop.HadoopCatalogPropertiesMetadata.FILESYSTEM_PROVIDERS;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.abs.fs.AzureFileSystemProvider;
+import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
+import org.apache.gravitino.credential.CredentialConstants;
+import org.apache.gravitino.integration.test.util.GravitinoITUtils;
+import org.apache.gravitino.storage.AzureProperties;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.condition.EnabledIf;
+import org.junit.platform.commons.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@EnabledIf("absIsConfigured")
+public class GravitinoVirtualFileSystemABSCredentialIT extends
GravitinoVirtualFileSystemIT {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(GravitinoVirtualFileSystemABSCredentialIT.class);
+
+ public static final String ABS_ACCOUNT_NAME =
System.getenv("ABS_ACCOUNT_NAME_FOR_CREDENTIAL");
+ public static final String ABS_ACCOUNT_KEY =
System.getenv("ABS_ACCOUNT_KEY_FOR_CREDENTIAL");
+ public static final String ABS_CONTAINER_NAME =
+ System.getenv("ABS_CONTAINER_NAME_FOR_CREDENTIAL");
+ public static final String ABS_TENANT_ID =
System.getenv("ABS_TENANT_ID_FOR_CREDENTIAL");
+ public static final String ABS_CLIENT_ID =
System.getenv("ABS_CLIENT_ID_FOR_CREDENTIAL");
+ public static final String ABS_CLIENT_SECRET =
System.getenv("ABS_CLIENT_SECRET_FOR_CREDENTIAL");
+
+ @BeforeAll
+ public void startIntegrationTest() {
+ // Do nothing
+ }
+
+ @BeforeAll
+ public void startUp() throws Exception {
+ // Copy the Azure jars to the gravitino server if in deploy mode.
+ copyBundleJarsToHadoop("azure-bundle");
+ // Need to download jars to gravitino server
+ super.startIntegrationTest();
+
+    // This value can be tuned by the user; please change it accordingly.
+ defaultBlockSize = 32 * 1024 * 1024;
+
+ // This value is 1 for ABS, 3 for GCS, and 1 for S3A.
+ defaultReplication = 1;
+
+ metalakeName = GravitinoITUtils.genRandomName("gvfs_it_metalake");
+ catalogName = GravitinoITUtils.genRandomName("catalog");
+ schemaName = GravitinoITUtils.genRandomName("schema");
+
+ Assertions.assertFalse(client.metalakeExists(metalakeName));
+ metalake = client.createMetalake(metalakeName, "metalake comment",
Collections.emptyMap());
+ Assertions.assertTrue(client.metalakeExists(metalakeName));
+
+ Map<String, String> properties = Maps.newHashMap();
+
+ properties.put(AzureProperties.GRAVITINO_AZURE_STORAGE_ACCOUNT_NAME,
ABS_ACCOUNT_NAME);
+ properties.put(AzureProperties.GRAVITINO_AZURE_STORAGE_ACCOUNT_KEY,
ABS_ACCOUNT_KEY);
+ properties.put(AzureProperties.GRAVITINO_AZURE_CLIENT_ID, ABS_CLIENT_ID);
+ properties.put(AzureProperties.GRAVITINO_AZURE_CLIENT_SECRET,
ABS_CLIENT_SECRET);
+ properties.put(AzureProperties.GRAVITINO_AZURE_TENANT_ID, ABS_TENANT_ID);
+ properties.put(CredentialConstants.CREDENTIAL_PROVIDERS, "adls-token");
+
+ properties.put(FILESYSTEM_PROVIDERS,
AzureFileSystemProvider.ABS_PROVIDER_NAME);
+
+ Catalog catalog =
+ metalake.createCatalog(
+ catalogName, Catalog.Type.FILESET, "hadoop", "catalog comment",
properties);
+ Assertions.assertTrue(metalake.catalogExists(catalogName));
+
+ catalog.asSchemas().createSchema(schemaName, "schema comment", properties);
+ Assertions.assertTrue(catalog.asSchemas().schemaExists(schemaName));
+
+ conf.set("fs.gvfs.impl",
"org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystem");
+ conf.set("fs.AbstractFileSystem.gvfs.impl",
"org.apache.gravitino.filesystem.hadoop.Gvfs");
+ conf.set("fs.gvfs.impl.disable.cache", "true");
+ conf.set("fs.gravitino.server.uri", serverUri);
+ conf.set("fs.gravitino.client.metalake", metalakeName);
+
+ // Pass this configuration to the real file system
+ conf.set(AzureProperties.GRAVITINO_AZURE_STORAGE_ACCOUNT_NAME,
ABS_ACCOUNT_NAME);
+ conf.set(AzureProperties.GRAVITINO_AZURE_STORAGE_ACCOUNT_KEY,
ABS_ACCOUNT_KEY);
+ conf.set("fs.abfss.impl",
"org.apache.hadoop.fs.azurebfs.SecureAzureBlobFileSystem");
+
+ conf.set("fs.gravitino.client.useCloudStoreCredential", "true");
+ }
+
+ @AfterAll
+ public void tearDown() throws IOException {
+ Catalog catalog = metalake.loadCatalog(catalogName);
+ catalog.asSchemas().dropSchema(schemaName, true);
+ metalake.dropCatalog(catalogName, true);
+ client.dropMetalake(metalakeName, true);
+
+ if (client != null) {
+ client.close();
+ client = null;
+ }
+
+ try {
+ closer.close();
+ } catch (Exception e) {
+ LOG.error("Exception in closing CloseableGroup", e);
+ }
+ }
+
+ /**
+   * Remove the `gravitino.bypass` prefix from the configuration and pass it
to the real file system.
+ * This method corresponds to the method
org.apache.gravitino.filesystem.hadoop
+ * .GravitinoVirtualFileSystem#getConfigMap(Configuration) in the original
code.
+ */
+ protected Configuration
convertGvfsConfigToRealFileSystemConfig(Configuration gvfsConf) {
+ Configuration absConf = new Configuration();
+ Map<String, String> map = Maps.newHashMap();
+
+ gvfsConf.forEach(entry -> map.put(entry.getKey(), entry.getValue()));
+
+ Map<String, String> hadoopConfMap = FileSystemUtils.toHadoopConfigMap(map,
ImmutableMap.of());
+
+ if (gvfsConf.get(AzureProperties.GRAVITINO_AZURE_STORAGE_ACCOUNT_NAME) !=
null
+ && gvfsConf.get(AzureProperties.GRAVITINO_AZURE_STORAGE_ACCOUNT_KEY)
!= null) {
+ hadoopConfMap.put(
+ String.format(
+ "fs.azure.account.key.%s.dfs.core.windows.net",
+
gvfsConf.get(AzureProperties.GRAVITINO_AZURE_STORAGE_ACCOUNT_NAME)),
+ gvfsConf.get(AzureProperties.GRAVITINO_AZURE_STORAGE_ACCOUNT_KEY));
+ }
+
+ hadoopConfMap.forEach(absConf::set);
+
+ return absConf;
+ }
+
+ protected String genStorageLocation(String fileset) {
+ return String.format(
+ "%s://%s@%s.dfs.core.windows.net/%s",
+ AzureFileSystemProvider.ABS_PROVIDER_SCHEME, ABS_CONTAINER_NAME,
ABS_ACCOUNT_NAME, fileset);
+ }
+
+ @Disabled("java.lang.UnsupportedOperationException: Append Support not
enabled")
+ public void testAppend() throws IOException {}
+
+ private static boolean absIsConfigured() {
+ return
StringUtils.isNotBlank(System.getenv("ABS_ACCOUNT_NAME_FOR_CREDENTIAL"))
+ &&
StringUtils.isNotBlank(System.getenv("ABS_ACCOUNT_KEY_FOR_CREDENTIAL"))
+ &&
StringUtils.isNotBlank(System.getenv("ABS_CONTAINER_NAME_FOR_CREDENTIAL"))
+ &&
StringUtils.isNotBlank(System.getenv("ABS_TENANT_ID_FOR_CREDENTIAL"))
+ &&
StringUtils.isNotBlank(System.getenv("ABS_CLIENT_ID_FOR_CREDENTIAL"))
+ &&
StringUtils.isNotBlank(System.getenv("ABS_CLIENT_SECRET_FOR_CREDENTIAL"));
+ }
+}
diff --git
a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemGCSCredentialIT.java
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemGCSCredentialIT.java
new file mode 100644
index 000000000..81b352fa5
--- /dev/null
+++
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemGCSCredentialIT.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.filesystem.hadoop.integration.test;
+
+import static
org.apache.gravitino.catalog.hadoop.HadoopCatalogPropertiesMetadata.FILESYSTEM_PROVIDERS;
+
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
+import org.apache.gravitino.credential.CredentialConstants;
+import org.apache.gravitino.gcs.fs.GCSFileSystemProvider;
+import org.apache.gravitino.integration.test.util.GravitinoITUtils;
+import org.apache.gravitino.storage.GCSProperties;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.condition.EnabledIf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@EnabledIf(value = "isGCPConfigured", disabledReason = "GCP is not configured")
+public class GravitinoVirtualFileSystemGCSCredentialIT extends
GravitinoVirtualFileSystemIT {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(GravitinoVirtualFileSystemGCSCredentialIT.class);
+
+ public static final String BUCKET_NAME =
System.getenv("GCS_BUCKET_NAME_FOR_CREDENTIAL");
+ public static final String SERVICE_ACCOUNT_FILE =
+ System.getenv("GCS_SERVICE_ACCOUNT_JSON_PATH_FOR_CREDENTIAL");
+
+ @BeforeAll
+ public void startIntegrationTest() {
+ // Do nothing
+ }
+
+ @BeforeAll
+ public void startUp() throws Exception {
+ // Copy the GCP jars to the gravitino server if in deploy mode.
+ copyBundleJarsToHadoop("gcp-bundle");
+ // Need to download jars to gravitino server
+ super.startIntegrationTest();
+
+    // This value can be tuned by the user; please change it accordingly.
+ defaultBlockSize = 64 * 1024 * 1024;
+
+ metalakeName = GravitinoITUtils.genRandomName("gvfs_it_metalake");
+ catalogName = GravitinoITUtils.genRandomName("catalog");
+ schemaName = GravitinoITUtils.genRandomName("schema");
+
+ Assertions.assertFalse(client.metalakeExists(metalakeName));
+ metalake = client.createMetalake(metalakeName, "metalake comment",
Collections.emptyMap());
+ Assertions.assertTrue(client.metalakeExists(metalakeName));
+
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put(FILESYSTEM_PROVIDERS, "gcs");
+ properties.put(GCSProperties.GRAVITINO_GCS_SERVICE_ACCOUNT_FILE,
SERVICE_ACCOUNT_FILE);
+ properties.put(CredentialConstants.CREDENTIAL_PROVIDERS, "gcs-token");
+
+ Catalog catalog =
+ metalake.createCatalog(
+ catalogName, Catalog.Type.FILESET, "hadoop", "catalog comment",
properties);
+ Assertions.assertTrue(metalake.catalogExists(catalogName));
+
+ catalog.asSchemas().createSchema(schemaName, "schema comment", properties);
+ Assertions.assertTrue(catalog.asSchemas().schemaExists(schemaName));
+
+ conf.set("fs.gvfs.impl",
"org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystem");
+ conf.set("fs.AbstractFileSystem.gvfs.impl",
"org.apache.gravitino.filesystem.hadoop.Gvfs");
+ conf.set("fs.gvfs.impl.disable.cache", "true");
+ conf.set("fs.gravitino.server.uri", serverUri);
+ conf.set("fs.gravitino.client.metalake", metalakeName);
+
+ // Pass this configuration to the real file system
+ conf.set(GCSProperties.GRAVITINO_GCS_SERVICE_ACCOUNT_FILE,
SERVICE_ACCOUNT_FILE);
+ }
+
+ @AfterAll
+ public void tearDown() throws IOException {
+ Catalog catalog = metalake.loadCatalog(catalogName);
+ catalog.asSchemas().dropSchema(schemaName, true);
+ metalake.dropCatalog(catalogName, true);
+ client.dropMetalake(metalakeName, true);
+
+ if (client != null) {
+ client.close();
+ client = null;
+ }
+
+ try {
+ closer.close();
+ } catch (Exception e) {
+ LOG.error("Exception in closing CloseableGroup", e);
+ }
+ }
+
+ /**
+   * Remove the `gravitino.bypass` prefix from the configuration and pass it
to the real file system.
+ * This method corresponds to the method
org.apache.gravitino.filesystem.hadoop
+ * .GravitinoVirtualFileSystem#getConfigMap(Configuration) in the original
code.
+ */
+ protected Configuration
convertGvfsConfigToRealFileSystemConfig(Configuration gvfsConf) {
+ Configuration gcsConf = new Configuration();
+ Map<String, String> map = Maps.newHashMap();
+
+ gvfsConf.forEach(entry -> map.put(entry.getKey(), entry.getValue()));
+
+ Map<String, String> hadoopConfMap =
+ FileSystemUtils.toHadoopConfigMap(
+ map, GCSFileSystemProvider.GRAVITINO_KEY_TO_GCS_HADOOP_KEY);
+
+ hadoopConfMap.forEach(gcsConf::set);
+
+ return gcsConf;
+ }
+
+ protected String genStorageLocation(String fileset) {
+ return String.format("gs://%s/dir1/dir2/%s/", BUCKET_NAME, fileset);
+ }
+
+ @Disabled(
+ "GCS does not support append, java.io.IOException: The append operation
is not supported")
+ public void testAppend() throws IOException {}
+
+ private static boolean isGCPConfigured() {
+ return
StringUtils.isNotBlank(System.getenv("GCS_SERVICE_ACCOUNT_JSON_PATH_FOR_CREDENTIAL"))
+ &&
StringUtils.isNotBlank(System.getenv("GCS_BUCKET_NAME_FOR_CREDENTIAL"));
+ }
+}
diff --git
a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemOSSCredentialIT.java
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemOSSCredentialIT.java
new file mode 100644
index 000000000..662e8f6e4
--- /dev/null
+++
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemOSSCredentialIT.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.filesystem.hadoop.integration.test;
+
+import static
org.apache.gravitino.catalog.hadoop.HadoopCatalogPropertiesMetadata.FILESYSTEM_PROVIDERS;
+
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
+import org.apache.gravitino.credential.CredentialConstants;
+import org.apache.gravitino.credential.OSSTokenCredential;
+import org.apache.gravitino.integration.test.util.GravitinoITUtils;
+import org.apache.gravitino.oss.fs.OSSFileSystemProvider;
+import org.apache.gravitino.storage.OSSProperties;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.condition.EnabledIf;
+import org.junit.platform.commons.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@EnabledIf(value = "ossIsConfigured", disabledReason = "OSS is not prepared")
+public class GravitinoVirtualFileSystemOSSCredentialIT extends
GravitinoVirtualFileSystemIT {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(GravitinoVirtualFileSystemOSSCredentialIT.class);
+
+ public static final String BUCKET_NAME =
System.getenv("OSS_BUCKET_NAME_FOR_CREDENTIAL");
+ public static final String OSS_ACCESS_KEY =
System.getenv("OSS_ACCESS_KEY_ID_FOR_CREDENTIAL");
+ public static final String OSS_SECRET_KEY =
System.getenv("OSS_SECRET_ACCESS_KEY_FOR_CREDENTIAL");
+ public static final String OSS_ENDPOINT =
System.getenv("OSS_ENDPOINT_FOR_CREDENTIAL");
+ public static final String OSS_REGION =
System.getenv("OSS_REGION_FOR_CREDENTIAL");
+ public static final String OSS_ROLE_ARN =
System.getenv("OSS_ROLE_ARN_FOR_CREDENTIAL");
+
+ @BeforeAll
+ public void startIntegrationTest() {
+ // Do nothing
+ }
+
+ @BeforeAll
+ public void startUp() throws Exception {
+ copyBundleJarsToHadoop("aliyun-bundle");
+ // Need to download jars to gravitino server
+ super.startIntegrationTest();
+
+    // This value can be tuned by the user; please change it accordingly.
+ defaultBlockSize = 64 * 1024 * 1024;
+
+ // The default replication factor is 1.
+ defaultReplication = 1;
+
+ metalakeName = GravitinoITUtils.genRandomName("gvfs_it_metalake");
+ catalogName = GravitinoITUtils.genRandomName("catalog");
+ schemaName = GravitinoITUtils.genRandomName("schema");
+
+ Assertions.assertFalse(client.metalakeExists(metalakeName));
+ metalake = client.createMetalake(metalakeName, "metalake comment",
Collections.emptyMap());
+ Assertions.assertTrue(client.metalakeExists(metalakeName));
+
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put(FILESYSTEM_PROVIDERS, "oss");
+ properties.put(OSSProperties.GRAVITINO_OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY);
+ properties.put(OSSProperties.GRAVITINO_OSS_ACCESS_KEY_SECRET,
OSS_SECRET_KEY);
+ properties.put(OSSProperties.GRAVITINO_OSS_ENDPOINT, OSS_ENDPOINT);
+ properties.put(OSSProperties.GRAVITINO_OSS_REGION, OSS_REGION);
+ properties.put(OSSProperties.GRAVITINO_OSS_ROLE_ARN, OSS_ROLE_ARN);
+ properties.put(
+ CredentialConstants.CREDENTIAL_PROVIDERS,
OSSTokenCredential.OSS_TOKEN_CREDENTIAL_TYPE);
+
+ Catalog catalog =
+ metalake.createCatalog(
+ catalogName, Catalog.Type.FILESET, "hadoop", "catalog comment",
properties);
+ Assertions.assertTrue(metalake.catalogExists(catalogName));
+
+ catalog.asSchemas().createSchema(schemaName, "schema comment", properties);
+ Assertions.assertTrue(catalog.asSchemas().schemaExists(schemaName));
+
+ conf.set("fs.gvfs.impl",
"org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystem");
+ conf.set("fs.AbstractFileSystem.gvfs.impl",
"org.apache.gravitino.filesystem.hadoop.Gvfs");
+ conf.set("fs.gvfs.impl.disable.cache", "true");
+ conf.set("fs.gravitino.server.uri", serverUri);
+ conf.set("fs.gravitino.client.metalake", metalakeName);
+
+ // Pass this configuration to the real file system
+ conf.set(OSSProperties.GRAVITINO_OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY);
+ conf.set(OSSProperties.GRAVITINO_OSS_ACCESS_KEY_SECRET, OSS_SECRET_KEY);
+ conf.set(OSSProperties.GRAVITINO_OSS_ENDPOINT, OSS_ENDPOINT);
+ conf.set("fs.oss.impl",
"org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem");
+ }
+
+ @AfterAll
+ public void tearDown() throws IOException {
+ Catalog catalog = metalake.loadCatalog(catalogName);
+ catalog.asSchemas().dropSchema(schemaName, true);
+ metalake.dropCatalog(catalogName, true);
+ client.dropMetalake(metalakeName, true);
+
+ if (client != null) {
+ client.close();
+ client = null;
+ }
+
+ try {
+ closer.close();
+ } catch (Exception e) {
+ LOG.error("Exception in closing CloseableGroup", e);
+ }
+ }
+
+ /**
+   * Remove the `gravitino.bypass` prefix from the configuration and pass it
to the real file system.
+ * This method corresponds to the method
org.apache.gravitino.filesystem.hadoop
+ * .GravitinoVirtualFileSystem#getConfigMap(Configuration) in the original
code.
+ */
+ protected Configuration
convertGvfsConfigToRealFileSystemConfig(Configuration gvfsConf) {
+ Configuration ossConf = new Configuration();
+ Map<String, String> map = Maps.newHashMap();
+
+ gvfsConf.forEach(entry -> map.put(entry.getKey(), entry.getValue()));
+
+ Map<String, String> hadoopConfMap =
+ FileSystemUtils.toHadoopConfigMap(
+ map, OSSFileSystemProvider.GRAVITINO_KEY_TO_OSS_HADOOP_KEY);
+
+ hadoopConfMap.forEach(ossConf::set);
+
+ return ossConf;
+ }
+
+ protected String genStorageLocation(String fileset) {
+ return String.format("oss://%s/%s", BUCKET_NAME, fileset);
+ }
+
+ @Disabled(
+ "OSS does not support append, java.io.IOException: The append operation
is not supported")
+ public void testAppend() throws IOException {}
+
+ protected static boolean ossIsConfigured() {
+ return
StringUtils.isNotBlank(System.getenv("OSS_ACCESS_KEY_ID_FOR_CREDENTIAL"))
+ &&
StringUtils.isNotBlank(System.getenv("OSS_SECRET_ACCESS_KEY_FOR_CREDENTIAL"))
+ && StringUtils.isNotBlank(System.getenv("OSS_ENDPOINT_FOR_CREDENTIAL"))
+ &&
StringUtils.isNotBlank(System.getenv("OSS_BUCKET_NAME_FOR_CREDENTIAL"))
+ && StringUtils.isNotBlank(System.getenv("OSS_REGION_FOR_CREDENTIAL"))
+ &&
StringUtils.isNotBlank(System.getenv("OSS_ROLE_ARN_FOR_CREDENTIAL"));
+ }
+}
diff --git
a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemS3CredentialIT.java
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemS3CredentialIT.java
new file mode 100644
index 000000000..12d530967
--- /dev/null
+++
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemS3CredentialIT.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.filesystem.hadoop.integration.test;
+
+import static
org.apache.gravitino.catalog.hadoop.HadoopCatalogPropertiesMetadata.FILESYSTEM_PROVIDERS;
+
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
+import org.apache.gravitino.credential.CredentialConstants;
+import org.apache.gravitino.credential.S3TokenCredential;
+import org.apache.gravitino.integration.test.util.GravitinoITUtils;
+import org.apache.gravitino.s3.fs.S3FileSystemProvider;
+import org.apache.gravitino.storage.S3Properties;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.condition.EnabledIf;
+import org.junit.platform.commons.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@EnabledIf(value = "s3IsConfigured", disabledReason = "s3 with credential is
not prepared")
+public class GravitinoVirtualFileSystemS3CredentialIT extends
GravitinoVirtualFileSystemIT {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(GravitinoVirtualFileSystemS3CredentialIT.class);
+
+ public static final String BUCKET_NAME =
System.getenv("S3_BUCKET_NAME_FOR_CREDENTIAL");
+ public static final String S3_ACCESS_KEY =
System.getenv("S3_ACCESS_KEY_ID_FOR_CREDENTIAL");
+ public static final String S3_SECRET_KEY =
System.getenv("S3_SECRET_ACCESS_KEY_FOR_CREDENTIAL");
+ public static final String S3_ENDPOINT =
System.getenv("S3_ENDPOINT_FOR_CREDENTIAL");
+ public static final String S3_REGION =
System.getenv("S3_REGION_FOR_CREDENTIAL");
+ public static final String S3_ROLE_ARN =
System.getenv("S3_ROLE_ARN_FOR_CREDENTIAL");
+
+ @BeforeAll
+ public void startIntegrationTest() {
+ // Do nothing
+ }
+
+ @BeforeAll
+ public void startUp() throws Exception {
+ copyBundleJarsToHadoop("aws-bundle");
+
+ // Need to download jars to gravitino server
+ super.startIntegrationTest();
+
+    // This value can be tuned by the user; please change it accordingly.
+ defaultBlockSize = 32 * 1024 * 1024;
+
+ // The value is 1 for S3
+ defaultReplication = 1;
+
+ metalakeName = GravitinoITUtils.genRandomName("gvfs_it_metalake");
+ catalogName = GravitinoITUtils.genRandomName("catalog");
+ schemaName = GravitinoITUtils.genRandomName("schema");
+
+ Assertions.assertFalse(client.metalakeExists(metalakeName));
+ metalake = client.createMetalake(metalakeName, "metalake comment",
Collections.emptyMap());
+ Assertions.assertTrue(client.metalakeExists(metalakeName));
+
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put(S3Properties.GRAVITINO_S3_ACCESS_KEY_ID, S3_ACCESS_KEY);
+ properties.put(S3Properties.GRAVITINO_S3_SECRET_ACCESS_KEY, S3_SECRET_KEY);
+ properties.put(S3Properties.GRAVITINO_S3_ENDPOINT, S3_ENDPOINT);
+ properties.put(
+ "gravitino.bypass.fs.s3a.aws.credentials.provider",
+ "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider");
+ properties.put(FILESYSTEM_PROVIDERS, "s3");
+
+ properties.put(S3Properties.GRAVITINO_S3_REGION, S3_REGION);
+ properties.put(S3Properties.GRAVITINO_S3_ROLE_ARN, S3_ROLE_ARN);
+ properties.put(
+ CredentialConstants.CREDENTIAL_PROVIDERS,
S3TokenCredential.S3_TOKEN_CREDENTIAL_TYPE);
+
+ Catalog catalog =
+ metalake.createCatalog(
+ catalogName, Catalog.Type.FILESET, "hadoop", "catalog comment",
properties);
+ Assertions.assertTrue(metalake.catalogExists(catalogName));
+
+ catalog.asSchemas().createSchema(schemaName, "schema comment", properties);
+ Assertions.assertTrue(catalog.asSchemas().schemaExists(schemaName));
+
+ conf.set("fs.gvfs.impl",
"org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystem");
+ conf.set("fs.AbstractFileSystem.gvfs.impl",
"org.apache.gravitino.filesystem.hadoop.Gvfs");
+ conf.set("fs.gvfs.impl.disable.cache", "true");
+ conf.set("fs.gravitino.server.uri", serverUri);
+ conf.set("fs.gravitino.client.metalake", metalakeName);
+
+ // Pass this configuration to the real file system
+ conf.set(S3Properties.GRAVITINO_S3_SECRET_ACCESS_KEY, S3_SECRET_KEY);
+ conf.set(S3Properties.GRAVITINO_S3_ACCESS_KEY_ID, S3_ACCESS_KEY);
+ conf.set(S3Properties.GRAVITINO_S3_ENDPOINT, S3_ENDPOINT);
+ conf.set(S3Properties.GRAVITINO_S3_REGION, S3_REGION);
+ conf.set(S3Properties.GRAVITINO_S3_ROLE_ARN, S3_ROLE_ARN);
+ }
+
+ @AfterAll
+ public void tearDown() throws IOException {
+ Catalog catalog = metalake.loadCatalog(catalogName);
+ catalog.asSchemas().dropSchema(schemaName, true);
+ metalake.dropCatalog(catalogName, true);
+ client.dropMetalake(metalakeName, true);
+
+ if (client != null) {
+ client.close();
+ client = null;
+ }
+
+ try {
+ closer.close();
+ } catch (Exception e) {
+ LOG.error("Exception in closing CloseableGroup", e);
+ }
+ }
+
+  /**
+   * Remove the `gravitino.bypass` prefix from the configuration and pass it
 to the real file system.
+   * This method corresponds to the method
 org.apache.gravitino.filesystem.hadoop
org.apache.gravitino.filesystem.hadoop
+ * .GravitinoVirtualFileSystem#getConfigMap(Configuration) in the original
code.
+ */
+ protected Configuration
convertGvfsConfigToRealFileSystemConfig(Configuration gvfsConf) {
+ Configuration s3Conf = new Configuration();
+ Map<String, String> map = Maps.newHashMap();
+
+ gvfsConf.forEach(entry -> map.put(entry.getKey(), entry.getValue()));
+
+ Map<String, String> hadoopConfMap =
+ FileSystemUtils.toHadoopConfigMap(map,
S3FileSystemProvider.GRAVITINO_KEY_TO_S3_HADOOP_KEY);
+
+ hadoopConfMap.forEach(s3Conf::set);
+
+ return s3Conf;
+ }
+
+ protected String genStorageLocation(String fileset) {
+ return String.format("s3a://%s/%s", BUCKET_NAME, fileset);
+ }
+
+  @Disabled(
+      "S3 does not support append, java.io.IOException: The append operation
 is not supported")
+ public void testAppend() throws IOException {}
+
+ protected static boolean s3IsConfigured() {
+ return
StringUtils.isNotBlank(System.getenv("S3_ACCESS_KEY_ID_FOR_CREDENTIAL"))
+ &&
StringUtils.isNotBlank(System.getenv("S3_SECRET_ACCESS_KEY_FOR_CREDENTIAL"))
+ && StringUtils.isNotBlank(System.getenv("S3_ENDPOINT_FOR_CREDENTIAL"))
+ &&
StringUtils.isNotBlank(System.getenv("S3_BUCKET_NAME_FOR_CREDENTIAL"))
+ && StringUtils.isNotBlank(System.getenv("S3_REGION_FOR_CREDENTIAL"))
+ && StringUtils.isNotBlank(System.getenv("S3_ROLE_ARN_FOR_CREDENTIAL"));
+ }
+}