This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 39ed7e4453 Core: Track & close FileIO used for remote scan planning (#15439)
39ed7e4453 is described below
commit 39ed7e445361b2ff83887795e7e8d9f92ed45abc
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Tue Mar 3 09:50:17 2026 +0100
Core: Track & close FileIO used for remote scan planning (#15439)
---
.../org/apache/iceberg/rest/RESTTableScan.java | 50 ++++++++++++++++------
1 file changed, 38 insertions(+), 12 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTTableScan.java b/core/src/main/java/org/apache/iceberg/rest/RESTTableScan.java
index 460c6896e9..706d83817b 100644
--- a/core/src/main/java/org/apache/iceberg/rest/RESTTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTTableScan.java
@@ -18,6 +18,9 @@
*/
package org.apache.iceberg.rest;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.RemovalListener;
import dev.failsafe.Failsafe;
import dev.failsafe.FailsafeException;
import dev.failsafe.RetryPolicy;
@@ -58,6 +61,17 @@ class RESTTableScan extends DataTableScan {
private static final long MAX_WAIT_TIME_MS = 5 * 60 * 1000; // Total maximum duration (5 minutes)
private static final double SCALE_FACTOR = 2.0; // Exponential scale factor
private static final String DEFAULT_FILE_IO_IMPL =
"org.apache.iceberg.io.ResolvingFileIO";
+ private static final Cache<RESTTableScan, FileIO> FILEIO_TRACKER =
+ Caffeine.newBuilder()
+ .weakKeys()
+ .removalListener(
+ (RemovalListener<RESTTableScan, FileIO>)
+ (scan, io, cause) -> {
+ if (null != io) {
+ io.close();
+ }
+ })
+ .build();
private final RESTClient client;
private final Map<String, String> headers;
@@ -199,16 +213,19 @@ class RESTTableScan extends DataTableScan {
}
private FileIO fileIOForPlanId(List<Credential> storageCredentials) {
- return CatalogUtil.loadFileIO(
- catalogProperties.getOrDefault(CatalogProperties.FILE_IO_IMPL, DEFAULT_FILE_IO_IMPL),
- ImmutableMap.<String, String>builder()
- .putAll(catalogProperties)
- .put(RESTCatalogProperties.REST_SCAN_PLAN_ID, planId)
- .buildKeepingLast(),
- hadoopConf,
- storageCredentials.stream()
- .map(c -> StorageCredential.create(c.prefix(), c.config()))
- .collect(Collectors.toList()));
+ FileIO ioForScan =
+ CatalogUtil.loadFileIO(
+ catalogProperties.getOrDefault(CatalogProperties.FILE_IO_IMPL, DEFAULT_FILE_IO_IMPL),
+ ImmutableMap.<String, String>builder()
+ .putAll(catalogProperties)
+ .put(RESTCatalogProperties.REST_SCAN_PLAN_ID, planId)
+ .buildKeepingLast(),
+ hadoopConf,
+ storageCredentials.stream()
+ .map(c -> StorageCredential.create(c.prefix(), c.config()))
+ .collect(Collectors.toList()));
+ FILEIO_TRACKER.put(this, ioForScan);
+ return ioForScan;
}
private CloseableIterable<FileScanTask> fetchPlanningResult() {
@@ -236,7 +253,7 @@ class RESTTableScan extends DataTableScan {
"Polling for plan {} failed due to: {}",
planId,
e.getException().getMessage());
- cancelPlan();
+ cleanupPlanResources();
})
.build();
@@ -271,7 +288,7 @@ class RESTTableScan extends DataTableScan {
} catch (Exception e) {
// Catch any immediate non-retryable exceptions (e.g., I/O errors, auth errors)
try {
- cancelPlan();
+ cleanupPlanResources();
} catch (Exception cancelException) {
// Ignore cancellation failures during exception handling
e.addSuppressed(cancelException);
@@ -299,6 +316,15 @@ class RESTTableScan extends DataTableScan {
this::cancelPlan);
}
+ /** Cancels the plan on the server (if supported) and closes the plan-scoped FileIO */
+ private void cleanupPlanResources() {
+ cancelPlan();
+ if (null != fileIOForPlanId) {
+ FILEIO_TRACKER.invalidate(this);
+ this.fileIOForPlanId = null;
+ }
+ }
+
@VisibleForTesting
@SuppressWarnings("checkstyle:RegexpMultiline")
public boolean cancelPlan() {