Copilot commented on code in PR #9282:
URL: https://github.com/apache/gravitino/pull/9282#discussion_r2570368218
##########
catalogs/hadoop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/HDFSFileSystemProvider.java:
##########
@@ -32,7 +34,9 @@ public class HDFSFileSystemProvider implements FileSystemProvider {
@Override
public FileSystem getFileSystem(@Nonnull Path path, @Nonnull Map<String, String> config)
throws IOException {
- Configuration configuration = FileSystemUtils.createConfiguration(GRAVITINO_BYPASS, config);
+ Map<String, String> hadoopConfMap = additionalHDFSConfig(config);
+ Configuration configuration =
+ FileSystemUtils.createConfiguration(GRAVITINO_BYPASS, hadoopConfMap);
Review Comment:
Check that `GRAVITINO_BYPASS` resolves here. If it is declared as a constant
in the `FileSystemProvider` interface, this class inherits it by implementing
that interface, and the removed line already used it unqualified, so no extra
import should be needed. If it is declared elsewhere, add a static import such
as `import static
org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider.GRAVITINO_BYPASS;` to
avoid a compilation error.
##########
catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogOperations.java:
##########
@@ -138,6 +141,24 @@ public class FilesetCatalogOperations extends ManagedSchemaOperations
@VisibleForTesting ScheduledThreadPoolExecutor scheduler;
@VisibleForTesting Cache<FileSystemCacheKey, FileSystem> fileSystemCache;
+ private static final ThreadPoolExecutor GET_FILESYSTEM_EXECUTOR =
+ new ThreadPoolExecutor(
+ Math.max(2, Runtime.getRuntime().availableProcessors() * 2),
+ Math.max(2, Runtime.getRuntime().availableProcessors() * 2),
Review Comment:
The core and maximum pool sizes are identical, so the pool never grows beyond
its core size. `allowCoreThreadTimeOut(true)` is also set, which lets idle core
threads exit after the keep-alive time (50 ms here), so the pool can shrink to
zero when idle. That intent is not obvious from the code as written.
Consider either: (1) using `Executors.newFixedThreadPool()` for clarity if
reclaiming idle threads is not needed (note that it uses an unbounded queue), or
(2) setting different core and max sizes if scaling is intended. If the current
behavior is deliberate, a short comment explaining it would help.
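
To make the behavior concrete, here is a minimal sketch of a pool with equal core and max sizes plus `allowCoreThreadTimeOut(true)`. The class and method names (`CoreTimeoutDemo`, `newPool`, `poolSizeAfterIdle`) are hypothetical and not part of this PR; the sketch only illustrates that such a pool can drop to zero threads once it has been idle longer than the keep-alive time.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; names are illustrative and not taken from the PR.
class CoreTimeoutDemo {
  // Builds a pool whose core size equals its max size, with core-thread timeout enabled.
  static ThreadPoolExecutor newPool(int size, long keepAliveMillis) {
    ThreadPoolExecutor pool =
        new ThreadPoolExecutor(
            size, size, keepAliveMillis, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1000));
    // Without this call, core threads stay alive indefinitely once started.
    pool.allowCoreThreadTimeOut(true);
    return pool;
  }

  // Runs one task, then polls (up to maxWaitMillis) until the idle worker exits.
  // Returns the pool size observed at the end of the wait.
  static int poolSizeAfterIdle(ThreadPoolExecutor pool, long maxWaitMillis) throws Exception {
    pool.submit(() -> {}).get();
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(maxWaitMillis);
    while (pool.getPoolSize() > 0 && System.nanoTime() < deadline) {
      Thread.sleep(10);
    }
    return pool.getPoolSize();
  }
}
```

If this shrink-to-zero behavior is what the PR wants, a one-line comment next to `allowCoreThreadTimeOut(true)` would likely be enough to resolve the confusion.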
##########
catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogOperations.java:
##########
@@ -138,6 +141,24 @@ public class FilesetCatalogOperations extends ManagedSchemaOperations
@VisibleForTesting ScheduledThreadPoolExecutor scheduler;
@VisibleForTesting Cache<FileSystemCacheKey, FileSystem> fileSystemCache;
+ private static final ThreadPoolExecutor GET_FILESYSTEM_EXECUTOR =
+ new ThreadPoolExecutor(
+ Math.max(2, Runtime.getRuntime().availableProcessors() * 2),
+ Math.max(2, Runtime.getRuntime().availableProcessors() * 2),
+ 50L,
+ TimeUnit.MILLISECONDS,
+ new ArrayBlockingQueue<>(1000),
+ r -> {
+ Thread t = new Thread(r, "fileset-filesystem-getter-pool");
+ t.setDaemon(true);
+ return t;
+ },
+ new ThreadPoolExecutor.AbortPolicy()) {
+ {
+ allowCoreThreadTimeOut(true);
+ }
+ };
+
Review Comment:
The static `ThreadPoolExecutor` is never shut down, creating a resource
leak. Unlike the `scheduler` field which is shut down in the `close()` method,
this executor will continue running even after `FilesetCatalogOperations`
instances are closed. Consider making this executor non-static and shutting it
down in the `close()` method, or if it must be static, implement a shutdown
hook or singleton lifecycle management.
```suggestion
static {
  Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    GET_FILESYSTEM_EXECUTOR.shutdown();
    try {
      if (!GET_FILESYSTEM_EXECUTOR.awaitTermination(10, TimeUnit.SECONDS)) {
        GET_FILESYSTEM_EXECUTOR.shutdownNow();
      }
    } catch (InterruptedException e) {
      GET_FILESYSTEM_EXECUTOR.shutdownNow();
      Thread.currentThread().interrupt();
    }
  }, "fileset-filesystem-getter-pool-shutdown-hook"));
}
```
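
For the non-static alternative mentioned above, a rough sketch could look like the following. `OwnedExecutor` is a hypothetical stand-in for the owning class, and the pool size of 2 and the 10-second wait are assumptions chosen only for illustration; the point is that the executor's lifetime is tied to `close()`.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical owner class; names and sizes are illustrative, not from the PR.
class OwnedExecutor implements AutoCloseable {
  // Instance field, so each owner controls the lifetime of its own threads.
  private final ExecutorService executor = Executors.newFixedThreadPool(2);

  <T> Future<T> submit(Callable<T> task) {
    return executor.submit(task);
  }

  boolean isShutdown() {
    return executor.isShutdown();
  }

  @Override
  public void close() {
    executor.shutdown(); // stop accepting new tasks, let running ones finish
    try {
      if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
        executor.shutdownNow(); // interrupt tasks that did not finish in time
      }
    } catch (InterruptedException e) {
      executor.shutdownNow();
      Thread.currentThread().interrupt(); // preserve the caller's interrupt status
    }
  }
}
```

The trade-off is that each instance then pays for its own threads, which may or may not be acceptable depending on how many `FilesetCatalogOperations` instances exist at once.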
##########
catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogOperations.java:
##########
@@ -1397,30 +1418,35 @@ FileSystem getFileSystem(Path path, Map<String, String>
config) throws IOExcepti
.catalogPropertiesMetadata()
.getOrDefault(
config,
FilesetCatalogPropertiesMetadata.FILESYSTEM_CONNECTION_TIMEOUT_SECONDS);
+
+ Future<FileSystem> fileSystemFuture =
+ GET_FILESYSTEM_EXECUTOR.submit(() -> provider.getFileSystem(path,
config));
Review Comment:
The new async FileSystem retrieval mechanism with timeout handling lacks
test coverage. There are no tests verifying: (1) timeout behavior when
FileSystem creation hangs, (2) proper exception propagation from
ExecutionException, or (3) cancellation handling. Consider adding tests that
mock a slow `FileSystemProvider` to verify the timeout mechanism works
correctly.
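
As a starting point for such tests, the timeout logic could be exercised in isolation with plain JDK types. The sketch below is an assumption about how that might be structured: `TimedCall.callWithTimeout` is a hypothetical helper that mirrors the try/catch shape in this diff, with a `Callable` standing in for `provider.getFileSystem(path, config)`.

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper mirroring the diff's timeout handling; not the PR's actual code.
class TimedCall {
  static <T> T callWithTimeout(ExecutorService executor, Callable<T> task, long timeoutMillis)
      throws IOException {
    Future<T> future = executor.submit(task);
    try {
      return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      future.cancel(true); // request interruption of the worker thread
      throw new IOException("Timed out after " + timeoutMillis + " ms", e);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // restore the interrupt flag for the caller
      throw new IOException("Interrupted while waiting", e);
    } catch (ExecutionException e) {
      Throwable cause = e.getCause();
      if (cause instanceof IOException) {
        throw (IOException) cause; // propagate the provider's own IOException unchanged
      }
      throw new IOException("Task failed", cause);
    }
  }
}
```

A test can then pass a task that blocks on a `CountDownLatch` to trigger the timeout branch, and a task that throws an `IOException` to check that the same exception instance reaches the caller.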
##########
catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogOperations.java:
##########
@@ -1397,30 +1418,35 @@ FileSystem getFileSystem(Path path, Map<String, String>
config) throws IOExcepti
.catalogPropertiesMetadata()
.getOrDefault(
config,
FilesetCatalogPropertiesMetadata.FILESYSTEM_CONNECTION_TIMEOUT_SECONDS);
+
+ Future<FileSystem> fileSystemFuture =
+ GET_FILESYSTEM_EXECUTOR.submit(() -> provider.getFileSystem(path,
config));
+
try {
- AtomicReference<FileSystem> fileSystem = new AtomicReference<>();
- Awaitility.await()
- .atMost(timeoutSeconds, TimeUnit.SECONDS)
- .pollInterval(1, TimeUnit.MILLISECONDS)
- .until(
- () -> {
- fileSystem.set(provider.getFileSystem(path, config));
- return true;
- });
- return fileSystem.get();
- } catch (ConditionTimeoutException e) {
+ return fileSystemFuture.get(timeoutSeconds, TimeUnit.SECONDS);
+ } catch (TimeoutException e) {
+ fileSystemFuture.cancel(true);
+
throw new IOException(
String.format(
- "Failed to get FileSystem for path: %s, scheme: %s, provider:
%s, config: %s within %s "
+ "Failed to get FileSystem for path: %s, scheme: %s, provider:
%s, within %s "
+ "seconds, please check the configuration or increase the "
+ "file system connection timeout time by setting catalog
property: %s",
path,
scheme,
provider,
- config,
timeoutSeconds,
FilesetCatalogPropertiesMetadata.FILESYSTEM_CONNECTION_TIMEOUT_SECONDS),
e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException("Interrupted while waiting for FileSystem", e);
+ } catch (ExecutionException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof IOException) {
+ throw (IOException) cause;
+ }
+ throw new IOException("Failed to create FileSystem", cause);
Review Comment:
The error message "Failed to create FileSystem" lacks context about which
path and provider failed. Consider including the `path` and `provider`
information in the error message to match the pattern used in the
`TimeoutException` handler (line 1432), which provides helpful diagnostic
information.
```suggestion
throw new IOException(
    String.format("Failed to create FileSystem for path: %s, provider: %s", path, provider),
    cause);
```
##########
catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogOperations.java:
##########
@@ -1397,30 +1418,35 @@ FileSystem getFileSystem(Path path, Map<String, String>
config) throws IOExcepti
.catalogPropertiesMetadata()
.getOrDefault(
config,
FilesetCatalogPropertiesMetadata.FILESYSTEM_CONNECTION_TIMEOUT_SECONDS);
+
+ Future<FileSystem> fileSystemFuture =
+ GET_FILESYSTEM_EXECUTOR.submit(() -> provider.getFileSystem(path,
config));
+
try {
- AtomicReference<FileSystem> fileSystem = new AtomicReference<>();
- Awaitility.await()
- .atMost(timeoutSeconds, TimeUnit.SECONDS)
- .pollInterval(1, TimeUnit.MILLISECONDS)
- .until(
- () -> {
- fileSystem.set(provider.getFileSystem(path, config));
- return true;
- });
- return fileSystem.get();
- } catch (ConditionTimeoutException e) {
+ return fileSystemFuture.get(timeoutSeconds, TimeUnit.SECONDS);
+ } catch (TimeoutException e) {
+ fileSystemFuture.cancel(true);
Review Comment:
Cancelling the future with `cancel(true)` interrupts the thread but doesn't
guarantee cleanup of the FileSystem creation. If `provider.getFileSystem()` is
in a blocking operation that doesn't respond to interruption (like network
I/O), the thread may hang indefinitely even after cancellation. Consider
logging a warning about potential thread leaks when timeout occurs.
```suggestion
fileSystemFuture.cancel(true);
LOG.warn(
    "Timeout occurred while getting FileSystem for path: {}, scheme: {}, provider: {} within {} seconds. "
        + "The underlying operation may not respond to interruption, which can lead to thread leaks. "
        + "Please check the configuration or increase the file system connection timeout. "
        + "If this warning appears frequently, investigate potential thread leaks.",
    path, scheme, provider, timeoutSeconds);
```
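
The concern about interruption can be reproduced with a small, self-contained sketch. `CancelDemo` is a hypothetical class, and the spin loop is only a stand-in for a blocking call that ignores interrupts; it shows that `cancel(true)` marks the future as cancelled while the worker thread keeps running.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo; the spin loop stands in for I/O that does not react to interrupts.
class CancelDemo {
  // Returns true if the task was still running shortly after cancel(true) succeeded.
  static boolean workerSurvivesCancel() throws Exception {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    AtomicBoolean release = new AtomicBoolean(false);
    AtomicBoolean finished = new AtomicBoolean(false);
    CountDownLatch started = new CountDownLatch(1);
    try {
      Future<?> future =
          executor.submit(
              () -> {
                started.countDown();
                // Never checks Thread.interrupted(), so an interrupt has no effect.
                while (!release.get()) {
                  Thread.yield();
                }
                finished.set(true);
              });
      started.await(); // make sure the task is actually running before cancelling
      boolean cancelled = future.cancel(true);
      Thread.sleep(100); // give the worker a chance to stop, which it will not do
      return cancelled && future.isCancelled() && !finished.get();
    } finally {
      release.set(true); // let the worker exit so the demo does not leak a thread
      executor.shutdown();
    }
  }
}
```

In the real code there is no `release` flag to fall back on, which is why a warning log at the timeout site seems like a reasonable minimum.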
##########
catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogOperations.java:
##########
@@ -138,6 +141,24 @@ public class FilesetCatalogOperations extends ManagedSchemaOperations
@VisibleForTesting ScheduledThreadPoolExecutor scheduler;
@VisibleForTesting Cache<FileSystemCacheKey, FileSystem> fileSystemCache;
+ private static final ThreadPoolExecutor GET_FILESYSTEM_EXECUTOR =
+ new ThreadPoolExecutor(
+ Math.max(2, Runtime.getRuntime().availableProcessors() * 2),
+ Math.max(2, Runtime.getRuntime().availableProcessors() * 2),
+ 50L,
+ TimeUnit.MILLISECONDS,
+ new ArrayBlockingQueue<>(1000),
Review Comment:
The `ArrayBlockingQueue` with capacity 1000 combined with `AbortPolicy`
means that if all threads are busy and the queue is full, new FileSystem
requests will be rejected with a `RejectedExecutionException`. This could cause
unexpected failures under load. Consider using a `CallerRunsPolicy` instead to
provide backpressure, or document this failure mode if intentional. Note that
`CallerRunsPolicy` runs the rejected task on the submitting thread, so the
timeout on `Future.get` would no longer bound that call.
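
If `AbortPolicy` is kept, one option is to catch the unchecked `RejectedExecutionException` thrown by `submit` and surface it as an `IOException`, which callers of `getFileSystem` already handle. The sketch below is an assumption about how that could look: `RejectionDemo`, `submitOrFail`, and the error message are hypothetical, and the tiny pool (one thread, one queue slot) exists only to make saturation easy to reproduce.

```java
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo; names and pool sizes are illustrative, not from the PR.
class RejectionDemo {
  // Converts a rejection into a checked IOException with a descriptive message.
  static <T> Future<T> submitOrFail(ThreadPoolExecutor executor, Callable<T> task)
      throws IOException {
    try {
      return executor.submit(task);
    } catch (RejectedExecutionException e) {
      throw new IOException("FileSystem executor is saturated; request rejected", e);
    }
  }

  // Saturates a one-thread, one-slot pool and reports whether the third submit is rejected.
  static boolean thirdSubmitRejected() throws Exception {
    ThreadPoolExecutor executor =
        new ThreadPoolExecutor(
            1, 1, 50L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(1),
            new ThreadPoolExecutor.AbortPolicy());
    CountDownLatch release = new CountDownLatch(1);
    Callable<String> blocked = () -> { release.await(); return "done"; };
    try {
      submitOrFail(executor, blocked); // handed directly to the only worker thread
      submitOrFail(executor, blocked); // fills the single queue slot
      try {
        submitOrFail(executor, blocked); // no thread or queue capacity left
        return false;
      } catch (IOException e) {
        return e.getCause() instanceof RejectedExecutionException;
      }
    } finally {
      release.countDown(); // unblock the tasks so the pool can drain
      executor.shutdown();
    }
  }
}
```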
##########
catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogOperations.java:
##########
@@ -138,6 +141,24 @@ public class FilesetCatalogOperations extends ManagedSchemaOperations
@VisibleForTesting ScheduledThreadPoolExecutor scheduler;
@VisibleForTesting Cache<FileSystemCacheKey, FileSystem> fileSystemCache;
+ private static final ThreadPoolExecutor GET_FILESYSTEM_EXECUTOR =
+ new ThreadPoolExecutor(
+ Math.max(2, Runtime.getRuntime().availableProcessors() * 2),
+ Math.max(2, Runtime.getRuntime().availableProcessors() * 2),
+ 50L,
+ TimeUnit.MILLISECONDS,
+ new ArrayBlockingQueue<>(1000),
+ r -> {
+ Thread t = new Thread(r, "fileset-filesystem-getter-pool");
+ t.setDaemon(true);
+ return t;
+ },
Review Comment:
The thread name "fileset-filesystem-getter-pool" lacks a thread index
suffix, so all threads in the pool will have the same name. This makes thread
dumps and logs hard to read. Consider using a `ThreadFactory` with proper
naming, such as
`new ThreadFactoryBuilder().setDaemon(true).setNameFormat("fileset-filesystem-getter-pool-%d").build()`,
which is already used elsewhere in this class (lines 278-280).
```suggestion
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("fileset-filesystem-getter-pool-%d")
.build(),
```
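
If pulling in `ThreadFactoryBuilder` is undesirable here, a JDK-only equivalent is a small factory with an `AtomicInteger` counter. `NamedDaemonThreadFactory` below is a hypothetical class sketched for illustration; it produces daemon threads named `<prefix>-0`, `<prefix>-1`, and so on.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical JDK-only alternative to Guava's ThreadFactoryBuilder; not from the PR.
class NamedDaemonThreadFactory implements ThreadFactory {
  private final String prefix;
  // Shared counter so every thread created by this factory gets a unique suffix.
  private final AtomicInteger counter = new AtomicInteger(0);

  NamedDaemonThreadFactory(String prefix) {
    this.prefix = prefix;
  }

  @Override
  public Thread newThread(Runnable r) {
    Thread t = new Thread(r, prefix + "-" + counter.getAndIncrement());
    t.setDaemon(true); // daemon threads do not block JVM shutdown
    return t;
  }
}
```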