Copilot commented on code in PR #9282:
URL: https://github.com/apache/gravitino/pull/9282#discussion_r2570368218


##########
catalogs/hadoop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/HDFSFileSystemProvider.java:
##########
@@ -32,7 +34,9 @@ public class HDFSFileSystemProvider implements FileSystemProvider {
   @Override
   public FileSystem getFileSystem(@Nonnull Path path, @Nonnull Map<String, String> config)
       throws IOException {
-    Configuration configuration = FileSystemUtils.createConfiguration(GRAVITINO_BYPASS, config);
+    Map<String, String> hadoopConfMap = additionalHDFSConfig(config);
+    Configuration configuration =
+        FileSystemUtils.createConfiguration(GRAVITINO_BYPASS, hadoopConfMap);

Review Comment:
   `GRAVITINO_BYPASS` is not imported. This code references `GRAVITINO_BYPASS`,
which is defined in the `FileSystemProvider` interface, but there's no static
import for it. Add
`import static org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider.GRAVITINO_BYPASS;`
to fix the compilation error.
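Before adding the import, it may be worth double-checking how the name resolves. Fields declared in a Java interface are implicitly `public static final`, and a class that implements the interface inherits them, so inside `HDFSFileSystemProvider` (declared `implements FileSystemProvider` in the hunk header) the simple name `GRAVITINO_BYPASS` should resolve without a static import; the removed line in this hunk already referenced it unqualified. A static import matters for classes that do not implement the interface. The sketch below illustrates the rule with hypothetical names (`ProviderLike`, `BYPASS_PREFIX`, value `"example.bypass."`) standing in for the real ones:

```java
// Hypothetical stand-in for FileSystemProvider: fields declared in an
// interface are implicitly public static final.
interface ProviderLike {
  String BYPASS_PREFIX = "example.bypass.";

  String describe();
}

// Implementing the interface makes BYPASS_PREFIX an inherited member,
// so the simple name resolves without any import.
class ImplementingProvider implements ProviderLike {
  @Override
  public String describe() {
    return "implementing class sees " + BYPASS_PREFIX;
  }
}

// A class that does not implement the interface has to qualify the name
// (or, across packages, use a static import of ProviderLike.BYPASS_PREFIX).
class UnrelatedHelper {
  static String describe() {
    return "unrelated class sees " + ProviderLike.BYPASS_PREFIX;
  }
}
```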



##########
catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogOperations.java:
##########
@@ -138,6 +141,24 @@ public class FilesetCatalogOperations extends ManagedSchemaOperations
   @VisibleForTesting ScheduledThreadPoolExecutor scheduler;
   @VisibleForTesting Cache<FileSystemCacheKey, FileSystem> fileSystemCache;
 
+  private static final ThreadPoolExecutor GET_FILESYSTEM_EXECUTOR =
+      new ThreadPoolExecutor(
+          Math.max(2, Runtime.getRuntime().availableProcessors() * 2),
+          Math.max(2, Runtime.getRuntime().availableProcessors() * 2),

Review Comment:
   The core and maximum pool sizes are identical, making this effectively a 
fixed-size thread pool. However, `allowCoreThreadTimeOut(true)` is set, which 
means core threads can time out. This configuration is confusing because with 
identical core and max sizes, there are no "extra" threads beyond the core. 
Consider either: (1) using `Executors.newFixedThreadPool()` for clarity, or (2) 
setting different core and max sizes if scaling is intended.
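For reference, equal core and maximum sizes combined with `allowCoreThreadTimeOut(true)` is a legitimate way to get a fixed-size pool whose idle threads exit. `Executors.newFixedThreadPool()` on its own keeps its threads alive and uses an unbounded `LinkedBlockingQueue`, so option (1) would also drop the bounded queue. The sketch below (hypothetical class name and sizes, with the same 50 ms keep-alive as the PR) calls `allowCoreThreadTimeOut(true)` after construction instead of from an anonymous-subclass initializer, and checks that the pool shrinks to zero threads once idle:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class IdleDrainingPool {
  // Fixed-size pool (core == max) whose idle threads are allowed to exit.
  static ThreadPoolExecutor create(int size, long keepAliveMillis, int queueCapacity) {
    ThreadPoolExecutor pool =
        new ThreadPoolExecutor(
            size, size, keepAliveMillis, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(queueCapacity));
    // Called after construction rather than from an anonymous-subclass initializer.
    // Throws IllegalArgumentException if keepAliveMillis is not greater than zero.
    pool.allowCoreThreadTimeOut(true);
    return pool;
  }

  // Starts `size` blocking tasks, releases them, then waits for idle threads to exit.
  // Returns {pool size while all tasks run, pool size after idling}.
  static int[] peakThenIdle(int size) {
    ThreadPoolExecutor pool = create(size, 50L, 100);
    CountDownLatch allStarted = new CountDownLatch(size);
    CountDownLatch release = new CountDownLatch(1);
    try {
      for (int i = 0; i < size; i++) {
        pool.execute(
            () -> {
              allStarted.countDown();
              try {
                release.await();
              } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
              }
            });
      }
      allStarted.await();
      int peak = pool.getPoolSize();
      release.countDown();
      // Keep-alive is 50 ms; poll for up to ~5 s for the workers to time out.
      long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
      while (pool.getPoolSize() > 0 && System.nanoTime() < deadline) {
        Thread.sleep(10);
      }
      return new int[] {peak, pool.getPoolSize()};
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return new int[] {-1, -1};
    } finally {
      pool.shutdown();
    }
  }
}
```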



##########
catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogOperations.java:
##########
@@ -138,6 +141,24 @@ public class FilesetCatalogOperations extends ManagedSchemaOperations
   @VisibleForTesting ScheduledThreadPoolExecutor scheduler;
   @VisibleForTesting Cache<FileSystemCacheKey, FileSystem> fileSystemCache;
 
+  private static final ThreadPoolExecutor GET_FILESYSTEM_EXECUTOR =
+      new ThreadPoolExecutor(
+          Math.max(2, Runtime.getRuntime().availableProcessors() * 2),
+          Math.max(2, Runtime.getRuntime().availableProcessors() * 2),
+          50L,
+          TimeUnit.MILLISECONDS,
+          new ArrayBlockingQueue<>(1000),
+          r -> {
+            Thread t = new Thread(r, "fileset-filesystem-getter-pool");
+            t.setDaemon(true);
+            return t;
+          },
+          new ThreadPoolExecutor.AbortPolicy()) {
+        {
+          allowCoreThreadTimeOut(true);
+        }
+      };
+

Review Comment:
   The static `ThreadPoolExecutor` is never shut down, creating a resource
leak. Unlike the `scheduler` field, which is shut down in the `close()` method,
this executor will continue running even after `FilesetCatalogOperations`
instances are closed. Consider making this executor non-static and shutting it
down in the `close()` method or, if it must be static, adding a shutdown hook
or singleton lifecycle management.
   ```suggestion
   
     static {
       Runtime.getRuntime().addShutdownHook(new Thread(() -> {
         GET_FILESYSTEM_EXECUTOR.shutdown();
         try {
           if (!GET_FILESYSTEM_EXECUTOR.awaitTermination(10, TimeUnit.SECONDS)) {
             GET_FILESYSTEM_EXECUTOR.shutdownNow();
           }
         } catch (InterruptedException e) {
           GET_FILESYSTEM_EXECUTOR.shutdownNow();
           Thread.currentThread().interrupt();
         }
       }, "fileset-filesystem-getter-pool-shutdown-hook"));
     }
   ```
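The first option, an instance-owned executor, would follow the same lifecycle as `scheduler`. A minimal sketch, assuming a hypothetical class (`OperationsWithOwnedExecutor`) and method (`submitOpen`) in place of the real `FilesetCatalogOperations` members:

```java
import java.io.Closeable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for FilesetCatalogOperations: the executor is an instance
// field, created with the object and shut down in close(), like `scheduler`.
class OperationsWithOwnedExecutor implements Closeable {
  private final ExecutorService getFileSystemExecutor = Executors.newFixedThreadPool(2);

  // Stand-in for submitting provider.getFileSystem(path, config).
  Future<String> submitOpen(String path) {
    return getFileSystemExecutor.submit(() -> "opened " + path);
  }

  boolean isExecutorTerminated() {
    return getFileSystemExecutor.isTerminated();
  }

  @Override
  public void close() {
    getFileSystemExecutor.shutdown();
    try {
      // Give in-flight tasks a bounded grace period, then interrupt them.
      if (!getFileSystemExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
        getFileSystemExecutor.shutdownNow();
      }
    } catch (InterruptedException e) {
      getFileSystemExecutor.shutdownNow();
      Thread.currentThread().interrupt();
    }
  }
}
```

Because the PR's pool threads are daemon threads, they do not prevent JVM exit on their own; shutting the executor down in `close()` additionally releases them when a catalog is closed while the server keeps running.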



##########
catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogOperations.java:
##########
@@ -1397,30 +1418,35 @@ FileSystem getFileSystem(Path path, Map<String, String> config) throws IOExcepti
                 .catalogPropertiesMetadata()
                 .getOrDefault(
                     config, FilesetCatalogPropertiesMetadata.FILESYSTEM_CONNECTION_TIMEOUT_SECONDS);
+
+    Future<FileSystem> fileSystemFuture =
+        GET_FILESYSTEM_EXECUTOR.submit(() -> provider.getFileSystem(path, config));

Review Comment:
   The new async FileSystem retrieval mechanism with timeout handling lacks 
test coverage. There are no tests verifying: (1) timeout behavior when 
FileSystem creation hangs, (2) proper exception propagation from 
ExecutionException, or (3) cancellation handling. Consider adding tests that 
mock a slow `FileSystemProvider` to verify the timeout mechanism works 
correctly.
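A minimal sketch of such checks, written as plain Java rather than JUnit so it stays self-contained. It reproduces the PR's submit / `get(timeout)` / `cancel(true)` pattern against a hypothetical `Provider` interface (standing in for `FileSystemProvider`) with a `String` result instead of a `FileSystem`; the names `TimeoutBehaviorCheck`, `openWithTimeout`, `run`, and `hangingTaskIsInterrupted` are illustrative only:

```java
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimeoutBehaviorCheck {
  // Hypothetical stand-in for FileSystemProvider#getFileSystem(path, config).
  interface Provider {
    String open(String path) throws IOException, InterruptedException;
  }

  // Same submit / get(timeout) / cancel(true) pattern as the PR.
  static String openWithTimeout(
      ExecutorService pool, Provider provider, String path, long timeoutMillis)
      throws IOException {
    Future<String> future = pool.submit(() -> provider.open(path));
    try {
      return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      future.cancel(true);
      throw new IOException("Timed out opening " + path, e);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IOException("Interrupted while opening " + path, e);
    } catch (ExecutionException e) {
      Throwable cause = e.getCause();
      if (cause instanceof IOException) {
        throw (IOException) cause;
      }
      throw new IOException("Failed to open " + path, cause);
    }
  }

  // Summarizes one call as "ok:<result>" or "io:<message or cause type>".
  static String run(Provider provider, long timeoutMillis) {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    try {
      return "ok:" + openWithTimeout(pool, provider, "/data", timeoutMillis);
    } catch (IOException e) {
      Throwable cause = e.getCause();
      return "io:" + (cause == null ? e.getMessage() : cause.getClass().getSimpleName());
    } finally {
      pool.shutdownNow();
    }
  }

  // True if a timed-out task observes the interrupt sent by cancel(true).
  static boolean hangingTaskIsInterrupted() {
    CountDownLatch interrupted = new CountDownLatch(1);
    Provider hanging =
        p -> {
          try {
            Thread.sleep(10_000);
          } catch (InterruptedException e) {
            interrupted.countDown();
            throw e;
          }
          return "never";
        };
    ExecutorService pool = Executors.newSingleThreadExecutor();
    try {
      openWithTimeout(pool, hanging, "/data", 200);
      return false; // a timeout was expected
    } catch (IOException expected) {
      try {
        // Checked before shutdownNow() below, so the interrupt came from cancel(true).
        return interrupted.await(5, TimeUnit.SECONDS);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
      }
    } finally {
      pool.shutdownNow();
    }
  }
}
```

Each scenario maps to one of the three gaps listed above: a hanging provider covers the timeout and cancellation paths, and throwing providers cover `ExecutionException` unwrapping.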



##########
catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogOperations.java:
##########
@@ -1397,30 +1418,35 @@ FileSystem getFileSystem(Path path, Map<String, String> config) throws IOExcepti
                 .catalogPropertiesMetadata()
                 .getOrDefault(
                     config, FilesetCatalogPropertiesMetadata.FILESYSTEM_CONNECTION_TIMEOUT_SECONDS);
+
+    Future<FileSystem> fileSystemFuture =
+        GET_FILESYSTEM_EXECUTOR.submit(() -> provider.getFileSystem(path, config));
+
     try {
-      AtomicReference<FileSystem> fileSystem = new AtomicReference<>();
-      Awaitility.await()
-          .atMost(timeoutSeconds, TimeUnit.SECONDS)
-          .pollInterval(1, TimeUnit.MILLISECONDS)
-          .until(
-              () -> {
-                fileSystem.set(provider.getFileSystem(path, config));
-                return true;
-              });
-      return fileSystem.get();
-    } catch (ConditionTimeoutException e) {
+      return fileSystemFuture.get(timeoutSeconds, TimeUnit.SECONDS);
+    } catch (TimeoutException e) {
+      fileSystemFuture.cancel(true);
+
       throw new IOException(
           String.format(
-              "Failed to get FileSystem for path: %s, scheme: %s, provider: %s, config: %s within %s "
+              "Failed to get FileSystem for path: %s, scheme: %s, provider: %s, within %s "
                   + "seconds, please check the configuration or increase the "
                   + "file system connection timeout time by setting catalog property: %s",
               path,
               scheme,
               provider,
-              config,
               timeoutSeconds,
               FilesetCatalogPropertiesMetadata.FILESYSTEM_CONNECTION_TIMEOUT_SECONDS),
           e);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new IOException("Interrupted while waiting for FileSystem", e);
+    } catch (ExecutionException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof IOException) {
+        throw (IOException) cause;
+      }
+      throw new IOException("Failed to create FileSystem", cause);

Review Comment:
   The error message "Failed to create FileSystem" lacks context about which 
path and provider failed. Consider including the `path` and `provider` 
information in the error message to match the pattern used in the 
`TimeoutException` handler (line 1432), which provides helpful diagnostic 
information.
   ```suggestion
         throw new IOException(
             String.format("Failed to create FileSystem for path: %s, provider: %s", path, provider), cause);
   ```
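Building the message in one helper would keep the failure path's wording consistent. A minimal sketch, with a hypothetical `FileSystemErrors.unwrap` that the `catch (ExecutionException e)` block would call as `throw FileSystemErrors.unwrap(e, path, provider);`:

```java
import java.io.IOException;
import java.util.concurrent.ExecutionException;

class FileSystemErrors {
  // Hypothetical helper: converts the ExecutionException from Future#get into an
  // IOException, rethrowing an IOException cause unchanged and otherwise adding
  // path/provider context to the message.
  static IOException unwrap(ExecutionException e, Object path, Object provider) {
    Throwable cause = e.getCause();
    if (cause instanceof IOException) {
      return (IOException) cause;
    }
    return new IOException(
        String.format("Failed to create FileSystem for path: %s, provider: %s", path, provider),
        cause);
  }
}
```

Returning the exception instead of throwing it lets the call site keep an explicit `throw`, so the compiler still sees that the `catch` block cannot complete normally.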



##########
catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogOperations.java:
##########
@@ -1397,30 +1418,35 @@ FileSystem getFileSystem(Path path, Map<String, String> config) throws IOExcepti
                 .catalogPropertiesMetadata()
                 .getOrDefault(
                     config, FilesetCatalogPropertiesMetadata.FILESYSTEM_CONNECTION_TIMEOUT_SECONDS);
+
+    Future<FileSystem> fileSystemFuture =
+        GET_FILESYSTEM_EXECUTOR.submit(() -> provider.getFileSystem(path, config));
+
     try {
-      AtomicReference<FileSystem> fileSystem = new AtomicReference<>();
-      Awaitility.await()
-          .atMost(timeoutSeconds, TimeUnit.SECONDS)
-          .pollInterval(1, TimeUnit.MILLISECONDS)
-          .until(
-              () -> {
-                fileSystem.set(provider.getFileSystem(path, config));
-                return true;
-              });
-      return fileSystem.get();
-    } catch (ConditionTimeoutException e) {
+      return fileSystemFuture.get(timeoutSeconds, TimeUnit.SECONDS);
+    } catch (TimeoutException e) {
+      fileSystemFuture.cancel(true);

Review Comment:
   Cancelling the future with `cancel(true)` interrupts the thread but doesn't
guarantee cleanup of the FileSystem creation. If `provider.getFileSystem()` is
in a blocking operation that doesn't respond to interruption (like network
I/O), the worker thread may stay blocked indefinitely even after cancellation.
Consider logging a warning about potential thread leaks when a timeout occurs.
   ```suggestion
         fileSystemFuture.cancel(true);
         LOG.warn(
             "Timeout occurred while getting FileSystem for path: {}, scheme: {}, provider: {} within {} seconds. "
                 + "The underlying operation may not respond to interruption, which can lead to thread leaks. "
                 + "Please check the configuration or increase the file system connection timeout. "
                 + "If this warning appears frequently, investigate potential thread leaks.",
             path, scheme, provider, timeoutSeconds);
   ```
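The behavior of `cancel(true)` can be seen in isolation: it marks the `Future` as cancelled and sets the worker's interrupt flag, but a task that never checks the flag keeps occupying the pool thread. A minimal sketch, using a busy loop as a hypothetical stand-in for a call that does not respond to interruption (class and method names are illustrative):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class CancelDoesNotStopIgnoringTask {
  // Returns true if the task was still running after cancel(true) returned.
  static boolean stillRunningAfterCancel() {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    CountDownLatch started = new CountDownLatch(1);
    CountDownLatch finished = new CountDownLatch(1);
    AtomicBoolean stop = new AtomicBoolean(false);
    Future<?> future =
        pool.submit(
            () -> {
              started.countDown();
              // Stand-in for a blocking call that never checks the interrupt flag.
              while (!stop.get()) {
                Thread.yield();
              }
              finished.countDown();
            });
    try {
      started.await();
      boolean cancelled = future.cancel(true); // only sets the worker's interrupt flag
      // Give the worker time to react; it will not, because it ignores interrupts.
      boolean finishedEarly = finished.await(200, TimeUnit.MILLISECONDS);
      stop.set(true); // the only thing that actually ends the task
      finished.await(5, TimeUnit.SECONDS);
      return cancelled && future.isCancelled() && !finishedEarly;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    } finally {
      pool.shutdownNow();
    }
  }
}
```

In the PR's setting, each such task would keep one `GET_FILESYSTEM_EXECUTOR` thread busy until the underlying call returns, which is the situation the suggested warning is meant to surface.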



##########
catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogOperations.java:
##########
@@ -138,6 +141,24 @@ public class FilesetCatalogOperations extends ManagedSchemaOperations
   @VisibleForTesting ScheduledThreadPoolExecutor scheduler;
   @VisibleForTesting Cache<FileSystemCacheKey, FileSystem> fileSystemCache;
 
+  private static final ThreadPoolExecutor GET_FILESYSTEM_EXECUTOR =
+      new ThreadPoolExecutor(
+          Math.max(2, Runtime.getRuntime().availableProcessors() * 2),
+          Math.max(2, Runtime.getRuntime().availableProcessors() * 2),
+          50L,
+          TimeUnit.MILLISECONDS,
+          new ArrayBlockingQueue<>(1000),

Review Comment:
   The `ArrayBlockingQueue` with capacity 1000 combined with `AbortPolicy` 
means that if all threads are busy and the queue is full, new FileSystem 
requests will be rejected with a `RejectedExecutionException`. This could cause 
unexpected failures under load. Consider using a `CallerRunsPolicy` instead to 
provide backpressure, or document this failure mode if intentional.
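The failure mode is easy to reproduce with a deliberately tiny pool (one thread and one queue slot, hypothetical sizes chosen only to make saturation deterministic). The sketch below submits three tasks and reports what happened to the third under a given `RejectedExecutionHandler`:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class SaturationPolicies {
  // Saturates a 1-thread pool with a 1-slot queue, then submits a third task.
  // Returns "rejected" or the name of the thread that ran the third task.
  static String thirdTaskOutcome(RejectedExecutionHandler handler) {
    ThreadPoolExecutor pool =
        new ThreadPoolExecutor(
            1, 1, 50L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1), handler);
    CountDownLatch release = new CountDownLatch(1);
    Runnable blocker =
        () -> {
          try {
            release.await();
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        };
    AtomicReference<String> ranOn = new AtomicReference<>();
    try {
      pool.execute(blocker); // occupies the only worker thread
      pool.execute(blocker); // fills the only queue slot
      pool.execute(() -> ranOn.set(Thread.currentThread().getName()));
      return ranOn.get() == null ? "not-run-yet" : ranOn.get();
    } catch (RejectedExecutionException e) {
      return "rejected";
    } finally {
      release.countDown();
      pool.shutdown();
    }
  }
}
```

One caveat if `CallerRunsPolicy` is adopted here: the rejected task runs synchronously inside `submit()` on the calling thread, so the later `fileSystemFuture.get(timeoutSeconds, TimeUnit.SECONDS)` would no longer bound how long `getFileSystem` can block. The sketch's third task shows exactly that: under `CallerRunsPolicy` it completes on the caller before `execute` returns.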



##########
catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogOperations.java:
##########
@@ -138,6 +141,24 @@ public class FilesetCatalogOperations extends ManagedSchemaOperations
   @VisibleForTesting ScheduledThreadPoolExecutor scheduler;
   @VisibleForTesting Cache<FileSystemCacheKey, FileSystem> fileSystemCache;
 
+  private static final ThreadPoolExecutor GET_FILESYSTEM_EXECUTOR =
+      new ThreadPoolExecutor(
+          Math.max(2, Runtime.getRuntime().availableProcessors() * 2),
+          Math.max(2, Runtime.getRuntime().availableProcessors() * 2),
+          50L,
+          TimeUnit.MILLISECONDS,
+          new ArrayBlockingQueue<>(1000),
+          r -> {
+            Thread t = new Thread(r, "fileset-filesystem-getter-pool");
+            t.setDaemon(true);
+            return t;
+          },

Review Comment:
   The thread name "fileset-filesystem-getter-pool" lacks a thread index
suffix, so all threads in the pool will have the same name, which makes
debugging difficult. Consider using a `ThreadFactory` with proper naming, such as
`new ThreadFactoryBuilder().setDaemon(true).setNameFormat("fileset-filesystem-getter-pool-%d").build()`,
which is already used elsewhere in this class (lines 278-280).
   ```suggestion
             new ThreadFactoryBuilder()
                 .setDaemon(true)
                 .setNameFormat("fileset-filesystem-getter-pool-%d")
                 .build(),
   ```
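Guava's builder is the simplest fix since the class already uses it. If a JDK-only factory were preferred, a minimal sketch (hypothetical class name `IndexedDaemonThreadFactory`) that produces daemon threads with indices starting at 0 could look like this:

```java
import java.util.Locale;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical JDK-only alternative to Guava's ThreadFactoryBuilder with
// setDaemon(true) and setNameFormat("fileset-filesystem-getter-pool-%d").
class IndexedDaemonThreadFactory implements ThreadFactory {
  private final String nameFormat;
  private final AtomicInteger counter = new AtomicInteger(0);

  IndexedDaemonThreadFactory(String nameFormat) {
    this.nameFormat = nameFormat;
  }

  @Override
  public Thread newThread(Runnable r) {
    // Each thread gets the next index, so thread dumps can tell workers apart.
    Thread t = new Thread(r, String.format(Locale.ROOT, nameFormat, counter.getAndIncrement()));
    t.setDaemon(true);
    return t;
  }
}
```

It would be passed where the lambda currently is, as `new IndexedDaemonThreadFactory("fileset-filesystem-getter-pool-%d")`.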



-- 
This is an automated message from the Apache Git Service.