This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new af47d3a423 [#9280] improvement(catalogs-fileset): Refactor FileSystem 
retrieval to use `future` and solve hang problem (#9282)
af47d3a423 is described below

commit af47d3a423078762ae29394d23c7e7e2b48c4c3e
Author: Mini Yu <[email protected]>
AuthorDate: Mon Dec 8 11:00:02 2025 +0800

    [#9280] improvement(catalogs-fileset): Refactor FileSystem retrieval to use 
`future` and solve hang problem (#9282)
    
    ### What changes were proposed in this pull request?
    
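    Replaced Awaitility-based polling with a `Future` obtained by submitting
    FileSystem retrieval to a dedicated ThreadPoolExecutor. Waiting on
    `Future.get(timeout)` enforces the connection timeout even when the
    underlying call blocks, and simplifies timeout handling. On timeout the
    pending task is cancelled, and exceptions thrown by the provider are
    propagated more clearly. The S3, OSS, GCS, Azure and HDFS providers
    additionally fill in connection-timeout and/or retry-limit settings
    (defaults `5000` and `2`) when the user has not configured them, so
    failures surface sooner.

    ### Why are the changes needed?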
    
    `provider.getFileSystem(path, config)` may hang and never return a
    value, which prevents the `Awaitility`-based timeout from working as
    expected: the caller can block indefinitely instead of failing after the
    configured connection timeout.
    
    Fix: #9280
    
    ### Does this PR introduce _any_ user-facing change?
    
    N/A
    
    ### How was this patch tested?
    
    Tested locally and with existing tests; added
    `testGetFileSystemTimeoutThrowsException` to cover the timeout path.
---
 .../gravitino/oss/fs/OSSFileSystemProvider.java    | 32 ++++++++++
 .../gravitino/s3/fs/S3FileSystemProvider.java      | 42 +++++++++++++
 .../gravitino/abs/fs/AzureFileSystemProvider.java  | 23 +++++++
 .../gravitino/gcs/fs/GCSFileSystemProvider.java    | 32 ++++++++++
 .../catalog/fileset/FilesetCatalogOperations.java  | 71 +++++++++++++++++-----
 .../fileset/TestFilesetCatalogOperations.java      | 48 ++++++++++++++-
 .../gravitino/catalog/hadoop/fs/Constants.java     | 31 ++++++++++
 .../catalog/hadoop/fs/HDFSFileSystemProvider.java  | 35 ++++++++++-
 8 files changed, 296 insertions(+), 18 deletions(-)

diff --git 
a/bundles/aliyun/src/main/java/org/apache/gravitino/oss/fs/OSSFileSystemProvider.java
 
b/bundles/aliyun/src/main/java/org/apache/gravitino/oss/fs/OSSFileSystemProvider.java
index e72f3842ea..73196cd354 100644
--- 
a/bundles/aliyun/src/main/java/org/apache/gravitino/oss/fs/OSSFileSystemProvider.java
+++ 
b/bundles/aliyun/src/main/java/org/apache/gravitino/oss/fs/OSSFileSystemProvider.java
@@ -18,6 +18,11 @@
  */
 package org.apache.gravitino.oss.fs;
 
+import static 
org.apache.gravitino.catalog.hadoop.fs.Constants.DEFAULT_CONNECTION_TIMEOUT;
+import static 
org.apache.gravitino.catalog.hadoop.fs.Constants.DEFAULT_RETRY_LIMIT;
+import static 
org.apache.gravitino.catalog.hadoop.fs.Constants.OSS_ESTABLISH_TIMEOUT_KEY;
+import static 
org.apache.gravitino.catalog.hadoop.fs.Constants.OSS_MAX_ERROR_RETRIES_KEY;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
@@ -63,6 +68,8 @@ public class OSSFileSystemProvider implements 
FileSystemProvider, SupportsCreden
       hadoopConfMap.put(OSS_FILESYSTEM_IMPL, 
AliyunOSSFileSystem.class.getCanonicalName());
     }
 
+    hadoopConfMap = additionalOSSConfig(hadoopConfMap);
+
     Configuration configuration = 
FileSystemUtils.createConfiguration(hadoopConfMap);
 
     return AliyunOSSFileSystem.newInstance(path.toUri(), configuration);
@@ -89,4 +96,29 @@ public class OSSFileSystemProvider implements 
FileSystemProvider, SupportsCreden
   public String name() {
     return "oss";
   }
+
+  /**
+   * Add additional OSS configurations for better performance and reliability.
+   *
+   * @param configs Original configurations
+   * @return Configurations with additional OSS settings
+   */
+  private Map<String, String> additionalOSSConfig(Map<String, String> configs) 
{
+    Map<String, String> additionalConfigs = Maps.newHashMap(configs);
+
+    // Avoid multiple retries to speed up failure in test cases.
+    // Use hard code instead of Constants.ESTABLISH_TIMEOUT_KEY to avoid 
dependency on a specific
+    // Hadoop version.
+    if (!configs.containsKey(OSS_ESTABLISH_TIMEOUT_KEY)) {
+      additionalConfigs.put(OSS_ESTABLISH_TIMEOUT_KEY, 
DEFAULT_CONNECTION_TIMEOUT);
+    }
+
+    if (!configs.containsKey(OSS_MAX_ERROR_RETRIES_KEY)) {
+      additionalConfigs.put(OSS_MAX_ERROR_RETRIES_KEY, DEFAULT_RETRY_LIMIT);
+    }
+
+    // More tuning can be added here.
+
+    return ImmutableMap.copyOf(additionalConfigs);
+  }
 }
diff --git 
a/bundles/aws/src/main/java/org/apache/gravitino/s3/fs/S3FileSystemProvider.java
 
b/bundles/aws/src/main/java/org/apache/gravitino/s3/fs/S3FileSystemProvider.java
index 9ef1e00413..4e04216ac7 100644
--- 
a/bundles/aws/src/main/java/org/apache/gravitino/s3/fs/S3FileSystemProvider.java
+++ 
b/bundles/aws/src/main/java/org/apache/gravitino/s3/fs/S3FileSystemProvider.java
@@ -19,6 +19,13 @@
 
 package org.apache.gravitino.s3.fs;
 
+import static 
org.apache.gravitino.catalog.hadoop.fs.Constants.DEFAULT_CONNECTION_TIMEOUT;
+import static 
org.apache.gravitino.catalog.hadoop.fs.Constants.DEFAULT_RETRY_LIMIT;
+import static 
org.apache.gravitino.catalog.hadoop.fs.Constants.S3_ESTABLISH_TIMEOUT;
+import static 
org.apache.gravitino.catalog.hadoop.fs.Constants.S3_MAX_ERROR_RETRIES;
+import static org.apache.gravitino.catalog.hadoop.fs.Constants.S3_RETRY_LIMIT;
+import static 
org.apache.gravitino.catalog.hadoop.fs.Constants.S3_RETRY_THROTTLE_LIMIT;
+
 import com.amazonaws.auth.AWSCredentialsProvider;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
@@ -73,6 +80,8 @@ public class S3FileSystemProvider implements 
FileSystemProvider, SupportsCredent
     // Hadoop-aws 2 does not support IAMInstanceCredentialsProvider
     checkAndSetCredentialProvider(hadoopConfMap);
 
+    hadoopConfMap = additionalS3Config(hadoopConfMap);
+
     Configuration configuration = 
FileSystemUtils.createConfiguration(hadoopConfMap);
     return S3AFileSystem.newInstance(path.toUri(), configuration);
   }
@@ -142,4 +151,37 @@ public class S3FileSystemProvider implements 
FileSystemProvider, SupportsCredent
   public String name() {
     return "s3";
   }
+
+  /**
+   * Add additional S3 configurations to improve performance and reliability.
+   *
+   * @param configs Original configurations
+   * @return Configurations with additional S3 settings
+   */
+  private Map<String, String> additionalS3Config(Map<String, String> configs) {
+    Map<String, String> additionalConfigs = Maps.newHashMap(configs);
+
+    // Avoid multiple retries to speed up failure in test cases.
+    // Use hard code instead of Constants.MAX_ERROR_RETRIES to avoid 
dependency on a specific Hadoop
+    // version
+    if (!configs.containsKey(S3_MAX_ERROR_RETRIES)) {
+      additionalConfigs.put(S3_MAX_ERROR_RETRIES, DEFAULT_RETRY_LIMIT);
+    }
+
+    if (!configs.containsKey(S3_ESTABLISH_TIMEOUT)) {
+      additionalConfigs.put(S3_ESTABLISH_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT);
+    }
+
+    if (!configs.containsKey(S3_RETRY_LIMIT)) {
+      additionalConfigs.put(S3_RETRY_LIMIT, DEFAULT_RETRY_LIMIT);
+    }
+
+    if (!configs.containsKey(S3_RETRY_THROTTLE_LIMIT)) {
+      additionalConfigs.put(S3_RETRY_THROTTLE_LIMIT, DEFAULT_RETRY_LIMIT);
+    }
+
+    // More tuning can be added here.
+
+    return ImmutableMap.copyOf(additionalConfigs);
+  }
 }
diff --git 
a/bundles/azure/src/main/java/org/apache/gravitino/abs/fs/AzureFileSystemProvider.java
 
b/bundles/azure/src/main/java/org/apache/gravitino/abs/fs/AzureFileSystemProvider.java
index 4b14d31b04..4a107e0bd6 100644
--- 
a/bundles/azure/src/main/java/org/apache/gravitino/abs/fs/AzureFileSystemProvider.java
+++ 
b/bundles/azure/src/main/java/org/apache/gravitino/abs/fs/AzureFileSystemProvider.java
@@ -19,6 +19,8 @@
 
 package org.apache.gravitino.abs.fs;
 
+import static 
org.apache.gravitino.catalog.hadoop.fs.Constants.ADLS_MAX_RETRIES;
+import static 
org.apache.gravitino.catalog.hadoop.fs.Constants.DEFAULT_RETRY_LIMIT;
 import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME;
 import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_IS_HNS_ENABLED;
 import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_SAS_TOKEN_PROVIDER_TYPE;
@@ -71,6 +73,8 @@ public class AzureFileSystemProvider implements 
FileSystemProvider, SupportsCred
       hadoopConfMap.put(ABFS_IMPL_KEY, ABFS_IMPL);
     }
 
+    hadoopConfMap = additionalAzureConfig(hadoopConfMap);
+
     Configuration configuration = 
FileSystemUtils.createConfiguration(hadoopConfMap);
     return FileSystem.newInstance(path.toUri(), configuration);
   }
@@ -110,4 +114,23 @@ public class AzureFileSystemProvider implements 
FileSystemProvider, SupportsCred
   public String name() {
     return ABS_PROVIDER_NAME;
   }
+
+  /**
+   * Add additional Azure specific configuration to tune the performance.
+   *
+   * @param configs original configuration
+   * @return the configuration with additional Azure tuning parameters
+   */
+  private Map<String, String> additionalAzureConfig(Map<String, String> 
configs) {
+    Map<String, String> additionalConfigs = Maps.newHashMap(configs);
+
+    // Avoid multiple retries to speed up failure in test cases.
+    if (!configs.containsKey(ADLS_MAX_RETRIES)) {
+      additionalConfigs.put(ADLS_MAX_RETRIES, DEFAULT_RETRY_LIMIT);
+    }
+
+    // More tuning can be added here.
+
+    return ImmutableMap.copyOf(additionalConfigs);
+  }
 }
diff --git 
a/bundles/gcp/src/main/java/org/apache/gravitino/gcs/fs/GCSFileSystemProvider.java
 
b/bundles/gcp/src/main/java/org/apache/gravitino/gcs/fs/GCSFileSystemProvider.java
index 5c37614f4a..41cc0047d6 100644
--- 
a/bundles/gcp/src/main/java/org/apache/gravitino/gcs/fs/GCSFileSystemProvider.java
+++ 
b/bundles/gcp/src/main/java/org/apache/gravitino/gcs/fs/GCSFileSystemProvider.java
@@ -18,6 +18,11 @@
  */
 package org.apache.gravitino.gcs.fs;
 
+import static 
org.apache.gravitino.catalog.hadoop.fs.Constants.DEFAULT_CONNECTION_TIMEOUT;
+import static 
org.apache.gravitino.catalog.hadoop.fs.Constants.DEFAULT_RETRY_LIMIT;
+import static 
org.apache.gravitino.catalog.hadoop.fs.Constants.GCS_GCS_HTTP_CONNECT_TIMEOUT_KEY;
+import static 
org.apache.gravitino.catalog.hadoop.fs.Constants.GCS_HTTP_MAX_RETRY_KEY;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
@@ -47,6 +52,8 @@ public class GCSFileSystemProvider implements 
FileSystemProvider, SupportsCreden
   public FileSystem getFileSystem(Path path, Map<String, String> config) 
throws IOException {
     Map<String, String> hadoopConfMap =
         FileSystemUtils.toHadoopConfigMap(config, 
GRAVITINO_KEY_TO_GCS_HADOOP_KEY);
+    hadoopConfMap = additionalGCSConfig(hadoopConfMap);
+
     Configuration configuration = 
FileSystemUtils.createConfiguration(hadoopConfMap);
     return FileSystem.newInstance(path.toUri(), configuration);
   }
@@ -71,4 +78,29 @@ public class GCSFileSystemProvider implements 
FileSystemProvider, SupportsCreden
   public String name() {
     return "gcs";
   }
+
+  /**
+   * Add additional GCS specific configuration to tune the performance.
+   *
+   * @param configs original configuration
+   * @return the configuration with additional GCS tuning parameters
+   */
+  private Map<String, String> additionalGCSConfig(Map<String, String> configs) 
{
+    Map<String, String> additionalConfigs = Maps.newHashMap(configs);
+
+    // Avoid multiple retries to speed up failure in test cases.
+    // Use hard code instead of 
GoogleHadoopFileSystemBase.GCS_HTTP_CONNECT_TIMEOUT_KEY to avoid
+    // dependency on a specific Hadoop version
+    if (!configs.containsKey(GCS_GCS_HTTP_CONNECT_TIMEOUT_KEY)) {
+      additionalConfigs.put(GCS_GCS_HTTP_CONNECT_TIMEOUT_KEY, 
DEFAULT_CONNECTION_TIMEOUT);
+    }
+
+    if (!configs.containsKey(GCS_HTTP_MAX_RETRY_KEY)) {
+      additionalConfigs.put(GCS_HTTP_MAX_RETRY_KEY, DEFAULT_RETRY_LIMIT);
+    }
+
+    // More tuning can be added here.
+
+    return ImmutableMap.copyOf(additionalConfigs);
+  }
 }
diff --git 
a/catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogOperations.java
 
b/catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogOperations.java
index 8b3f2cb140..a6a6d78a64 100644
--- 
a/catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogOperations.java
+++ 
b/catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogOperations.java
@@ -46,8 +46,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -99,8 +104,6 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.awaitility.Awaitility;
-import org.awaitility.core.ConditionTimeoutException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -137,6 +140,23 @@ public class FilesetCatalogOperations extends 
ManagedSchemaOperations
   @VisibleForTesting ScheduledThreadPoolExecutor scheduler;
   @VisibleForTesting Cache<FileSystemCacheKey, FileSystem> fileSystemCache;
 
+  private final ThreadPoolExecutor fileSystemExecutor =
+      new ThreadPoolExecutor(
+          Math.max(2, Math.min(Runtime.getRuntime().availableProcessors() * 2, 
16)),
+          Math.max(2, Math.min(Runtime.getRuntime().availableProcessors() * 2, 
32)),
+          5L,
+          TimeUnit.SECONDS,
+          new ArrayBlockingQueue<>(1000),
+          new ThreadFactoryBuilder()
+              .setDaemon(true)
+              .setNameFormat("fileset-filesystem-getter-pool-%d")
+              .build(),
+          new ThreadPoolExecutor.AbortPolicy()) {
+        {
+          allowCoreThreadTimeOut(true);
+        }
+      };
+
   FilesetCatalogOperations(EntityStore store) {
     this.store = store;
   }
@@ -994,6 +1014,10 @@ public class FilesetCatalogOperations extends 
ManagedSchemaOperations
       scheduler.shutdownNow();
     }
 
+    if (!fileSystemExecutor.isShutdown()) {
+      fileSystemExecutor.shutdownNow();
+    }
+
     if (fileSystemCache != null) {
       fileSystemCache
           .asMap()
@@ -1395,30 +1419,47 @@ public class FilesetCatalogOperations extends 
ManagedSchemaOperations
                 .catalogPropertiesMetadata()
                 .getOrDefault(
                     config, 
FilesetCatalogPropertiesMetadata.FILESYSTEM_CONNECTION_TIMEOUT_SECONDS);
+
+    Future<FileSystem> fileSystemFuture =
+        fileSystemExecutor.submit(() -> provider.getFileSystem(path, config));
+
     try {
-      AtomicReference<FileSystem> fileSystem = new AtomicReference<>();
-      Awaitility.await()
-          .atMost(timeoutSeconds, TimeUnit.SECONDS)
-          .pollInterval(1, TimeUnit.MILLISECONDS)
-          .until(
-              () -> {
-                fileSystem.set(provider.getFileSystem(path, config));
-                return true;
-              });
-      return fileSystem.get();
-    } catch (ConditionTimeoutException e) {
+      return fileSystemFuture.get(timeoutSeconds, TimeUnit.SECONDS);
+    } catch (TimeoutException e) {
+      fileSystemFuture.cancel(true);
+
+      LOG.warn(
+          "Timeout when getting FileSystem for path: {}, scheme: {}, provider: 
{} within {} seconds",
+          path,
+          scheme,
+          provider,
+          timeoutSeconds,
+          e);
+
       throw new IOException(
           String.format(
-              "Failed to get FileSystem for path: %s, scheme: %s, provider: 
%s, config: %s within %s "
+              "Failed to get FileSystem for path: %s, scheme: %s, provider: %s 
within %s "
                   + "seconds, please check the configuration or increase the "
                   + "file system connection timeout time by setting catalog 
property: %s",
               path,
               scheme,
               provider,
-              config,
               timeoutSeconds,
               
FilesetCatalogPropertiesMetadata.FILESYSTEM_CONNECTION_TIMEOUT_SECONDS),
           e);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOG.warn(
+          "Interrupted when getting FileSystem for path: {}, possibly the 
server is"
+              + " shutting down or catalog is been dropped",
+          path);
+      throw new RuntimeException("Interrupted when getting FileSystem for 
path: " + path, e);
+    } catch (ExecutionException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof IOException) {
+        throw (IOException) cause;
+      }
+      throw new IOException("Failed to create FileSystem", cause);
     }
   }
 }
diff --git 
a/catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/TestFilesetCatalogOperations.java
 
b/catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/TestFilesetCatalogOperations.java
index 6951ced7d6..a55b519f62 100644
--- 
a/catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/TestFilesetCatalogOperations.java
+++ 
b/catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/TestFilesetCatalogOperations.java
@@ -41,6 +41,7 @@ import static 
org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider.GRAVITIN
 import static org.apache.gravitino.file.Fileset.LOCATION_NAME_UNKNOWN;
 import static org.apache.gravitino.file.Fileset.PROPERTY_DEFAULT_LOCATION_NAME;
 import static 
org.apache.gravitino.file.Fileset.PROPERTY_MULTIPLE_LOCATIONS_PREFIX;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.when;
@@ -80,7 +81,9 @@ import org.apache.gravitino.UserPrincipal;
 import org.apache.gravitino.audit.CallerContext;
 import org.apache.gravitino.audit.FilesetAuditConstants;
 import org.apache.gravitino.audit.FilesetDataOperation;
+import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
 import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
+import org.apache.gravitino.catalog.hadoop.fs.LocalFileSystemProvider;
 import org.apache.gravitino.connector.CatalogInfo;
 import org.apache.gravitino.connector.HasPropertyMetadata;
 import org.apache.gravitino.connector.PropertiesMetadata;
@@ -106,16 +109,19 @@ import org.apache.gravitino.utils.NameIdentifierUtil;
 import org.apache.gravitino.utils.PrincipalUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 import org.mockito.MockedStatic;
 import org.mockito.Mockito;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
 
 public class TestFilesetCatalogOperations {
 
@@ -1455,9 +1461,9 @@ public class TestFilesetCatalogOperations {
       String subPath = "/test/test.parquet";
       when(mockOps.getFileLocation(filesetIdent, 
subPath)).thenCallRealMethod();
       when(mockOps.getFileLocation(filesetIdent, subPath, 
null)).thenCallRealMethod();
-      when(mockOps.getFileSystem(Mockito.any(), Mockito.any()))
+      when(mockOps.getFileSystem(any(), any()))
           .thenReturn(FileSystem.getLocal(new Configuration()));
-      when(mockOps.getFileSystemWithCache(Mockito.any(), 
Mockito.any())).thenCallRealMethod();
+      when(mockOps.getFileSystemWithCache(any(), any())).thenCallRealMethod();
       String fileLocation = mockOps.getFileLocation(filesetIdent, subPath);
       Assertions.assertEquals(
           String.format("%s%s", mockFileset.storageLocation(), 
subPath.substring(1)), fileLocation);
@@ -1868,6 +1874,44 @@ public class TestFilesetCatalogOperations {
     }
   }
 
+  @Test
+  @Timeout(20)
+  void testGetFileSystemTimeoutThrowsException() throws Exception {
+    FieldUtils.writeField(
+        GravitinoEnv.getInstance(), "entityStore", new 
RelationalEntityStore(), true);
+
+    try (FilesetCatalogOperations filesetCatalogOperations = new 
FilesetCatalogOperations()) {
+      LocalFileSystemProvider localFileSystemProvider = 
Mockito.mock(LocalFileSystemProvider.class);
+      when(localFileSystemProvider.scheme()).thenReturn("file");
+      when(localFileSystemProvider.getFileSystem(Mockito.any(Path.class), 
Mockito.anyMap()))
+          .thenAnswer(
+              invocation -> {
+                // Block 100s, however, the timeout is set to 6s by default in
+                // FilesetCatalogOperations, so it's expected to be over 
within 10s
+                Awaitility.await().forever().until(() -> false);
+                return new LocalFileSystem();
+              });
+      Map<String, FileSystemProvider> fileSystemProviderMapOriginal = new 
HashMap<>();
+      fileSystemProviderMapOriginal.put("file", localFileSystemProvider);
+      FieldUtils.writeField(
+          filesetCatalogOperations, "fileSystemProvidersMap", 
fileSystemProviderMapOriginal, true);
+
+      FieldUtils.writeField(
+          filesetCatalogOperations, "propertiesMetadata", 
FILESET_PROPERTIES_METADATA, true);
+
+      // We use Annotation `Timeout` to make sure the test will not run 
forever as `getFileSystem`
+      // will throw IOException after timeout(6s)
+      Exception e =
+          Assertions.assertThrows(
+              IOException.class,
+              () ->
+                  filesetCatalogOperations.getFileSystem(
+                      new Path("file:///tmp"), ImmutableMap.of()));
+
+      Assertions.assertTrue(e.getMessage().contains("Failed to get FileSystem 
for path"));
+    }
+  }
+
   private static Stream<Arguments> multipleLocationsArguments() {
     return Stream.of(
         // Honor the catalog location
diff --git 
a/catalogs/hadoop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/Constants.java
 
b/catalogs/hadoop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/Constants.java
index bd931a5dc9..39ef923b39 100644
--- 
a/catalogs/hadoop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/Constants.java
+++ 
b/catalogs/hadoop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/Constants.java
@@ -46,4 +46,35 @@ public class Constants {
   public static final String AUTH_KERBEROS = "kerberos";
   // Simple authentication type
   public static final String AUTH_SIMPLE = "simple";
+
+  // The following parts are configuration keys for different file systems.
+
+  // S3 specific configuration keys, in case of specific hadoop version 
binding, we will
+  // map these keys to the corresponding hadoop s3 keys.
+  public static final String S3_MAX_ERROR_RETRIES = "fs.s3a.attempts.maximum";
+
+  public static final String S3_ESTABLISH_TIMEOUT = 
"fs.s3a.connection.establish.timeout";
+
+  public static final String S3_RETRY_LIMIT = "fs.s3a.retry.limit";
+
+  public static final String S3_RETRY_THROTTLE_LIMIT = 
"fs.s3a.retry.throttle.limit";
+
+  // GCS specific configuration keys
+  public static final String GCS_GCS_HTTP_CONNECT_TIMEOUT_KEY = 
"fs.gs.http.connect-timeout";
+  public static final String GCS_HTTP_MAX_RETRY_KEY = "fs.gs.http.max.retry";
+
+  // OSS specific configuration keys
+  public static final String OSS_ESTABLISH_TIMEOUT_KEY = 
"fs.oss.connection.establish.timeout";
+  public static final String OSS_MAX_ERROR_RETRIES_KEY = 
"fs.oss.attempts.maximum";
+
+  // Azure Blob Storage specific configuration keys, please see: 
AbfsConfiguration
+  public static final String ADLS_MAX_RETRIES = 
"fs.azure.io.retry.max.retries";
+
+  // HDFS specific configuration keys
+  public static final String HDFS_IPC_CLIENT_CONNECT_TIMEOUT_KEY = 
"ipc.client.connect.timeout";
+  public static final String HDFS_IPC_PING_KEY = "ipc.client.ping";
+  public static final String DEFAULT_HDFS_IPC_PING = "true";
+
+  public static final String DEFAULT_CONNECTION_TIMEOUT = "5000";
+  public static final String DEFAULT_RETRY_LIMIT = "2";
 }
diff --git 
a/catalogs/hadoop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/HDFSFileSystemProvider.java
 
b/catalogs/hadoop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/HDFSFileSystemProvider.java
index bf83e75a9f..27fc90d80a 100644
--- 
a/catalogs/hadoop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/HDFSFileSystemProvider.java
+++ 
b/catalogs/hadoop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/HDFSFileSystemProvider.java
@@ -19,7 +19,13 @@
 package org.apache.gravitino.catalog.hadoop.fs;
 
 import static 
org.apache.gravitino.catalog.hadoop.fs.Constants.BUILTIN_HDFS_FS_PROVIDER;
+import static 
org.apache.gravitino.catalog.hadoop.fs.Constants.DEFAULT_CONNECTION_TIMEOUT;
+import static 
org.apache.gravitino.catalog.hadoop.fs.Constants.DEFAULT_HDFS_IPC_PING;
+import static 
org.apache.gravitino.catalog.hadoop.fs.Constants.HDFS_IPC_CLIENT_CONNECT_TIMEOUT_KEY;
+import static 
org.apache.gravitino.catalog.hadoop.fs.Constants.HDFS_IPC_PING_KEY;
 
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
 import java.io.IOException;
 import java.util.Map;
 import javax.annotation.Nonnull;
@@ -35,7 +41,9 @@ public class HDFSFileSystemProvider implements 
FileSystemProvider {
   @Override
   public FileSystem getFileSystem(@Nonnull Path path, @Nonnull Map<String, 
String> config)
       throws IOException {
-    Configuration configuration = 
FileSystemUtils.createConfiguration(GRAVITINO_BYPASS, config);
+    Map<String, String> hadoopConfMap = additionalHDFSConfig(config);
+    Configuration configuration =
+        FileSystemUtils.createConfiguration(GRAVITINO_BYPASS, hadoopConfMap);
     return FileSystem.newInstance(path.toUri(), configuration);
   }
 
@@ -48,4 +56,29 @@ public class HDFSFileSystemProvider implements 
FileSystemProvider {
   public String name() {
     return BUILTIN_HDFS_FS_PROVIDER;
   }
+
+  /**
+   * Add additional HDFS specific configurations.
+   *
+   * @param configs Original configurations.
+   * @return Configurations with additional HDFS specific configurations.
+   */
+  private Map<String, String> additionalHDFSConfig(Map<String, String> 
configs) {
+    Map<String, String> additionalConfigs = Maps.newHashMap(configs);
+
+    // Avoid multiple retries to speed up failure in test cases.
+    // Use hard code instead of 
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_TIMEOUT_KEY to
+    // avoid dependency on a specific Hadoop version.
+    if (!configs.containsKey(HDFS_IPC_CLIENT_CONNECT_TIMEOUT_KEY)) {
+      additionalConfigs.put(HDFS_IPC_CLIENT_CONNECT_TIMEOUT_KEY, 
DEFAULT_CONNECTION_TIMEOUT);
+    }
+
+    if (!configs.containsKey(HDFS_IPC_PING_KEY)) {
+      additionalConfigs.put(HDFS_IPC_PING_KEY, DEFAULT_HDFS_IPC_PING);
+    }
+
+    // More tuning can be added here.
+
+    return ImmutableMap.copyOf(additionalConfigs);
+  }
 }

Reply via email to