This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new af47d3a423 [#9280] improvement(catalogs-fileset): Refactor FileSystem
retrieval to use `future` and solve hang problem (#9282)
af47d3a423 is described below
commit af47d3a423078762ae29394d23c7e7e2b48c4c3e
Author: Mini Yu <[email protected]>
AuthorDate: Mon Dec 8 11:00:02 2025 +0800
[#9280] improvement(catalogs-fileset): Refactor FileSystem retrieval to use
`future` and solve hang problem (#9282)
### What changes were proposed in this pull request?
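Replaced Awaitility-based polling with a `Future` submitted to a dedicated
ThreadPoolExecutor for FileSystem retrieval. The caller now waits at most
the configured timeout via `Future.get(timeout)`, even if the provider call
never returns, which simplifies timeout handling. Added cancellation of the
pending task on timeout and clearer exception propagation for robustness.
Also made the OSS, S3, Azure, GCS and HDFS file system providers set default
connection-timeout and retry-limit values (only when the user has not
configured them), so connection failures surface quickly instead of retrying
for a long time.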
### Why are the changes needed?
`provider.getFileSystem(path, config)` may hang and never return a value.
When that happens, the `Awaitility`-based timeout does not take effect as
expected, and the caller hangs.
Fix: #9280
### Does this PR introduce _any_ user-facing change?
N/A
### How was this patch tested?
Tested locally and with the existing tests.
---
.../gravitino/oss/fs/OSSFileSystemProvider.java | 32 ++++++++++
.../gravitino/s3/fs/S3FileSystemProvider.java | 42 +++++++++++++
.../gravitino/abs/fs/AzureFileSystemProvider.java | 23 +++++++
.../gravitino/gcs/fs/GCSFileSystemProvider.java | 32 ++++++++++
.../catalog/fileset/FilesetCatalogOperations.java | 71 +++++++++++++++++-----
.../fileset/TestFilesetCatalogOperations.java | 48 ++++++++++++++-
.../gravitino/catalog/hadoop/fs/Constants.java | 31 ++++++++++
.../catalog/hadoop/fs/HDFSFileSystemProvider.java | 35 ++++++++++-
8 files changed, 296 insertions(+), 18 deletions(-)
diff --git
a/bundles/aliyun/src/main/java/org/apache/gravitino/oss/fs/OSSFileSystemProvider.java
b/bundles/aliyun/src/main/java/org/apache/gravitino/oss/fs/OSSFileSystemProvider.java
index e72f3842ea..73196cd354 100644
---
a/bundles/aliyun/src/main/java/org/apache/gravitino/oss/fs/OSSFileSystemProvider.java
+++
b/bundles/aliyun/src/main/java/org/apache/gravitino/oss/fs/OSSFileSystemProvider.java
@@ -18,6 +18,11 @@
*/
package org.apache.gravitino.oss.fs;
+import static
org.apache.gravitino.catalog.hadoop.fs.Constants.DEFAULT_CONNECTION_TIMEOUT;
+import static
org.apache.gravitino.catalog.hadoop.fs.Constants.DEFAULT_RETRY_LIMIT;
+import static
org.apache.gravitino.catalog.hadoop.fs.Constants.OSS_ESTABLISH_TIMEOUT_KEY;
+import static
org.apache.gravitino.catalog.hadoop.fs.Constants.OSS_MAX_ERROR_RETRIES_KEY;
+
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
@@ -63,6 +68,8 @@ public class OSSFileSystemProvider implements
FileSystemProvider, SupportsCreden
hadoopConfMap.put(OSS_FILESYSTEM_IMPL,
AliyunOSSFileSystem.class.getCanonicalName());
}
+ hadoopConfMap = additionalOSSConfig(hadoopConfMap);
+
Configuration configuration =
FileSystemUtils.createConfiguration(hadoopConfMap);
return AliyunOSSFileSystem.newInstance(path.toUri(), configuration);
@@ -89,4 +96,29 @@ public class OSSFileSystemProvider implements
FileSystemProvider, SupportsCreden
public String name() {
return "oss";
}
+
+ /**
+ * Add additional OSS configurations for better performance and reliability.
+ *
+ * @param configs Original configurations
+ * @return Configurations with additional OSS settings
+ */
+ private Map<String, String> additionalOSSConfig(Map<String, String> configs)
{
+ Map<String, String> additionalConfigs = Maps.newHashMap(configs);
+
+ // Avoid multiple retries to speed up failure in test cases.
+ // Use hard code instead of Constants.ESTABLISH_TIMEOUT_KEY to avoid
dependency on a specific
+ // Hadoop version.
+ if (!configs.containsKey(OSS_ESTABLISH_TIMEOUT_KEY)) {
+ additionalConfigs.put(OSS_ESTABLISH_TIMEOUT_KEY,
DEFAULT_CONNECTION_TIMEOUT);
+ }
+
+ if (!configs.containsKey(OSS_MAX_ERROR_RETRIES_KEY)) {
+ additionalConfigs.put(OSS_MAX_ERROR_RETRIES_KEY, DEFAULT_RETRY_LIMIT);
+ }
+
+ // More tuning can be added here.
+
+ return ImmutableMap.copyOf(additionalConfigs);
+ }
}
diff --git
a/bundles/aws/src/main/java/org/apache/gravitino/s3/fs/S3FileSystemProvider.java
b/bundles/aws/src/main/java/org/apache/gravitino/s3/fs/S3FileSystemProvider.java
index 9ef1e00413..4e04216ac7 100644
---
a/bundles/aws/src/main/java/org/apache/gravitino/s3/fs/S3FileSystemProvider.java
+++
b/bundles/aws/src/main/java/org/apache/gravitino/s3/fs/S3FileSystemProvider.java
@@ -19,6 +19,13 @@
package org.apache.gravitino.s3.fs;
+import static
org.apache.gravitino.catalog.hadoop.fs.Constants.DEFAULT_CONNECTION_TIMEOUT;
+import static
org.apache.gravitino.catalog.hadoop.fs.Constants.DEFAULT_RETRY_LIMIT;
+import static
org.apache.gravitino.catalog.hadoop.fs.Constants.S3_ESTABLISH_TIMEOUT;
+import static
org.apache.gravitino.catalog.hadoop.fs.Constants.S3_MAX_ERROR_RETRIES;
+import static org.apache.gravitino.catalog.hadoop.fs.Constants.S3_RETRY_LIMIT;
+import static
org.apache.gravitino.catalog.hadoop.fs.Constants.S3_RETRY_THROTTLE_LIMIT;
+
import com.amazonaws.auth.AWSCredentialsProvider;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
@@ -73,6 +80,8 @@ public class S3FileSystemProvider implements
FileSystemProvider, SupportsCredent
// Hadoop-aws 2 does not support IAMInstanceCredentialsProvider
checkAndSetCredentialProvider(hadoopConfMap);
+ hadoopConfMap = additionalS3Config(hadoopConfMap);
+
Configuration configuration =
FileSystemUtils.createConfiguration(hadoopConfMap);
return S3AFileSystem.newInstance(path.toUri(), configuration);
}
@@ -142,4 +151,37 @@ public class S3FileSystemProvider implements
FileSystemProvider, SupportsCredent
public String name() {
return "s3";
}
+
+ /**
+ * Add additional S3 configurations to improve performance and reliability.
+ *
+ * @param configs Original configurations
+ * @return Configurations with additional S3 settings
+ */
+ private Map<String, String> additionalS3Config(Map<String, String> configs) {
+ Map<String, String> additionalConfigs = Maps.newHashMap(configs);
+
+ // Avoid multiple retries to speed up failure in test cases.
+ // Use hard code instead of Constants.MAX_ERROR_RETRIES to avoid
dependency on a specific Hadoop
+ // version
+ if (!configs.containsKey(S3_MAX_ERROR_RETRIES)) {
+ additionalConfigs.put(S3_MAX_ERROR_RETRIES, DEFAULT_RETRY_LIMIT);
+ }
+
+ if (!configs.containsKey(S3_ESTABLISH_TIMEOUT)) {
+ additionalConfigs.put(S3_ESTABLISH_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT);
+ }
+
+ if (!configs.containsKey(S3_RETRY_LIMIT)) {
+ additionalConfigs.put(S3_RETRY_LIMIT, DEFAULT_RETRY_LIMIT);
+ }
+
+ if (!configs.containsKey(S3_RETRY_THROTTLE_LIMIT)) {
+ additionalConfigs.put(S3_RETRY_THROTTLE_LIMIT, DEFAULT_RETRY_LIMIT);
+ }
+
+ // More tuning can be added here.
+
+ return ImmutableMap.copyOf(additionalConfigs);
+ }
}
diff --git
a/bundles/azure/src/main/java/org/apache/gravitino/abs/fs/AzureFileSystemProvider.java
b/bundles/azure/src/main/java/org/apache/gravitino/abs/fs/AzureFileSystemProvider.java
index 4b14d31b04..4a107e0bd6 100644
---
a/bundles/azure/src/main/java/org/apache/gravitino/abs/fs/AzureFileSystemProvider.java
+++
b/bundles/azure/src/main/java/org/apache/gravitino/abs/fs/AzureFileSystemProvider.java
@@ -19,6 +19,8 @@
package org.apache.gravitino.abs.fs;
+import static
org.apache.gravitino.catalog.hadoop.fs.Constants.ADLS_MAX_RETRIES;
+import static
org.apache.gravitino.catalog.hadoop.fs.Constants.DEFAULT_RETRY_LIMIT;
import static
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME;
import static
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_IS_HNS_ENABLED;
import static
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_SAS_TOKEN_PROVIDER_TYPE;
@@ -71,6 +73,8 @@ public class AzureFileSystemProvider implements
FileSystemProvider, SupportsCred
hadoopConfMap.put(ABFS_IMPL_KEY, ABFS_IMPL);
}
+ hadoopConfMap = additionalAzureConfig(hadoopConfMap);
+
Configuration configuration =
FileSystemUtils.createConfiguration(hadoopConfMap);
return FileSystem.newInstance(path.toUri(), configuration);
}
@@ -110,4 +114,23 @@ public class AzureFileSystemProvider implements
FileSystemProvider, SupportsCred
public String name() {
return ABS_PROVIDER_NAME;
}
+
+ /**
+ * Add additional Azure specific configuration to tune the performance.
+ *
+ * @param configs original configuration
+ * @return the configuration with additional Azure tuning parameters
+ */
+ private Map<String, String> additionalAzureConfig(Map<String, String>
configs) {
+ Map<String, String> additionalConfigs = Maps.newHashMap(configs);
+
+ // Avoid multiple retries to speed up failure in test cases.
+ if (!configs.containsKey(ADLS_MAX_RETRIES)) {
+ additionalConfigs.put(ADLS_MAX_RETRIES, DEFAULT_RETRY_LIMIT);
+ }
+
+ // More tuning can be added here.
+
+ return ImmutableMap.copyOf(additionalConfigs);
+ }
}
diff --git
a/bundles/gcp/src/main/java/org/apache/gravitino/gcs/fs/GCSFileSystemProvider.java
b/bundles/gcp/src/main/java/org/apache/gravitino/gcs/fs/GCSFileSystemProvider.java
index 5c37614f4a..41cc0047d6 100644
---
a/bundles/gcp/src/main/java/org/apache/gravitino/gcs/fs/GCSFileSystemProvider.java
+++
b/bundles/gcp/src/main/java/org/apache/gravitino/gcs/fs/GCSFileSystemProvider.java
@@ -18,6 +18,11 @@
*/
package org.apache.gravitino.gcs.fs;
+import static
org.apache.gravitino.catalog.hadoop.fs.Constants.DEFAULT_CONNECTION_TIMEOUT;
+import static
org.apache.gravitino.catalog.hadoop.fs.Constants.DEFAULT_RETRY_LIMIT;
+import static
org.apache.gravitino.catalog.hadoop.fs.Constants.GCS_GCS_HTTP_CONNECT_TIMEOUT_KEY;
+import static
org.apache.gravitino.catalog.hadoop.fs.Constants.GCS_HTTP_MAX_RETRY_KEY;
+
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
@@ -47,6 +52,8 @@ public class GCSFileSystemProvider implements
FileSystemProvider, SupportsCreden
public FileSystem getFileSystem(Path path, Map<String, String> config)
throws IOException {
Map<String, String> hadoopConfMap =
FileSystemUtils.toHadoopConfigMap(config,
GRAVITINO_KEY_TO_GCS_HADOOP_KEY);
+ hadoopConfMap = additionalGCSConfig(hadoopConfMap);
+
Configuration configuration =
FileSystemUtils.createConfiguration(hadoopConfMap);
return FileSystem.newInstance(path.toUri(), configuration);
}
@@ -71,4 +78,29 @@ public class GCSFileSystemProvider implements
FileSystemProvider, SupportsCreden
public String name() {
return "gcs";
}
+
+ /**
+ * Add additional GCS specific configuration to tune the performance.
+ *
+ * @param configs original configuration
+ * @return the configuration with additional GCS tuning parameters
+ */
+ private Map<String, String> additionalGCSConfig(Map<String, String> configs)
{
+ Map<String, String> additionalConfigs = Maps.newHashMap(configs);
+
+ // Avoid multiple retries to speed up failure in test cases.
+ // Use hard code instead of
GoogleHadoopFileSystemBase.GCS_HTTP_CONNECT_TIMEOUT_KEY to avoid
+ // dependency on a specific Hadoop version
+ if (!configs.containsKey(GCS_GCS_HTTP_CONNECT_TIMEOUT_KEY)) {
+ additionalConfigs.put(GCS_GCS_HTTP_CONNECT_TIMEOUT_KEY,
DEFAULT_CONNECTION_TIMEOUT);
+ }
+
+ if (!configs.containsKey(GCS_HTTP_MAX_RETRY_KEY)) {
+ additionalConfigs.put(GCS_HTTP_MAX_RETRY_KEY, DEFAULT_RETRY_LIMIT);
+ }
+
+ // More tuning can be added here.
+
+ return ImmutableMap.copyOf(additionalConfigs);
+ }
}
diff --git
a/catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogOperations.java
b/catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogOperations.java
index 8b3f2cb140..a6a6d78a64 100644
---
a/catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogOperations.java
+++
b/catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogOperations.java
@@ -46,8 +46,13 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -99,8 +104,6 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
-import org.awaitility.Awaitility;
-import org.awaitility.core.ConditionTimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -137,6 +140,23 @@ public class FilesetCatalogOperations extends
ManagedSchemaOperations
@VisibleForTesting ScheduledThreadPoolExecutor scheduler;
@VisibleForTesting Cache<FileSystemCacheKey, FileSystem> fileSystemCache;
+ private final ThreadPoolExecutor fileSystemExecutor =
+ new ThreadPoolExecutor(
+ Math.max(2, Math.min(Runtime.getRuntime().availableProcessors() * 2,
16)),
+ Math.max(2, Math.min(Runtime.getRuntime().availableProcessors() * 2,
32)),
+ 5L,
+ TimeUnit.SECONDS,
+ new ArrayBlockingQueue<>(1000),
+ new ThreadFactoryBuilder()
+ .setDaemon(true)
+ .setNameFormat("fileset-filesystem-getter-pool-%d")
+ .build(),
+ new ThreadPoolExecutor.AbortPolicy()) {
+ {
+ allowCoreThreadTimeOut(true);
+ }
+ };
+
FilesetCatalogOperations(EntityStore store) {
this.store = store;
}
@@ -994,6 +1014,10 @@ public class FilesetCatalogOperations extends
ManagedSchemaOperations
scheduler.shutdownNow();
}
+ if (!fileSystemExecutor.isShutdown()) {
+ fileSystemExecutor.shutdownNow();
+ }
+
if (fileSystemCache != null) {
fileSystemCache
.asMap()
@@ -1395,30 +1419,47 @@ public class FilesetCatalogOperations extends
ManagedSchemaOperations
.catalogPropertiesMetadata()
.getOrDefault(
config,
FilesetCatalogPropertiesMetadata.FILESYSTEM_CONNECTION_TIMEOUT_SECONDS);
+
+ Future<FileSystem> fileSystemFuture =
+ fileSystemExecutor.submit(() -> provider.getFileSystem(path, config));
+
try {
- AtomicReference<FileSystem> fileSystem = new AtomicReference<>();
- Awaitility.await()
- .atMost(timeoutSeconds, TimeUnit.SECONDS)
- .pollInterval(1, TimeUnit.MILLISECONDS)
- .until(
- () -> {
- fileSystem.set(provider.getFileSystem(path, config));
- return true;
- });
- return fileSystem.get();
- } catch (ConditionTimeoutException e) {
+ return fileSystemFuture.get(timeoutSeconds, TimeUnit.SECONDS);
+ } catch (TimeoutException e) {
+ fileSystemFuture.cancel(true);
+
+ LOG.warn(
+ "Timeout when getting FileSystem for path: {}, scheme: {}, provider:
{} within {} seconds",
+ path,
+ scheme,
+ provider,
+ timeoutSeconds,
+ e);
+
throw new IOException(
String.format(
- "Failed to get FileSystem for path: %s, scheme: %s, provider:
%s, config: %s within %s "
+ "Failed to get FileSystem for path: %s, scheme: %s, provider: %s
within %s "
+ "seconds, please check the configuration or increase the "
+ "file system connection timeout time by setting catalog
property: %s",
path,
scheme,
provider,
- config,
timeoutSeconds,
FilesetCatalogPropertiesMetadata.FILESYSTEM_CONNECTION_TIMEOUT_SECONDS),
e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.warn(
+ "Interrupted when getting FileSystem for path: {}, possibly the
server is"
+ + " shutting down or catalog is been dropped",
+ path);
+ throw new RuntimeException("Interrupted when getting FileSystem for
path: " + path, e);
+ } catch (ExecutionException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof IOException) {
+ throw (IOException) cause;
+ }
+ throw new IOException("Failed to create FileSystem", cause);
}
}
}
diff --git
a/catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/TestFilesetCatalogOperations.java
b/catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/TestFilesetCatalogOperations.java
index 6951ced7d6..a55b519f62 100644
---
a/catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/TestFilesetCatalogOperations.java
+++
b/catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/TestFilesetCatalogOperations.java
@@ -41,6 +41,7 @@ import static
org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider.GRAVITIN
import static org.apache.gravitino.file.Fileset.LOCATION_NAME_UNKNOWN;
import static org.apache.gravitino.file.Fileset.PROPERTY_DEFAULT_LOCATION_NAME;
import static
org.apache.gravitino.file.Fileset.PROPERTY_MULTIPLE_LOCATIONS_PREFIX;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.when;
@@ -80,7 +81,9 @@ import org.apache.gravitino.UserPrincipal;
import org.apache.gravitino.audit.CallerContext;
import org.apache.gravitino.audit.FilesetAuditConstants;
import org.apache.gravitino.audit.FilesetDataOperation;
+import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
+import org.apache.gravitino.catalog.hadoop.fs.LocalFileSystemProvider;
import org.apache.gravitino.connector.CatalogInfo;
import org.apache.gravitino.connector.HasPropertyMetadata;
import org.apache.gravitino.connector.PropertiesMetadata;
@@ -106,16 +109,19 @@ import org.apache.gravitino.utils.NameIdentifierUtil;
import org.apache.gravitino.utils.PrincipalUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
public class TestFilesetCatalogOperations {
@@ -1455,9 +1461,9 @@ public class TestFilesetCatalogOperations {
String subPath = "/test/test.parquet";
when(mockOps.getFileLocation(filesetIdent,
subPath)).thenCallRealMethod();
when(mockOps.getFileLocation(filesetIdent, subPath,
null)).thenCallRealMethod();
- when(mockOps.getFileSystem(Mockito.any(), Mockito.any()))
+ when(mockOps.getFileSystem(any(), any()))
.thenReturn(FileSystem.getLocal(new Configuration()));
- when(mockOps.getFileSystemWithCache(Mockito.any(),
Mockito.any())).thenCallRealMethod();
+ when(mockOps.getFileSystemWithCache(any(), any())).thenCallRealMethod();
String fileLocation = mockOps.getFileLocation(filesetIdent, subPath);
Assertions.assertEquals(
String.format("%s%s", mockFileset.storageLocation(),
subPath.substring(1)), fileLocation);
@@ -1868,6 +1874,44 @@ public class TestFilesetCatalogOperations {
}
}
+ @Test
+ @Timeout(20)
+ void testGetFileSystemTimeoutThrowsException() throws Exception {
+ FieldUtils.writeField(
+ GravitinoEnv.getInstance(), "entityStore", new
RelationalEntityStore(), true);
+
+ try (FilesetCatalogOperations filesetCatalogOperations = new
FilesetCatalogOperations()) {
+ LocalFileSystemProvider localFileSystemProvider =
Mockito.mock(LocalFileSystemProvider.class);
+ when(localFileSystemProvider.scheme()).thenReturn("file");
+ when(localFileSystemProvider.getFileSystem(Mockito.any(Path.class),
Mockito.anyMap()))
+ .thenAnswer(
+ invocation -> {
+ // Block 100s, however, the timeout is set to 6s by default in
+ // FilesetCatalogOperations, so it's expected to be over
within 10s
+ Awaitility.await().forever().until(() -> false);
+ return new LocalFileSystem();
+ });
+ Map<String, FileSystemProvider> fileSystemProviderMapOriginal = new
HashMap<>();
+ fileSystemProviderMapOriginal.put("file", localFileSystemProvider);
+ FieldUtils.writeField(
+ filesetCatalogOperations, "fileSystemProvidersMap",
fileSystemProviderMapOriginal, true);
+
+ FieldUtils.writeField(
+ filesetCatalogOperations, "propertiesMetadata",
FILESET_PROPERTIES_METADATA, true);
+
+ // We use Annotation `Timeout` to make sure the test will not run
forever as `getFileSystem`
+ // will throw IOException after timeout(6s)
+ Exception e =
+ Assertions.assertThrows(
+ IOException.class,
+ () ->
+ filesetCatalogOperations.getFileSystem(
+ new Path("file:///tmp"), ImmutableMap.of()));
+
+ Assertions.assertTrue(e.getMessage().contains("Failed to get FileSystem
for path"));
+ }
+ }
+
private static Stream<Arguments> multipleLocationsArguments() {
return Stream.of(
// Honor the catalog location
diff --git
a/catalogs/hadoop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/Constants.java
b/catalogs/hadoop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/Constants.java
index bd931a5dc9..39ef923b39 100644
---
a/catalogs/hadoop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/Constants.java
+++
b/catalogs/hadoop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/Constants.java
@@ -46,4 +46,35 @@ public class Constants {
public static final String AUTH_KERBEROS = "kerberos";
// Simple authentication type
public static final String AUTH_SIMPLE = "simple";
+
+ // The following parts are configuration keys for different file systems.
+
+ // S3 specific configuration keys, in case of specific hadoop version
binding, we will
+ // map these keys to the corresponding hadoop s3 keys.
+ public static final String S3_MAX_ERROR_RETRIES = "fs.s3a.attempts.maximum";
+
+ public static final String S3_ESTABLISH_TIMEOUT =
"fs.s3a.connection.establish.timeout";
+
+ public static final String S3_RETRY_LIMIT = "fs.s3a.retry.limit";
+
+ public static final String S3_RETRY_THROTTLE_LIMIT =
"fs.s3a.retry.throttle.limit";
+
+ // GCS specific configuration keys
+ public static final String GCS_GCS_HTTP_CONNECT_TIMEOUT_KEY =
"fs.gs.http.connect-timeout";
+ public static final String GCS_HTTP_MAX_RETRY_KEY = "fs.gs.http.max.retry";
+
+ // OSS specific configuration keys
+ public static final String OSS_ESTABLISH_TIMEOUT_KEY =
"fs.oss.connection.establish.timeout";
+ public static final String OSS_MAX_ERROR_RETRIES_KEY =
"fs.oss.attempts.maximum";
+
+ // Azure Blob Storage specific configuration keys, please see:
AbfsConfiguration
+ public static final String ADLS_MAX_RETRIES =
"fs.azure.io.retry.max.retries";
+
+ // HDFS specific configuration keys
+ public static final String HDFS_IPC_CLIENT_CONNECT_TIMEOUT_KEY =
"ipc.client.connect.timeout";
+ public static final String HDFS_IPC_PING_KEY = "ipc.client.ping";
+ public static final String DEFAULT_HDFS_IPC_PING = "true";
+
+ public static final String DEFAULT_CONNECTION_TIMEOUT = "5000";
+ public static final String DEFAULT_RETRY_LIMIT = "2";
}
diff --git
a/catalogs/hadoop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/HDFSFileSystemProvider.java
b/catalogs/hadoop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/HDFSFileSystemProvider.java
index bf83e75a9f..27fc90d80a 100644
---
a/catalogs/hadoop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/HDFSFileSystemProvider.java
+++
b/catalogs/hadoop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/HDFSFileSystemProvider.java
@@ -19,7 +19,13 @@
package org.apache.gravitino.catalog.hadoop.fs;
import static
org.apache.gravitino.catalog.hadoop.fs.Constants.BUILTIN_HDFS_FS_PROVIDER;
+import static
org.apache.gravitino.catalog.hadoop.fs.Constants.DEFAULT_CONNECTION_TIMEOUT;
+import static
org.apache.gravitino.catalog.hadoop.fs.Constants.DEFAULT_HDFS_IPC_PING;
+import static
org.apache.gravitino.catalog.hadoop.fs.Constants.HDFS_IPC_CLIENT_CONNECT_TIMEOUT_KEY;
+import static
org.apache.gravitino.catalog.hadoop.fs.Constants.HDFS_IPC_PING_KEY;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
import java.io.IOException;
import java.util.Map;
import javax.annotation.Nonnull;
@@ -35,7 +41,9 @@ public class HDFSFileSystemProvider implements
FileSystemProvider {
@Override
public FileSystem getFileSystem(@Nonnull Path path, @Nonnull Map<String,
String> config)
throws IOException {
- Configuration configuration =
FileSystemUtils.createConfiguration(GRAVITINO_BYPASS, config);
+ Map<String, String> hadoopConfMap = additionalHDFSConfig(config);
+ Configuration configuration =
+ FileSystemUtils.createConfiguration(GRAVITINO_BYPASS, hadoopConfMap);
return FileSystem.newInstance(path.toUri(), configuration);
}
@@ -48,4 +56,29 @@ public class HDFSFileSystemProvider implements
FileSystemProvider {
public String name() {
return BUILTIN_HDFS_FS_PROVIDER;
}
+
+ /**
+ * Add additional HDFS specific configurations.
+ *
+ * @param configs Original configurations.
+ * @return Configurations with additional HDFS specific configurations.
+ */
+ private Map<String, String> additionalHDFSConfig(Map<String, String>
configs) {
+ Map<String, String> additionalConfigs = Maps.newHashMap(configs);
+
+ // Avoid multiple retries to speed up failure in test cases.
+ // Use hard code instead of
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_TIMEOUT_KEY to
+ // avoid dependency on a specific Hadoop version.
+ if (!configs.containsKey(HDFS_IPC_CLIENT_CONNECT_TIMEOUT_KEY)) {
+ additionalConfigs.put(HDFS_IPC_CLIENT_CONNECT_TIMEOUT_KEY,
DEFAULT_CONNECTION_TIMEOUT);
+ }
+
+ if (!configs.containsKey(HDFS_IPC_PING_KEY)) {
+ additionalConfigs.put(HDFS_IPC_PING_KEY, DEFAULT_HDFS_IPC_PING);
+ }
+
+ // More tuning can be added here.
+
+ return ImmutableMap.copyOf(additionalConfigs);
+ }
}