This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new cf6f83550a API, Core: Add FileIO to Scan API (#15561)
cf6f83550a is described below

commit cf6f83550afc9081232b437c58730048aad51425
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Fri Mar 13 06:42:50 2026 +0100

    API, Core: Add FileIO to Scan API (#15561)
---
 .../main/java/org/apache/iceberg/BatchScan.java    |   8 ++
 .../java/org/apache/iceberg/BatchScanAdapter.java  |   6 ++
 api/src/main/java/org/apache/iceberg/Scan.java     |   6 ++
 .../src/main/java/org/apache/iceberg/BaseScan.java |   3 +-
 .../java/org/apache/iceberg/rest/RESTTable.java    |   1 -
 .../org/apache/iceberg/rest/RESTTableScan.java     |  46 ++++-----
 .../apache/iceberg/rest/TestRESTScanPlanning.java  | 111 +++++++++++++++++++++
 7 files changed, 154 insertions(+), 27 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/BatchScan.java b/api/src/main/java/org/apache/iceberg/BatchScan.java
index 4823d7f180..bd53fe97a3 100644
--- a/api/src/main/java/org/apache/iceberg/BatchScan.java
+++ b/api/src/main/java/org/apache/iceberg/BatchScan.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iceberg;
 
+import org.apache.iceberg.io.FileIO;
+
 /** API for configuring a batch scan. */
 public interface BatchScan extends Scan<BatchScan, ScanTask, ScanTaskGroup<ScanTask>> {
   /**
@@ -68,4 +70,10 @@ public interface BatchScan extends Scan<BatchScan, ScanTask, ScanTaskGroup<ScanT
    * @return the Snapshot this scan will use
    */
   Snapshot snapshot();
+
+  /** Returns the {@link FileIO} instance to use when reading data files for this scan. */
+  @Override
+  default FileIO io() {
+    return table().io();
+  }
 }
diff --git a/api/src/main/java/org/apache/iceberg/BatchScanAdapter.java b/api/src/main/java/org/apache/iceberg/BatchScanAdapter.java
index f723e04d0c..cccd1cd162 100644
--- a/api/src/main/java/org/apache/iceberg/BatchScanAdapter.java
+++ b/api/src/main/java/org/apache/iceberg/BatchScanAdapter.java
@@ -22,6 +22,7 @@ import java.util.Collection;
 import java.util.concurrent.ExecutorService;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.metrics.MetricsReporter;
 
 /** An adapter that allows using {@link TableScan} as {@link BatchScan}. */
@@ -38,6 +39,11 @@ public class BatchScanAdapter implements BatchScan {
     return scan.table();
   }
 
+  @Override
+  public FileIO io() {
+    return scan.io();
+  }
+
   @Override
   public BatchScan useSnapshot(long snapshotId) {
     return new BatchScanAdapter(scan.useSnapshot(snapshotId));
diff --git a/api/src/main/java/org/apache/iceberg/Scan.java b/api/src/main/java/org/apache/iceberg/Scan.java
index 9785ce6603..4bd7717fd1 100644
--- a/api/src/main/java/org/apache/iceberg/Scan.java
+++ b/api/src/main/java/org/apache/iceberg/Scan.java
@@ -22,6 +22,7 @@ import java.util.Collection;
 import java.util.concurrent.ExecutorService;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.metrics.MetricsReporter;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 
@@ -208,4 +209,9 @@ public interface Scan<ThisT, T extends ScanTask, G extends ScanTaskGroup<T>> {
     throw new UnsupportedOperationException(
         this.getClass().getName() + " doesn't implement minRowsRequested");
   }
+
+  /** Returns the {@link FileIO} instance to use when reading data files for this scan. */
+  default FileIO io() {
+    throw new UnsupportedOperationException("io() is not implemented: added in 1.11.0");
+  }
 }
diff --git a/core/src/main/java/org/apache/iceberg/BaseScan.java b/core/src/main/java/org/apache/iceberg/BaseScan.java
index 809eafbd6b..53f3782b38 100644
--- a/core/src/main/java/org/apache/iceberg/BaseScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseScan.java
@@ -102,7 +102,8 @@ abstract class BaseScan<ThisT, T extends ScanTask, G extends ScanTaskGroup<T>>
     return table;
   }
 
-  protected FileIO io() {
+  @Override
+  public FileIO io() {
     return table.io();
   }
 
diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTTable.java b/core/src/main/java/org/apache/iceberg/rest/RESTTable.java
index 26b694b50c..21d84d8477 100644
--- a/core/src/main/java/org/apache/iceberg/rest/RESTTable.java
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTTable.java
@@ -75,7 +75,6 @@ class RESTTable extends BaseTable implements SupportsDistributedScanPlanning {
         tableIdentifier,
         resourcePaths,
         supportedEndpoints,
-        io(),
         catalogProperties,
         hadoopConf);
   }
diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTTableScan.java b/core/src/main/java/org/apache/iceberg/rest/RESTTableScan.java
index 141efb3ac3..f8860c42d7 100644
--- a/core/src/main/java/org/apache/iceberg/rest/RESTTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTTableScan.java
@@ -76,16 +76,14 @@ class RESTTableScan extends DataTableScan {
   private final RESTClient client;
   private final Map<String, String> headers;
   private final TableOperations operations;
-  private final Table table;
   private final ResourcePaths resourcePaths;
   private final TableIdentifier tableIdentifier;
   private final Set<Endpoint> supportedEndpoints;
   private final ParserContext parserContext;
   private final Map<String, String> catalogProperties;
   private final Object hadoopConf;
-  private final FileIO tableIO;
   private String planId = null;
-  private FileIO fileIOForPlanId = null;
+  private FileIO scanFileIO = null;
 
   RESTTableScan(
       Table table,
@@ -97,11 +95,9 @@ class RESTTableScan extends DataTableScan {
       TableIdentifier tableIdentifier,
       ResourcePaths resourcePaths,
       Set<Endpoint> supportedEndpoints,
-      FileIO tableIO,
       Map<String, String> catalogProperties,
       Object hadoopConf) {
     super(table, schema, context);
-    this.table = table;
     this.client = client;
     this.headers = headers;
     this.operations = operations;
@@ -113,7 +109,6 @@ class RESTTableScan extends DataTableScan {
             .add("specsById", table.specs())
             .add("caseSensitive", context().caseSensitive())
             .build();
-    this.tableIO = tableIO;
     this.catalogProperties = catalogProperties;
     this.hadoopConf = hadoopConf;
   }
@@ -131,14 +126,15 @@ class RESTTableScan extends DataTableScan {
         tableIdentifier,
         resourcePaths,
         supportedEndpoints,
-        io(),
         catalogProperties,
         hadoopConf);
   }
 
   @Override
-  protected FileIO io() {
-    return null != fileIOForPlanId ? fileIOForPlanId : tableIO;
+  public FileIO io() {
+    Preconditions.checkState(
+        null != scanFileIO, "FileIO is not available: planFiles() must be called first");
+    return scanFileIO;
   }
 
   @Override
@@ -170,7 +166,7 @@ class RESTTableScan extends DataTableScan {
           .withEndSnapshotId(endSnapshotId)
           .withUseSnapshotSchema(true);
     } else if (snapshotId != null) {
-      boolean useSnapShotSchema = snapshotId != table.currentSnapshot().snapshotId();
+      boolean useSnapShotSchema = snapshotId != table().currentSnapshot().snapshotId();
       builder.withSnapshotId(snapshotId).withUseSnapshotSchema(useSnapShotSchema);
     }
 
@@ -190,9 +186,8 @@ class RESTTableScan extends DataTableScan {
 
     this.planId = response.planId();
     PlanStatus planStatus = response.planStatus();
-    if (null != planId && !response.credentials().isEmpty()) {
-      this.fileIOForPlanId = fileIOForPlanId(response.credentials());
-    }
+    this.scanFileIO =
+        !response.credentials().isEmpty() ? scanFileIO(response.credentials()) : table().io();
 
     switch (planStatus) {
       case COMPLETED:
@@ -212,14 +207,18 @@ class RESTTableScan extends DataTableScan {
     }
   }
 
-  private FileIO fileIOForPlanId(List<Credential> storageCredentials) {
+  private FileIO scanFileIO(List<Credential> storageCredentials) {
+    ImmutableMap.Builder<String, String> builder =
+        ImmutableMap.<String, String>builder().putAll(catalogProperties);
+    if (null != planId) {
+      builder.put(RESTCatalogProperties.REST_SCAN_PLAN_ID, planId);
+    }
+
+    Map<String, String> properties = builder.buildKeepingLast();
     FileIO ioForScan =
         CatalogUtil.loadFileIO(
             catalogProperties.getOrDefault(CatalogProperties.FILE_IO_IMPL, DEFAULT_FILE_IO_IMPL),
-            ImmutableMap.<String, String>builder()
-                .putAll(catalogProperties)
-                .put(RESTCatalogProperties.REST_SCAN_PLAN_ID, planId)
-                .buildKeepingLast(),
+            properties,
             hadoopConf,
             storageCredentials.stream()
                 .map(c -> StorageCredential.create(c.prefix(), c.config()))
@@ -275,9 +274,8 @@ class RESTTableScan extends DataTableScan {
           response.planStatus(),
           planId);
 
-      if (!response.credentials().isEmpty()) {
-        this.fileIOForPlanId = fileIOForPlanId(response.credentials());
-      }
+      this.scanFileIO =
+          !response.credentials().isEmpty() ? scanFileIO(response.credentials()) : table().io();
 
       return scanTasksIterable(response.planTasks(), response.fileScanTasks());
     } catch (FailsafeException e) {
@@ -319,10 +317,8 @@ class RESTTableScan extends DataTableScan {
   /** Cancels the plan on the server (if supported) and closes the plan-scoped FileIO */
   private void cleanupPlanResources() {
     cancelPlan();
-    if (null != fileIOForPlanId) {
-      FILEIO_TRACKER.invalidate(this);
-      this.fileIOForPlanId = null;
-    }
+    FILEIO_TRACKER.invalidate(this);
+    this.scanFileIO = null;
   }
 
   @VisibleForTesting
diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java b/core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java
index ab0e1d9c56..734eaf485c 100644
--- a/core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java
+++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java
@@ -62,8 +62,12 @@ import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.rest.credentials.Credential;
+import org.apache.iceberg.rest.credentials.ImmutableCredential;
 import org.apache.iceberg.rest.responses.ConfigResponse;
 import org.apache.iceberg.rest.responses.ErrorResponse;
+import org.apache.iceberg.rest.responses.FetchPlanningResultResponse;
+import org.apache.iceberg.rest.responses.PlanTableScanResponse;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -953,4 +957,111 @@ public class TestRESTScanPlanning extends TestBaseWithRESTServer {
     // Verify no exception was thrown - cancelPlan returns false when endpoint not supported
     assertThat(cancelled).isFalse();
   }
+
+  @ParameterizedTest
+  @EnumSource(PlanningMode.class)
+  void fileIOForRemotePlanningIsPropagated(
+      Function<TestPlanningBehavior.Builder, TestPlanningBehavior.Builder> planMode) {
+    RESTCatalogAdapter adapter =
+        Mockito.spy(
+            new RESTCatalogAdapter(backendCatalog) {
+              @Override
+              public <T extends RESTResponse> T execute(
+                  HTTPRequest request,
+                  Class<T> responseType,
+                  Consumer<ErrorResponse> errorHandler,
+                  Consumer<Map<String, String>> responseHeaders,
+                  ParserContext parserContext) {
+                T response =
+                    super.execute(
+                        request, responseType, errorHandler, responseHeaders, parserContext);
+                return maybeAddStorageCredential(response);
+              }
+            });
+
+    adapter.setPlanningBehavior(planMode.apply(TestPlanningBehavior.builder()).build());
+
+    RESTCatalog catalog =
+        new RESTCatalog(SessionCatalog.SessionContext.createEmpty(), (config) -> adapter);
+    catalog.initialize(
+        "test",
+        ImmutableMap.of(
+            CatalogProperties.FILE_IO_IMPL,
+            "org.apache.iceberg.inmemory.InMemoryFileIO",
+            RESTCatalogProperties.REST_SCAN_PLANNING_ENABLED,
+            "true"));
+
+    Table table = restTableFor(catalog, "file_io_propagation");
+
+    assertThat(table.io().properties()).doesNotContainKey(RESTCatalogProperties.REST_SCAN_PLAN_ID);
+
+    TableScan tableScan = table.newScan();
+    assertThatThrownBy(tableScan::io)
+        .isInstanceOf(IllegalStateException.class)
+        .hasMessage("FileIO is not available: planFiles() must be called first");
+
+    // make sure remote scan planning is called and FileIO gets the planId
+    assertThat(tableScan.planFiles()).hasSize(1);
+    assertThat(table.io().properties()).doesNotContainKey(RESTCatalogProperties.REST_SCAN_PLAN_ID);
+    assertThat(tableScan.io().properties()).containsKey(RESTCatalogProperties.REST_SCAN_PLAN_ID);
+    String planId = tableScan.io().properties().get(RESTCatalogProperties.REST_SCAN_PLAN_ID);
+
+    TableScan newScan = table.newScan();
+    assertThatThrownBy(newScan::io)
+        .isInstanceOf(IllegalStateException.class)
+        .hasMessage("FileIO is not available: planFiles() must be called first");
+
+    // make sure remote scan planning is called and FileIO gets the planId
+    assertThat(newScan.planFiles()).hasSize(1);
+    assertThat(table.io().properties()).doesNotContainKey(RESTCatalogProperties.REST_SCAN_PLAN_ID);
+
+    // make sure planIds are different for each scan
+    assertThat(newScan.io().properties()).containsKey(RESTCatalogProperties.REST_SCAN_PLAN_ID);
+    assertThat(newScan.io().properties().get(RESTCatalogProperties.REST_SCAN_PLAN_ID))
+        .isNotEqualTo(planId);
+  }
+
+  @SuppressWarnings("unchecked")
+  private <T extends RESTResponse> T maybeAddStorageCredential(T response) {
+    if (response instanceof PlanTableScanResponse resp
+        && PlanStatus.COMPLETED == resp.planStatus()) {
+      return (T)
+          PlanTableScanResponse.builder()
+              .withPlanStatus(resp.planStatus())
+              .withPlanId(resp.planId())
+              .withPlanTasks(resp.planTasks())
+              .withFileScanTasks(resp.fileScanTasks())
+              .withCredentials(
+                  ImmutableList.<Credential>builder()
+                      .addAll(resp.credentials())
+                      .add(
+                          ImmutableCredential.builder()
+                              .prefix("dummy")
+                              .putConfig("dummyKey", "dummyVal")
+                              .build())
+                      .build())
+              .withSpecsById(resp.specsById())
+              .build();
+    } else if (response instanceof FetchPlanningResultResponse resp
+        && PlanStatus.COMPLETED == resp.planStatus()) {
+      return (T)
+          FetchPlanningResultResponse.builder()
+              .withPlanStatus(resp.planStatus())
+              .withFileScanTasks(resp.fileScanTasks())
+              .withPlanTasks(resp.planTasks())
+              .withSpecsById(resp.specsById())
+              .withCredentials(
+                  ImmutableList.<Credential>builder()
+                      .addAll(resp.credentials())
+                      .add(
+                          ImmutableCredential.builder()
+                              .prefix("dummy")
+                              .putConfig("dummyKey", "dummyVal")
+                              .build())
+                      .build())
+              .build();
+    }
+
+    return response;
+  }
 }

Reply via email to