This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new b06b51153 [#5019] feat: (hadoop-catalog): Add a framework to support
multi-storage in a pluggable manner for fileset catalog (#5020)
b06b51153 is described below
commit b06b51153b02ba5194e3a8afedcd4077ef80ce8e
Author: Qi Yu <[email protected]>
AuthorDate: Wed Oct 16 18:07:21 2024 +0800
[#5019] feat: (hadoop-catalog): Add a framework to support multi-storage in
a pluggable manner for fileset catalog (#5020)
### What changes were proposed in this pull request?
Add a framework to support multiple storage systems within the Hadoop catalog
### Why are the changes needed?
Some users want Gravitino to manage file systems such as S3 or GCS.
Fix: #5019
### Does this PR introduce _any_ user-facing change?
N/A.
### How was this patch tested?
Existing tests.
---
.gitignore | 4 +
catalogs/catalog-hadoop/build.gradle.kts | 2 +
.../catalog/hadoop/HadoopCatalogOperations.java | 83 ++++++++++------
.../hadoop/HadoopCatalogPropertiesMetadata.java | 41 +++++++-
.../catalog/hadoop/fs/FileSystemProvider.java | 69 ++++++++++++++
.../catalog/hadoop/fs/FileSystemUtils.java | 104 +++++++++++++++++++++
.../catalog/hadoop/fs/HDFSFileSystemProvider.java | 54 +++++++++++
.../catalog/hadoop/fs/LocalFileSystemProvider.java | 53 +++++++++++
....gravitino.catalog.hadoop.fs.FileSystemProvider | 21 +++++
.../hadoop/TestHadoopCatalogOperations.java | 87 +++++++++++------
clients/filesystem-hadoop3/build.gradle.kts | 9 ++
.../hadoop/GravitinoVirtualFileSystem.java | 39 +++++++-
.../GravitinoVirtualFileSystemConfiguration.java | 10 ++
docs/hadoop-catalog.md | 23 +++--
14 files changed, 531 insertions(+), 68 deletions(-)
diff --git a/.gitignore b/.gitignore
index 7889cf7a9..eae3d3c95 100644
--- a/.gitignore
+++ b/.gitignore
@@ -53,3 +53,7 @@ include clients/client-python/.gitignore
**/metastore_db
**/spark-warehouse
derby.log
+
+web/node_modules
+web/dist
+web/.next
diff --git a/catalogs/catalog-hadoop/build.gradle.kts
b/catalogs/catalog-hadoop/build.gradle.kts
index ba60a161d..940289347 100644
--- a/catalogs/catalog-hadoop/build.gradle.kts
+++ b/catalogs/catalog-hadoop/build.gradle.kts
@@ -36,6 +36,8 @@ dependencies {
exclude(group = "*")
}
+ compileOnly(libs.guava)
+
implementation(libs.hadoop3.common) {
exclude("com.sun.jersey")
exclude("javax.servlet", "servlet-api")
diff --git
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java
index da4d0e1a1..8515ea7d2 100644
---
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java
+++
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java
@@ -30,7 +30,6 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.Catalog;
import org.apache.gravitino.Entity;
@@ -44,6 +43,8 @@ import org.apache.gravitino.StringIdentifier;
import org.apache.gravitino.audit.CallerContext;
import org.apache.gravitino.audit.FilesetAuditConstants;
import org.apache.gravitino.audit.FilesetDataOperation;
+import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
+import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
import org.apache.gravitino.connector.CatalogInfo;
import org.apache.gravitino.connector.CatalogOperations;
import org.apache.gravitino.connector.HasPropertyMetadata;
@@ -71,11 +72,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HadoopCatalogOperations implements CatalogOperations,
SupportsSchemas, FilesetCatalog {
-
private static final String SCHEMA_DOES_NOT_EXIST_MSG = "Schema %s does not
exist";
private static final String FILESET_DOES_NOT_EXIST_MSG = "Fileset %s does
not exist";
private static final String SLASH = "/";
-
private static final Logger LOG =
LoggerFactory.getLogger(HadoopCatalogOperations.class);
private final EntityStore store;
@@ -90,6 +89,10 @@ public class HadoopCatalogOperations implements
CatalogOperations, SupportsSchem
private CatalogInfo catalogInfo;
+ private final Map<String, FileSystemProvider> fileSystemProvidersMap =
Maps.newHashMap();
+
+ private FileSystemProvider defaultFileSystemProvider;
+
HadoopCatalogOperations(EntityStore store) {
this.store = store;
}
@@ -107,7 +110,9 @@ public class HadoopCatalogOperations implements
CatalogOperations, SupportsSchem
}
public Configuration getHadoopConf() {
- return hadoopConf;
+ Configuration configuration = new Configuration();
+ conf.forEach((k, v) -> configuration.set(k.replace(CATALOG_BYPASS_PREFIX,
""), v));
+ return configuration;
}
public Map<String, String> getConf() {
@@ -119,26 +124,31 @@ public class HadoopCatalogOperations implements
CatalogOperations, SupportsSchem
Map<String, String> config, CatalogInfo info, HasPropertyMetadata
propertiesMetadata)
throws RuntimeException {
this.propertiesMetadata = propertiesMetadata;
- // Initialize Hadoop Configuration.
- this.conf = config;
- this.hadoopConf = new Configuration();
this.catalogInfo = info;
- Map<String, String> bypassConfigs =
- config.entrySet().stream()
- .filter(e -> e.getKey().startsWith(CATALOG_BYPASS_PREFIX))
- .collect(
- Collectors.toMap(
- e -> e.getKey().substring(CATALOG_BYPASS_PREFIX.length()),
- Map.Entry::getValue));
- bypassConfigs.forEach(hadoopConf::set);
+
+ this.conf = config;
+
+ String fileSystemProviders =
+ (String)
+ propertiesMetadata
+ .catalogPropertiesMetadata()
+ .getOrDefault(config,
HadoopCatalogPropertiesMetadata.FILESYSTEM_PROVIDERS);
+
this.fileSystemProvidersMap.putAll(FileSystemUtils.getFileSystemProviders(fileSystemProviders));
+
+ String defaultFileSystemProviderName =
+ (String)
+ propertiesMetadata
+ .catalogPropertiesMetadata()
+ .getOrDefault(config,
HadoopCatalogPropertiesMetadata.DEFAULT_FS_PROVIDER);
+ this.defaultFileSystemProvider =
+ FileSystemUtils.getFileSystemProviderByName(
+ fileSystemProvidersMap, defaultFileSystemProviderName);
String catalogLocation =
(String)
propertiesMetadata
.catalogPropertiesMetadata()
.getOrDefault(config,
HadoopCatalogPropertiesMetadata.LOCATION);
- conf.forEach(hadoopConf::set);
-
this.catalogStorageLocation =
StringUtils.isNotBlank(catalogLocation)
? Optional.of(catalogLocation).map(Path::new)
@@ -235,8 +245,9 @@ public class HadoopCatalogOperations implements
CatalogOperations, SupportsSchem
try {
// formalize the path to avoid path without scheme, uri, authority, etc.
- filesetPath = formalizePath(filesetPath, hadoopConf);
- FileSystem fs = filesetPath.getFileSystem(hadoopConf);
+ filesetPath = formalizePath(filesetPath, conf);
+
+ FileSystem fs = getFileSystem(filesetPath, conf);
if (!fs.exists(filesetPath)) {
if (!fs.mkdirs(filesetPath)) {
throw new RuntimeException(
@@ -339,7 +350,7 @@ public class HadoopCatalogOperations implements
CatalogOperations, SupportsSchem
// For managed fileset, we should delete the related files.
if (filesetEntity.filesetType() == Fileset.Type.MANAGED) {
- FileSystem fs = filesetPath.getFileSystem(hadoopConf);
+ FileSystem fs = getFileSystem(filesetPath, conf);
if (fs.exists(filesetPath)) {
if (!fs.delete(filesetPath, true)) {
LOG.warn("Failed to delete fileset {} location {}", ident,
filesetPath);
@@ -459,7 +470,7 @@ public class HadoopCatalogOperations implements
CatalogOperations, SupportsSchem
Path schemaPath = getSchemaPath(ident.name(), properties);
if (schemaPath != null) {
try {
- FileSystem fs = schemaPath.getFileSystem(hadoopConf);
+ FileSystem fs = getFileSystem(schemaPath, conf);
if (!fs.exists(schemaPath)) {
if (!fs.mkdirs(schemaPath)) {
// Fail the operation when failed to create the schema path.
@@ -577,8 +588,7 @@ public class HadoopCatalogOperations implements
CatalogOperations, SupportsSchem
if (schemaPath == null) {
return false;
}
-
- FileSystem fs = schemaPath.getFileSystem(hadoopConf);
+ FileSystem fs = getFileSystem(schemaPath, conf);
// Nothing to delete if the schema path does not exist.
if (!fs.exists(schemaPath)) {
return false;
@@ -717,8 +727,8 @@ public class HadoopCatalogOperations implements
CatalogOperations, SupportsSchem
}
@VisibleForTesting
- static Path formalizePath(Path path, Configuration configuration) throws
IOException {
- FileSystem defaultFs = FileSystem.get(configuration);
+ Path formalizePath(Path path, Map<String, String> configuration) throws
IOException {
+ FileSystem defaultFs = getFileSystem(path, configuration);
return path.makeQualified(defaultFs.getUri(),
defaultFs.getWorkingDirectory());
}
@@ -731,7 +741,7 @@ public class HadoopCatalogOperations implements
CatalogOperations, SupportsSchem
private boolean checkSingleFile(Fileset fileset) {
try {
Path locationPath = new Path(fileset.storageLocation());
- return
locationPath.getFileSystem(hadoopConf).getFileStatus(locationPath).isFile();
+ return getFileSystem(locationPath,
conf).getFileStatus(locationPath).isFile();
} catch (FileNotFoundException e) {
// We should always return false here, same with the logic in
`FileSystem.isFile(Path f)`.
return false;
@@ -742,4 +752,25 @@ public class HadoopCatalogOperations implements
CatalogOperations, SupportsSchem
fileset.name());
}
}
+
+ FileSystem getFileSystem(Path path, Map<String, String> config) throws
IOException {
+ if (path == null) {
+ throw new IllegalArgumentException("Path should not be null");
+ }
+
+ String scheme =
+ path.toUri().getScheme() != null
+ ? path.toUri().getScheme()
+ : defaultFileSystemProvider.scheme();
+
+ FileSystemProvider provider = fileSystemProvidersMap.get(scheme);
+ if (provider == null) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Unsupported scheme: %s, path: %s, all supported schemes: %s and
providers: %s",
+ scheme, path, fileSystemProvidersMap.keySet(),
fileSystemProvidersMap.values()));
+ }
+
+ return provider.getFileSystem(path, config);
+ }
}
diff --git
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogPropertiesMetadata.java
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogPropertiesMetadata.java
index 9a68e2d55..397e13aa4 100644
---
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogPropertiesMetadata.java
+++
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogPropertiesMetadata.java
@@ -18,10 +18,13 @@
*/
package org.apache.gravitino.catalog.hadoop;
+import static
org.apache.gravitino.catalog.hadoop.authentication.kerberos.KerberosConfig.KERBEROS_PROPERTY_ENTRIES;
+
import com.google.common.collect.ImmutableMap;
import java.util.Map;
import org.apache.gravitino.catalog.hadoop.authentication.AuthenticationConfig;
-import
org.apache.gravitino.catalog.hadoop.authentication.kerberos.KerberosConfig;
+import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
+import org.apache.gravitino.catalog.hadoop.fs.LocalFileSystemProvider;
import org.apache.gravitino.connector.BaseCatalogPropertiesMetadata;
import org.apache.gravitino.connector.PropertyEntry;
@@ -34,6 +37,24 @@ public class HadoopCatalogPropertiesMetadata extends
BaseCatalogPropertiesMetada
// If not, users have to specify the storage location in the Schema or
Fileset level.
public static final String LOCATION = "location";
+ /**
+ * The name of {@link FileSystemProvider} to be added to the catalog. Except
for built-in
+ * FileSystemProviders like LocalFileSystemProvider and
HDFSFileSystemProvider, users can add their
+ * own FileSystemProvider by specifying the provider name here. The value
can be found in {@link
+ * FileSystemProvider#name()}.
+ */
+ public static final String FILESYSTEM_PROVIDERS = "filesystem-providers";
+
+ /**
+ * The default file system provider class name, used to create the default
file system. If not
+ * specified, the default file system provider will be {@link
LocalFileSystemProvider#name()}:
+ * 'builtin-local'.
+ */
+ public static final String DEFAULT_FS_PROVIDER =
"default-filesystem-provider";
+
+ public static final String BUILTIN_LOCAL_FS_PROVIDER = "builtin-local";
+ public static final String BUILTIN_HDFS_FS_PROVIDER = "builtin-hdfs";
+
private static final Map<String, PropertyEntry<?>>
HADOOP_CATALOG_PROPERTY_ENTRIES =
ImmutableMap.<String, PropertyEntry<?>>builder()
.put(
@@ -44,8 +65,24 @@ public class HadoopCatalogPropertiesMetadata extends
BaseCatalogPropertiesMetada
false /* immutable */,
null,
false /* hidden */))
+ .put(
+ FILESYSTEM_PROVIDERS,
+ PropertyEntry.stringOptionalPropertyEntry(
+ FILESYSTEM_PROVIDERS,
+ "The file system provider names, separated by comma",
+ false /* immutable */,
+ null,
+ false /* hidden */))
+ .put(
+ DEFAULT_FS_PROVIDER,
+ PropertyEntry.stringOptionalPropertyEntry(
+ DEFAULT_FS_PROVIDER,
+ "Default file system provider name",
+ false /* immutable */,
+ BUILTIN_LOCAL_FS_PROVIDER, // please see
LocalFileSystemProvider#name()
+ false /* hidden */))
// The following two are about authentication.
- .putAll(KerberosConfig.KERBEROS_PROPERTY_ENTRIES)
+ .putAll(KERBEROS_PROPERTY_ENTRIES)
.putAll(AuthenticationConfig.AUTHENTICATION_PROPERTY_ENTRIES)
.build();
diff --git
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/FileSystemProvider.java
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/FileSystemProvider.java
new file mode 100644
index 000000000..5bee821e5
--- /dev/null
+++
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/FileSystemProvider.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.catalog.hadoop.fs;
+
+import java.io.IOException;
+import java.util.Map;
+import javax.annotation.Nonnull;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * FileSystemProvider is an interface for providing FileSystem instances. It
is used by the
+ * HadoopCatalog to create FileSystem instances for accessing Hadoop
compatible file systems.
+ */
+public interface FileSystemProvider {
+
+ /**
+ * Get the FileSystem instance according to the configuration map and file
path.
+ *
+ * <p>Compared to the {@link FileSystem#get(Configuration)} method, this
method allows the
+ * provider to create a FileSystem instance with a specific configuration
and do further
+ * initialization if needed.
+ *
+ * <p>For example: 1. We can check the endpoint value validity for
S3AFileSystem then do further
+ * actions. 2. We can also change some default behavior of the FileSystem
initialization process
+ * 3. More...
+ *
+ * @param config The configuration for the FileSystem instance.
+ * @param path The path to the file system.
+ * @return The FileSystem instance.
+ * @throws IOException If the FileSystem instance cannot be created.
+ */
+ FileSystem getFileSystem(@Nonnull Path path, @Nonnull Map<String, String>
config)
+ throws IOException;
+
+ /**
+ * Scheme of this FileSystem provider. The value is 'file' for
LocalFileSystem, 'hdfs' for HDFS,
+ * etc.
+ *
+ * @return The scheme of this FileSystem provider used.
+ */
+ String scheme();
+
+ /**
+ * Name of this FileSystem provider. The value is 'builtin-local' for
LocalFileSystem,
+ * 'builtin-hdfs' for HDFS, etc.
+ *
+ * @return The name of this FileSystem provider.
+ */
+ String name();
+}
diff --git
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/FileSystemUtils.java
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/FileSystemUtils.java
new file mode 100644
index 000000000..3a959ff37
--- /dev/null
+++
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/FileSystemUtils.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.hadoop.fs;
+
+import static
org.apache.gravitino.catalog.hadoop.HadoopCatalogPropertiesMetadata.BUILTIN_HDFS_FS_PROVIDER;
+import static
org.apache.gravitino.catalog.hadoop.HadoopCatalogPropertiesMetadata.BUILTIN_LOCAL_FS_PROVIDER;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.collect.Streams;
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Map;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class FileSystemUtils {
+
+ private FileSystemUtils() {}
+
+ public static Map<String, FileSystemProvider> getFileSystemProviders(String
fileSystemProviders) {
+ Map<String, FileSystemProvider> resultMap = Maps.newHashMap();
+ ServiceLoader<FileSystemProvider> allFileSystemProviders =
+ ServiceLoader.load(FileSystemProvider.class);
+
+ Set<String> providersInUses =
+ fileSystemProviders != null
+ ? Arrays.stream(fileSystemProviders.split(","))
+ .map(f -> f.trim().toLowerCase(Locale.ROOT))
+ .collect(java.util.stream.Collectors.toSet())
+ : Sets.newHashSet();
+
+ // Add built-in file system providers to the use list automatically.
+ providersInUses.add(BUILTIN_LOCAL_FS_PROVIDER.toLowerCase(Locale.ROOT));
+ providersInUses.add(BUILTIN_HDFS_FS_PROVIDER.toLowerCase(Locale.ROOT));
+
+ // Only get the file system providers that are in the user list and check
if the scheme is
+ // unique.
+ Streams.stream(allFileSystemProviders.iterator())
+ .filter(
+ fileSystemProvider ->
+
providersInUses.contains(fileSystemProvider.name().toLowerCase(Locale.ROOT)))
+ .forEach(
+ fileSystemProvider -> {
+ if (resultMap.containsKey(fileSystemProvider.scheme())) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "File system provider: '%s' with scheme '%s' already
exists in the use provider list "
+ + "Please make sure the file system provider
scheme is unique.",
+ fileSystemProvider.getClass().getName(),
fileSystemProvider.scheme()));
+ }
+ resultMap.put(fileSystemProvider.scheme(), fileSystemProvider);
+ });
+
+ // If not all file system providers in providersInUses were found, throw an
exception.
+ Set<String> notFoundProviders =
+ Sets.difference(
+ providersInUses,
+ resultMap.values().stream()
+ .map(p -> p.name().toLowerCase(Locale.ROOT))
+ .collect(Collectors.toSet()))
+ .immutableCopy();
+ if (!notFoundProviders.isEmpty()) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "File system providers %s not found in the classpath. Please
make sure the file system "
+ + "provider is in the classpath.",
+ notFoundProviders));
+ }
+
+ return resultMap;
+ }
+
+ public static FileSystemProvider getFileSystemProviderByName(
+ Map<String, FileSystemProvider> fileSystemProviders, String
fileSystemProviderName) {
+ return fileSystemProviders.entrySet().stream()
+ .filter(entry ->
entry.getValue().name().equals(fileSystemProviderName))
+ .map(Map.Entry::getValue)
+ .findFirst()
+ .orElseThrow(
+ () ->
+ new UnsupportedOperationException(
+ String.format(
+ "File system provider with name '%s' not found in the
file system provider list.",
+ fileSystemProviderName)));
+ }
+}
diff --git
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/HDFSFileSystemProvider.java
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/HDFSFileSystemProvider.java
new file mode 100644
index 000000000..7c9ceebdd
--- /dev/null
+++
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/HDFSFileSystemProvider.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.hadoop.fs;
+
+import static
org.apache.gravitino.catalog.hadoop.HadoopCatalogPropertiesMetadata.BUILTIN_HDFS_FS_PROVIDER;
+import static org.apache.gravitino.connector.BaseCatalog.CATALOG_BYPASS_PREFIX;
+
+import java.io.IOException;
+import java.util.Map;
+import javax.annotation.Nonnull;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+
+public class HDFSFileSystemProvider implements FileSystemProvider {
+
+ @Override
+ public FileSystem getFileSystem(@Nonnull Path path, @Nonnull Map<String,
String> config)
+ throws IOException {
+ Configuration configuration = new Configuration();
+ config.forEach(
+ (k, v) -> {
+ configuration.set(k.replace(CATALOG_BYPASS_PREFIX, ""), v);
+ });
+ return DistributedFileSystem.newInstance(path.toUri(), configuration);
+ }
+
+ @Override
+ public String scheme() {
+ return "hdfs";
+ }
+
+ @Override
+ public String name() {
+ return BUILTIN_HDFS_FS_PROVIDER;
+ }
+}
diff --git
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/LocalFileSystemProvider.java
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/LocalFileSystemProvider.java
new file mode 100644
index 000000000..70e44c76f
--- /dev/null
+++
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/LocalFileSystemProvider.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.hadoop.fs;
+
+import static
org.apache.gravitino.catalog.hadoop.HadoopCatalogPropertiesMetadata.BUILTIN_LOCAL_FS_PROVIDER;
+import static org.apache.gravitino.connector.BaseCatalog.CATALOG_BYPASS_PREFIX;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class LocalFileSystemProvider implements FileSystemProvider {
+
+ @Override
+ public FileSystem getFileSystem(Path path, Map<String, String> config)
throws IOException {
+ Configuration configuration = new Configuration();
+ config.forEach(
+ (k, v) -> {
+ configuration.set(k.replace(CATALOG_BYPASS_PREFIX, ""), v);
+ });
+
+ return LocalFileSystem.newInstance(path.toUri(), configuration);
+ }
+
+ @Override
+ public String scheme() {
+ return "file";
+ }
+
+ @Override
+ public String name() {
+ return BUILTIN_LOCAL_FS_PROVIDER;
+ }
+}
diff --git
a/catalogs/catalog-hadoop/src/main/resources/META-INF/services/org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider
b/catalogs/catalog-hadoop/src/main/resources/META-INF/services/org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider
new file mode 100644
index 000000000..93a84744a
--- /dev/null
+++
b/catalogs/catalog-hadoop/src/main/resources/META-INF/services/org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+org.apache.gravitino.catalog.hadoop.fs.HDFSFileSystemProvider
+org.apache.gravitino.catalog.hadoop.fs.LocalFileSystemProvider
\ No newline at end of file
diff --git
a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java
b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java
index d32069726..2b89180a8 100644
---
a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java
+++
b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java
@@ -34,7 +34,6 @@ import static
org.apache.gravitino.Configs.VERSION_RETENTION_COUNT;
import static
org.apache.gravitino.catalog.hadoop.HadoopCatalog.CATALOG_PROPERTIES_META;
import static
org.apache.gravitino.catalog.hadoop.HadoopCatalog.FILESET_PROPERTIES_META;
import static
org.apache.gravitino.catalog.hadoop.HadoopCatalog.SCHEMA_PROPERTIES_META;
-import static org.apache.gravitino.connector.BaseCatalog.CATALOG_BYPASS_PREFIX;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.when;
@@ -50,10 +49,12 @@ import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.gravitino.Catalog;
import org.apache.gravitino.Config;
import org.apache.gravitino.EntityStore;
import org.apache.gravitino.EntityStoreFactory;
+import org.apache.gravitino.GravitinoEnv;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Namespace;
import org.apache.gravitino.Schema;
@@ -65,6 +66,7 @@ import org.apache.gravitino.audit.FilesetDataOperation;
import org.apache.gravitino.connector.CatalogInfo;
import org.apache.gravitino.connector.HasPropertyMetadata;
import org.apache.gravitino.connector.PropertiesMetadata;
+import org.apache.gravitino.connector.PropertyEntry;
import org.apache.gravitino.exceptions.GravitinoRuntimeException;
import org.apache.gravitino.exceptions.NoSuchFilesetException;
import org.apache.gravitino.exceptions.NoSuchSchemaException;
@@ -74,6 +76,7 @@ import org.apache.gravitino.file.Fileset;
import org.apache.gravitino.file.FilesetChange;
import org.apache.gravitino.storage.IdGenerator;
import org.apache.gravitino.storage.RandomIdGenerator;
+import org.apache.gravitino.storage.relational.RelationalEntityStore;
import org.apache.gravitino.storage.relational.service.CatalogMetaService;
import org.apache.gravitino.storage.relational.service.MetalakeMetaService;
import org.apache.gravitino.utils.NameIdentifierUtil;
@@ -230,18 +233,10 @@ public class TestHadoopCatalogOperations {
CatalogInfo catalogInfo = randomCatalogInfo();
ops.initialize(emptyProps, catalogInfo, HADOOP_PROPERTIES_METADATA);
- Configuration conf = ops.hadoopConf;
+ Configuration conf = ops.getHadoopConf();
String value = conf.get("fs.defaultFS");
Assertions.assertEquals("file:///", value);
- emptyProps.put(CATALOG_BYPASS_PREFIX + "fs.defaultFS",
"hdfs://localhost:9000");
- ops.initialize(emptyProps, catalogInfo, HADOOP_PROPERTIES_METADATA);
- Configuration conf1 = ops.hadoopConf;
- String value1 = conf1.get("fs.defaultFS");
- Assertions.assertEquals("hdfs://localhost:9000", value1);
-
- Assertions.assertFalse(ops.catalogStorageLocation.isPresent());
-
emptyProps.put(HadoopCatalogPropertiesMetadata.LOCATION,
"file:///tmp/catalog");
ops.initialize(emptyProps, catalogInfo, HADOOP_PROPERTIES_METADATA);
Assertions.assertTrue(ops.catalogStorageLocation.isPresent());
@@ -677,33 +672,68 @@ public class TestHadoopCatalogOperations {
}
@Test
- public void testFormalizePath() throws IOException {
+ public void testFormalizePath() throws IOException, IllegalAccessException {
String[] paths =
- new String[] {
- "tmp/catalog",
- "/tmp/catalog",
- "file:/tmp/catalog",
- "file:///tmp/catalog",
- "hdfs://localhost:9000/tmp/catalog",
- "s3://bucket/tmp/catalog",
- "gs://bucket/tmp/catalog"
- };
+ new String[] {"tmp/catalog", "/tmp/catalog", "file:/tmp/catalog",
"file:///tmp/catalog"};
String[] expected =
new String[] {
"file:" + Paths.get("").toAbsolutePath() + "/tmp/catalog",
"file:/tmp/catalog",
"file:/tmp/catalog",
- "file:/tmp/catalog",
- "hdfs://localhost:9000/tmp/catalog",
- "s3://bucket/tmp/catalog",
- "gs://bucket/tmp/catalog"
+ "file:/tmp/catalog"
+ };
+
+ HasPropertyMetadata hasPropertyMetadata =
+ new HasPropertyMetadata() {
+ @Override
+ public PropertiesMetadata tablePropertiesMetadata() throws
UnsupportedOperationException {
+ return null;
+ }
+
+ @Override
+ public PropertiesMetadata catalogPropertiesMetadata()
+ throws UnsupportedOperationException {
+ return new PropertiesMetadata() {
+ @Override
+ public Map<String, PropertyEntry<?>> propertyEntries() {
+ return new HadoopCatalogPropertiesMetadata().propertyEntries();
+ }
+ };
+ }
+
+ @Override
+ public PropertiesMetadata schemaPropertiesMetadata()
+ throws UnsupportedOperationException {
+ return null;
+ }
+
+ @Override
+ public PropertiesMetadata filesetPropertiesMetadata()
+ throws UnsupportedOperationException {
+ return null;
+ }
+
+ @Override
+ public PropertiesMetadata topicPropertiesMetadata() throws
UnsupportedOperationException {
+ return null;
+ }
};
- for (int i = 0; i < paths.length; i++) {
- Path actual = HadoopCatalogOperations.formalizePath(new Path(paths[i]),
new Configuration());
- Assertions.assertEquals(expected[i], actual.toString());
+ try {
+ FieldUtils.writeField(
+ GravitinoEnv.getInstance(), "entityStore", new
RelationalEntityStore(), true);
+ try (HadoopCatalogOperations hadoopCatalogOperations = new
HadoopCatalogOperations()) {
+ Map<String, String> map = ImmutableMap.of("default-filesystem",
"file:///");
+ hadoopCatalogOperations.initialize(map, null, hasPropertyMetadata);
+ for (int i = 0; i < paths.length; i++) {
+ Path actual = hadoopCatalogOperations.formalizePath(new
Path(paths[i]), map);
+ Assertions.assertEquals(expected[i], actual.toString());
+ }
+ }
+ } finally {
+ FieldUtils.writeField(GravitinoEnv.getInstance(), "entityStore", null,
true);
}
}
@@ -877,8 +907,11 @@ public class TestHadoopCatalogOperations {
try (HadoopCatalogOperations mockOps =
Mockito.mock(HadoopCatalogOperations.class)) {
mockOps.hadoopConf = new Configuration();
when(mockOps.loadFileset(filesetIdent)).thenReturn(mockFileset);
+ when(mockOps.getConf()).thenReturn(Maps.newHashMap());
String subPath = "/test/test.parquet";
when(mockOps.getFileLocation(filesetIdent,
subPath)).thenCallRealMethod();
+ when(mockOps.getFileSystem(Mockito.any(), Mockito.any()))
+ .thenReturn(FileSystem.getLocal(new Configuration()));
String fileLocation = mockOps.getFileLocation(filesetIdent, subPath);
Assertions.assertEquals(
String.format("%s%s", mockFileset.storageLocation(),
subPath.substring(1)), fileLocation);
diff --git a/clients/filesystem-hadoop3/build.gradle.kts
b/clients/filesystem-hadoop3/build.gradle.kts
index d7905cd3b..aefac5f28 100644
--- a/clients/filesystem-hadoop3/build.gradle.kts
+++ b/clients/filesystem-hadoop3/build.gradle.kts
@@ -26,6 +26,10 @@ plugins {
dependencies {
compileOnly(project(":clients:client-java-runtime", configuration =
"shadow"))
compileOnly(libs.hadoop3.common)
+ implementation(project(":catalogs:catalog-hadoop")) {
+ exclude(group = "*")
+ }
+
implementation(libs.caffeine)
testImplementation(project(":api"))
@@ -71,6 +75,11 @@ tasks.build {
dependsOn("javadoc")
}
+tasks.compileJava {
+ dependsOn(":catalogs:catalog-hadoop:jar")
+ dependsOn(":catalogs:catalog-hadoop:runtimeJars")
+}
+
tasks.test {
val skipITs = project.hasProperty("skipITs")
if (skipITs) {
diff --git
a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystem.java
b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystem.java
index de0eb758e..05e769667 100644
---
a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystem.java
+++
b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystem.java
@@ -18,6 +18,9 @@
*/
package org.apache.gravitino.filesystem.hadoop;
+import static
org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystemConfiguration.FS_FILESYSTEM_PROVIDERS;
+import static
org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystemConfiguration.GVFS_CONFIG_PREFIX;
+
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Scheduler;
@@ -41,6 +44,8 @@ import org.apache.gravitino.audit.CallerContext;
import org.apache.gravitino.audit.FilesetAuditConstants;
import org.apache.gravitino.audit.FilesetDataOperation;
import org.apache.gravitino.audit.InternalClientType;
+import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
+import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
import org.apache.gravitino.client.DefaultOAuth2TokenProvider;
import org.apache.gravitino.client.GravitinoClient;
import org.apache.gravitino.client.KerberosTokenProvider;
@@ -81,6 +86,8 @@ public class GravitinoVirtualFileSystem extends FileSystem {
private static final Pattern IDENTIFIER_PATTERN =
Pattern.compile("^(?:gvfs://fileset)?/([^/]+)/([^/]+)/([^/]+)(?>/[^/]+)*/?$");
private static final String SLASH = "/";
+ private final Map<String, FileSystemProvider> fileSystemProvidersMap =
Maps.newHashMap();
+ private static final String GRAVITINO_BYPASS_PREFIX = "gravitino.bypass.";
@Override
public void initialize(URI name, Configuration configuration) throws
IOException {
@@ -125,6 +132,10 @@ public class GravitinoVirtualFileSystem extends FileSystem
{
initializeClient(configuration);
+ // Register the default local and HDFS FileSystemProvider
+ String fileSystemProviders = configuration.get(FS_FILESYSTEM_PROVIDERS);
+
fileSystemProvidersMap.putAll(FileSystemUtils.getFileSystemProviders(fileSystemProviders));
+
this.workingDirectory = new Path(name);
this.uri = URI.create(name.getScheme() + "://" + name.getAuthority());
@@ -351,7 +362,6 @@ public class GravitinoVirtualFileSystem extends FileSystem {
Preconditions.checkArgument(
filesetCatalog != null, String.format("Loaded fileset catalog: %s is
null.", catalogIdent));
- // set the thread local audit info
Map<String, String> contextMap = Maps.newHashMap();
contextMap.put(
FilesetAuditConstants.HTTP_HEADER_INTERNAL_CLIENT_TYPE,
@@ -364,7 +374,8 @@ public class GravitinoVirtualFileSystem extends FileSystem {
filesetCatalog.getFileLocation(
NameIdentifier.of(identifier.namespace().level(2),
identifier.name()), subPath);
- URI uri = new Path(actualFileLocation).toUri();
+ Path filePath = new Path(actualFileLocation);
+ URI uri = filePath.toUri();
// we cache the fs for the same scheme, so we can reuse it
String scheme = uri.getScheme();
Preconditions.checkArgument(
@@ -374,7 +385,14 @@ public class GravitinoVirtualFileSystem extends FileSystem
{
scheme,
str -> {
try {
- return FileSystem.newInstance(uri, getConf());
+ Map<String, String> maps = getConfigMap(getConf());
+ FileSystemProvider provider =
fileSystemProvidersMap.get(scheme);
+ if (provider == null) {
+ throw new GravitinoRuntimeException(
+ "Unsupported file system scheme: %s for %s.",
+ scheme,
GravitinoVirtualFileSystemConfiguration.GVFS_SCHEME);
+ }
+ return provider.getFileSystem(filePath, maps);
} catch (IOException ioe) {
throw new GravitinoRuntimeException(
"Exception occurs when create new FileSystem for actual
uri: %s, msg: %s",
@@ -385,6 +403,21 @@ public class GravitinoVirtualFileSystem extends FileSystem
{
return new FilesetContextPair(new Path(actualFileLocation), fs);
}
+ private Map<String, String> getConfigMap(Configuration configuration) {
+ Map<String, String> maps = Maps.newHashMap();
+ configuration.forEach(
+ entry -> {
+ String key = entry.getKey();
+ if (key.startsWith(GRAVITINO_BYPASS_PREFIX)) {
+ maps.put(key.substring(GRAVITINO_BYPASS_PREFIX.length()),
entry.getValue());
+ } else if (!key.startsWith(GVFS_CONFIG_PREFIX)) {
+ maps.put(key, entry.getValue());
+ }
+ });
+
+ return maps;
+ }
+
private String getSubPathFromVirtualPath(NameIdentifier identifier, String
virtualPathString) {
return
virtualPathString.startsWith(GravitinoVirtualFileSystemConfiguration.GVFS_FILESET_PREFIX)
? virtualPathString.substring(
diff --git
a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystemConfiguration.java
b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystemConfiguration.java
index 8076c02c3..cd1ecb92f 100644
---
a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystemConfiguration.java
+++
b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystemConfiguration.java
@@ -18,10 +18,13 @@
*/
package org.apache.gravitino.filesystem.hadoop;
+import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
+
/** Configuration class for Gravitino Virtual File System. */
class GravitinoVirtualFileSystemConfiguration {
public static final String GVFS_FILESET_PREFIX = "gvfs://fileset";
public static final String GVFS_SCHEME = "gvfs";
+ public static final String GVFS_CONFIG_PREFIX = "fs.gvfs.";
/** The configuration key for the Gravitino server URI. */
public static final String FS_GRAVITINO_SERVER_URI_KEY =
"fs.gravitino.server.uri";
@@ -32,6 +35,13 @@ class GravitinoVirtualFileSystemConfiguration {
/** The configuration key for the Gravitino client auth type. */
public static final String FS_GRAVITINO_CLIENT_AUTH_TYPE_KEY =
"fs.gravitino.client.authType";
+ /**
+ * File system provider names configuration key. The value is a comma
separated list of file
+ * system provider name which is defined in the service loader. Users can
custom their own file
+ * system by implementing the {@link FileSystemProvider} interface.
+ */
+ public static final String FS_FILESYSTEM_PROVIDERS =
"fs.gvfs.filesystem.providers";
+
public static final String SIMPLE_AUTH_TYPE = "simple";
public static final String OAUTH2_AUTH_TYPE = "oauth2";
public static final String KERBEROS_AUTH_TYPE = "kerberos";
diff --git a/docs/hadoop-catalog.md b/docs/hadoop-catalog.md
index d6706ff3e..d28e6d93b 100644
--- a/docs/hadoop-catalog.md
+++ b/docs/hadoop-catalog.md
@@ -25,16 +25,19 @@ Hadoop 3. If there's any compatibility issue, please create
an [issue](https://g
Besides the [common catalog
properties](./gravitino-server-config.md#gravitino-catalog-properties-configuration),
the Hadoop catalog has the following properties:
-| Property Name | Description
|
Default Value | Required |
Since Version |
-|----------------------------------------------------|------------------------------------------------------------------------------------------------|---------------|-------------------------------------------------------------|---------------|
-| `location` | The storage location
managed by Hadoop catalog. |
(none) | No |
0.5.0 |
-| `authentication.impersonation-enable` | Whether to enable
impersonation for the Hadoop catalog. |
`false` | No |
0.5.1 |
-| `authentication.type` | The type of
authentication for Hadoop catalog, currently we only support `kerberos`,
`simple`. | `simple` | No
| 0.5.1 |
-| `authentication.kerberos.principal` | The principal of the
Kerberos authentication |
(none) | required if the value of `authentication.type` is Kerberos. |
0.5.1 |
-| `authentication.kerberos.keytab-uri` | The URI of The keytab
for the Kerberos authentication. |
(none) | required if the value of `authentication.type` is Kerberos. |
0.5.1 |
-| `authentication.kerberos.check-interval-sec` | The check interval of
Kerberos credential for Hadoop catalog. | 60
| No | 0.5.1
|
-| `authentication.kerberos.keytab-fetch-timeout-sec` | The fetch timeout of
retrieving Kerberos keytab from `authentication.kerberos.keytab-uri`. | 60
| No | 0.5.1
|
-
+| Property Name | Description
| Default Value | Required
| Since Version |
+|----------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-----------------|-------------------------------------------------------------|------------------|
+| `location` | The storage location
managed by Hadoop catalog.
| (none) | No
| 0.5.0 |
+| `filesystem-providers`                             | The names (split by
comma) of filesystem providers for the Hadoop catalog. Gravitino already
supports the built-in `builtin-local` (`local file`) and `builtin-hdfs`
(`hdfs`) providers. If users want Gravitino to support more file systems,
they can add custom file systems by implementing
`FileSystemProvider`. | (none) | No
| 0.7.0-incubating |
+| `default-filesystem-provider`                      | The name of the default
filesystem provider of this Hadoop catalog, used if users do not specify the
scheme in the URI. Default value is `builtin-local`.
| `builtin-local` | No
| 0.7.0-incubating |
+| `authentication.impersonation-enable` | Whether to enable
impersonation for the Hadoop catalog.
| `false` | No
| 0.5.1 |
+| `authentication.type` | The type of
authentication for Hadoop catalog, currently we only support `kerberos`,
`simple`.
| `simple` |
No | 0.5.1 |
+| `authentication.kerberos.principal` | The principal of the
Kerberos authentication
| (none) | required if the
value of `authentication.type` is Kerberos. | 0.5.1 |
+| `authentication.kerberos.keytab-uri`               | The URI of the keytab
for the Kerberos authentication.
| (none) | required if the
value of `authentication.type` is Kerberos. | 0.5.1 |
+| `authentication.kerberos.check-interval-sec` | The check interval of
Kerberos credential for Hadoop catalog.
| 60 | No
| 0.5.1 |
+| `authentication.kerberos.keytab-fetch-timeout-sec` | The fetch timeout of
retrieving Kerberos keytab from `authentication.kerberos.keytab-uri`.
| 60 | No
| 0.5.1 |
+
+For more information about `filesystem-providers`, please refer to
`HDFSFileSystemProvider` or `LocalFileSystemProvider` in the source code.
Furthermore, you also need to place the jar of the file system provider into
the `$GRAVITINO_HOME/catalogs/hadoop/libs` directory if it's not already on the
classpath.
### Authentication for Hadoop Catalog