This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new cf6f83550a API, Core: Add FileIO to Scan API (#15561)
cf6f83550a is described below
commit cf6f83550afc9081232b437c58730048aad51425
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Fri Mar 13 06:42:50 2026 +0100
API, Core: Add FileIO to Scan API (#15561)
---
.../main/java/org/apache/iceberg/BatchScan.java | 8 ++
.../java/org/apache/iceberg/BatchScanAdapter.java | 6 ++
api/src/main/java/org/apache/iceberg/Scan.java | 6 ++
.../src/main/java/org/apache/iceberg/BaseScan.java | 3 +-
.../java/org/apache/iceberg/rest/RESTTable.java | 1 -
.../org/apache/iceberg/rest/RESTTableScan.java | 46 ++++-----
.../apache/iceberg/rest/TestRESTScanPlanning.java | 111 +++++++++++++++++++++
7 files changed, 154 insertions(+), 27 deletions(-)
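
Note: the snippet below is a hypothetical usage sketch, not part of this commit. It shows what the new Scan#io() accessor is for: an engine integration can read the files a scan planned through the FileIO exposed by the scan itself, which for REST-planned scans may be a plan-scoped FileIO carrying vended credentials rather than the table's default FileIO. The class and method names (ScanIoExample, readPlannedFiles) are made up for illustration.

import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;

public class ScanIoExample {
  static void readPlannedFiles(Table table) throws Exception {
    TableScan scan = table.newScan();
    try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
      // New in this change: the FileIO used to read planned files comes from the scan,
      // which may differ from table.io() (e.g. plan-scoped credentials for REST scans).
      FileIO io = scan.io();
      for (FileScanTask task : tasks) {
        InputFile inputFile = io.newInputFile(task.file().location());
        System.out.println(inputFile.location() + ": " + inputFile.getLength() + " bytes");
      }
    }
  }
}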
diff --git a/api/src/main/java/org/apache/iceberg/BatchScan.java b/api/src/main/java/org/apache/iceberg/BatchScan.java
index 4823d7f180..bd53fe97a3 100644
--- a/api/src/main/java/org/apache/iceberg/BatchScan.java
+++ b/api/src/main/java/org/apache/iceberg/BatchScan.java
@@ -18,6 +18,8 @@
*/
package org.apache.iceberg;
+import org.apache.iceberg.io.FileIO;
+
/** API for configuring a batch scan. */
public interface BatchScan extends Scan<BatchScan, ScanTask, ScanTaskGroup<ScanTask>> {
/**
@@ -68,4 +70,10 @@ public interface BatchScan extends Scan<BatchScan, ScanTask, ScanTaskGroup<ScanT
* @return the Snapshot this scan will use
*/
Snapshot snapshot();
+
+ /** Returns the {@link FileIO} instance to use when reading data files for this scan. */
+ @Override
+ default FileIO io() {
+ return table().io();
+ }
}
diff --git a/api/src/main/java/org/apache/iceberg/BatchScanAdapter.java b/api/src/main/java/org/apache/iceberg/BatchScanAdapter.java
index f723e04d0c..cccd1cd162 100644
--- a/api/src/main/java/org/apache/iceberg/BatchScanAdapter.java
+++ b/api/src/main/java/org/apache/iceberg/BatchScanAdapter.java
@@ -22,6 +22,7 @@ import java.util.Collection;
import java.util.concurrent.ExecutorService;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.metrics.MetricsReporter;
/** An adapter that allows using {@link TableScan} as {@link BatchScan}. */
@@ -38,6 +39,11 @@ public class BatchScanAdapter implements BatchScan {
return scan.table();
}
+ @Override
+ public FileIO io() {
+ return scan.io();
+ }
+
@Override
public BatchScan useSnapshot(long snapshotId) {
return new BatchScanAdapter(scan.useSnapshot(snapshotId));
diff --git a/api/src/main/java/org/apache/iceberg/Scan.java b/api/src/main/java/org/apache/iceberg/Scan.java
index 9785ce6603..4bd7717fd1 100644
--- a/api/src/main/java/org/apache/iceberg/Scan.java
+++ b/api/src/main/java/org/apache/iceberg/Scan.java
@@ -22,6 +22,7 @@ import java.util.Collection;
import java.util.concurrent.ExecutorService;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.metrics.MetricsReporter;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -208,4 +209,9 @@ public interface Scan<ThisT, T extends ScanTask, G extends ScanTaskGroup<T>> {
throw new UnsupportedOperationException(
this.getClass().getName() + " doesn't implement minRowsRequested");
}
+
+ /** Returns the {@link FileIO} instance to use when reading data files for this scan. */
+ default FileIO io() {
+ throw new UnsupportedOperationException("io() is not implemented: added in 1.11.0");
+ }
}
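
A compatibility note with a small illustrative sketch (the ScanIoFallback helper below is hypothetical, not part of this commit): the new Scan#io() default throws UnsupportedOperationException, so a caller that may receive a custom Scan implementation which has not overridden it could fall back to the table's FileIO.

import org.apache.iceberg.Scan;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.FileIO;

class ScanIoFallback {
  // Prefer the scan-scoped FileIO; fall back to the table's FileIO when the
  // Scan implementation still relies on the throwing default added in this change.
  static FileIO ioFor(Scan<?, ?, ?> scan, Table table) {
    try {
      return scan.io();
    } catch (UnsupportedOperationException e) {
      return table.io();
    }
  }
}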
diff --git a/core/src/main/java/org/apache/iceberg/BaseScan.java b/core/src/main/java/org/apache/iceberg/BaseScan.java
index 809eafbd6b..53f3782b38 100644
--- a/core/src/main/java/org/apache/iceberg/BaseScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseScan.java
@@ -102,7 +102,8 @@ abstract class BaseScan<ThisT, T extends ScanTask, G extends ScanTaskGroup<T>>
return table;
}
- protected FileIO io() {
+ @Override
+ public FileIO io() {
return table.io();
}
diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTTable.java b/core/src/main/java/org/apache/iceberg/rest/RESTTable.java
index 26b694b50c..21d84d8477 100644
--- a/core/src/main/java/org/apache/iceberg/rest/RESTTable.java
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTTable.java
@@ -75,7 +75,6 @@ class RESTTable extends BaseTable implements SupportsDistributedScanPlanning {
tableIdentifier,
resourcePaths,
supportedEndpoints,
- io(),
catalogProperties,
hadoopConf);
}
diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTTableScan.java b/core/src/main/java/org/apache/iceberg/rest/RESTTableScan.java
index 141efb3ac3..f8860c42d7 100644
--- a/core/src/main/java/org/apache/iceberg/rest/RESTTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTTableScan.java
@@ -76,16 +76,14 @@ class RESTTableScan extends DataTableScan {
private final RESTClient client;
private final Map<String, String> headers;
private final TableOperations operations;
- private final Table table;
private final ResourcePaths resourcePaths;
private final TableIdentifier tableIdentifier;
private final Set<Endpoint> supportedEndpoints;
private final ParserContext parserContext;
private final Map<String, String> catalogProperties;
private final Object hadoopConf;
- private final FileIO tableIO;
private String planId = null;
- private FileIO fileIOForPlanId = null;
+ private FileIO scanFileIO = null;
RESTTableScan(
Table table,
@@ -97,11 +95,9 @@ class RESTTableScan extends DataTableScan {
TableIdentifier tableIdentifier,
ResourcePaths resourcePaths,
Set<Endpoint> supportedEndpoints,
- FileIO tableIO,
Map<String, String> catalogProperties,
Object hadoopConf) {
super(table, schema, context);
- this.table = table;
this.client = client;
this.headers = headers;
this.operations = operations;
@@ -113,7 +109,6 @@ class RESTTableScan extends DataTableScan {
.add("specsById", table.specs())
.add("caseSensitive", context().caseSensitive())
.build();
- this.tableIO = tableIO;
this.catalogProperties = catalogProperties;
this.hadoopConf = hadoopConf;
}
@@ -131,14 +126,15 @@ class RESTTableScan extends DataTableScan {
tableIdentifier,
resourcePaths,
supportedEndpoints,
- io(),
catalogProperties,
hadoopConf);
}
@Override
- protected FileIO io() {
- return null != fileIOForPlanId ? fileIOForPlanId : tableIO;
+ public FileIO io() {
+ Preconditions.checkState(
+ null != scanFileIO, "FileIO is not available: planFiles() must be called first");
+ return scanFileIO;
}
@Override
@@ -170,7 +166,7 @@ class RESTTableScan extends DataTableScan {
.withEndSnapshotId(endSnapshotId)
.withUseSnapshotSchema(true);
} else if (snapshotId != null) {
- boolean useSnapShotSchema = snapshotId != table.currentSnapshot().snapshotId();
+ boolean useSnapShotSchema = snapshotId != table().currentSnapshot().snapshotId();
builder.withSnapshotId(snapshotId).withUseSnapshotSchema(useSnapShotSchema);
}
@@ -190,9 +186,8 @@ class RESTTableScan extends DataTableScan {
this.planId = response.planId();
PlanStatus planStatus = response.planStatus();
- if (null != planId && !response.credentials().isEmpty()) {
- this.fileIOForPlanId = fileIOForPlanId(response.credentials());
- }
+ this.scanFileIO =
+ !response.credentials().isEmpty() ? scanFileIO(response.credentials()) : table().io();
switch (planStatus) {
case COMPLETED:
@@ -212,14 +207,18 @@ class RESTTableScan extends DataTableScan {
}
}
- private FileIO fileIOForPlanId(List<Credential> storageCredentials) {
+ private FileIO scanFileIO(List<Credential> storageCredentials) {
+ ImmutableMap.Builder<String, String> builder =
+ ImmutableMap.<String, String>builder().putAll(catalogProperties);
+ if (null != planId) {
+ builder.put(RESTCatalogProperties.REST_SCAN_PLAN_ID, planId);
+ }
+
+ Map<String, String> properties = builder.buildKeepingLast();
FileIO ioForScan =
CatalogUtil.loadFileIO(
catalogProperties.getOrDefault(CatalogProperties.FILE_IO_IMPL, DEFAULT_FILE_IO_IMPL),
- ImmutableMap.<String, String>builder()
- .putAll(catalogProperties)
- .put(RESTCatalogProperties.REST_SCAN_PLAN_ID, planId)
- .buildKeepingLast(),
+ properties,
hadoopConf,
storageCredentials.stream()
.map(c -> StorageCredential.create(c.prefix(), c.config()))
@@ -275,9 +274,8 @@ class RESTTableScan extends DataTableScan {
response.planStatus(),
planId);
- if (!response.credentials().isEmpty()) {
- this.fileIOForPlanId = fileIOForPlanId(response.credentials());
- }
+ this.scanFileIO =
+ !response.credentials().isEmpty() ? scanFileIO(response.credentials()) : table().io();
return scanTasksIterable(response.planTasks(), response.fileScanTasks());
} catch (FailsafeException e) {
@@ -319,10 +317,8 @@ class RESTTableScan extends DataTableScan {
/** Cancels the plan on the server (if supported) and closes the plan-scoped FileIO */
private void cleanupPlanResources() {
cancelPlan();
- if (null != fileIOForPlanId) {
- FILEIO_TRACKER.invalidate(this);
- this.fileIOForPlanId = null;
- }
+ FILEIO_TRACKER.invalidate(this);
+ this.scanFileIO = null;
}
@VisibleForTesting
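
For the RESTTableScan changes above, a small sketch of the call ordering the new precondition implies (illustrative only; RestScanIoOrdering is not part of this commit): io() now fails fast before planning instead of silently returning the table-level FileIO.

import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;

class RestScanIoOrdering {
  static void planThenUseIo(Table restTable) throws Exception {
    TableScan scan = restTable.newScan();
    // Calling scan.io() at this point would throw
    // IllegalStateException: "FileIO is not available: planFiles() must be called first"
    try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
      // After planning, io() returns either a plan-scoped FileIO built from the vended
      // credentials (configured with RESTCatalogProperties.REST_SCAN_PLAN_ID) or,
      // when no credentials were returned, the table's FileIO.
      FileIO io = scan.io();
      tasks.forEach(task -> io.newInputFile(task.file().location()));
    }
  }
}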
diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java b/core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java
index ab0e1d9c56..734eaf485c 100644
--- a/core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java
+++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java
@@ -62,8 +62,12 @@ import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.rest.credentials.Credential;
+import org.apache.iceberg.rest.credentials.ImmutableCredential;
import org.apache.iceberg.rest.responses.ConfigResponse;
import org.apache.iceberg.rest.responses.ErrorResponse;
+import org.apache.iceberg.rest.responses.FetchPlanningResultResponse;
+import org.apache.iceberg.rest.responses.PlanTableScanResponse;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
@@ -953,4 +957,111 @@ public class TestRESTScanPlanning extends TestBaseWithRESTServer {
// Verify no exception was thrown - cancelPlan returns false when endpoint not supported
assertThat(cancelled).isFalse();
}
+
+ @ParameterizedTest
+ @EnumSource(PlanningMode.class)
+ void fileIOForRemotePlanningIsPropagated(
+ Function<TestPlanningBehavior.Builder, TestPlanningBehavior.Builder> planMode) {
+ RESTCatalogAdapter adapter =
+ Mockito.spy(
+ new RESTCatalogAdapter(backendCatalog) {
+ @Override
+ public <T extends RESTResponse> T execute(
+ HTTPRequest request,
+ Class<T> responseType,
+ Consumer<ErrorResponse> errorHandler,
+ Consumer<Map<String, String>> responseHeaders,
+ ParserContext parserContext) {
+ T response =
+ super.execute(
+ request, responseType, errorHandler, responseHeaders, parserContext);
+ return maybeAddStorageCredential(response);
+ }
+ });
+
+ adapter.setPlanningBehavior(planMode.apply(TestPlanningBehavior.builder()).build());
+
+ RESTCatalog catalog =
+ new RESTCatalog(SessionCatalog.SessionContext.createEmpty(), (config) -> adapter);
+ catalog.initialize(
+ "test",
+ ImmutableMap.of(
+ CatalogProperties.FILE_IO_IMPL,
+ "org.apache.iceberg.inmemory.InMemoryFileIO",
+ RESTCatalogProperties.REST_SCAN_PLANNING_ENABLED,
+ "true"));
+
+ Table table = restTableFor(catalog, "file_io_propagation");
+
+ assertThat(table.io().properties()).doesNotContainKey(RESTCatalogProperties.REST_SCAN_PLAN_ID);
+
+ TableScan tableScan = table.newScan();
+ assertThatThrownBy(tableScan::io)
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessage("FileIO is not available: planFiles() must be called first");
+
+ // make sure remote scan planning is called and FileIO gets the planId
+ assertThat(tableScan.planFiles()).hasSize(1);
+ assertThat(table.io().properties()).doesNotContainKey(RESTCatalogProperties.REST_SCAN_PLAN_ID);
+ assertThat(tableScan.io().properties()).containsKey(RESTCatalogProperties.REST_SCAN_PLAN_ID);
+ String planId = tableScan.io().properties().get(RESTCatalogProperties.REST_SCAN_PLAN_ID);
+
+ TableScan newScan = table.newScan();
+ assertThatThrownBy(newScan::io)
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessage("FileIO is not available: planFiles() must be called first");
+
+ // make sure remote scan planning is called and FileIO gets the planId
+ assertThat(newScan.planFiles()).hasSize(1);
+ assertThat(table.io().properties()).doesNotContainKey(RESTCatalogProperties.REST_SCAN_PLAN_ID);
+
+ // make sure planIds are different for each scan
+ assertThat(newScan.io().properties()).containsKey(RESTCatalogProperties.REST_SCAN_PLAN_ID);
+ assertThat(newScan.io().properties().get(RESTCatalogProperties.REST_SCAN_PLAN_ID))
+ .isNotEqualTo(planId);
+ }
+
+ @SuppressWarnings("unchecked")
+ private <T extends RESTResponse> T maybeAddStorageCredential(T response) {
+ if (response instanceof PlanTableScanResponse resp
+ && PlanStatus.COMPLETED == resp.planStatus()) {
+ return (T)
+ PlanTableScanResponse.builder()
+ .withPlanStatus(resp.planStatus())
+ .withPlanId(resp.planId())
+ .withPlanTasks(resp.planTasks())
+ .withFileScanTasks(resp.fileScanTasks())
+ .withCredentials(
+ ImmutableList.<Credential>builder()
+ .addAll(resp.credentials())
+ .add(
+ ImmutableCredential.builder()
+ .prefix("dummy")
+ .putConfig("dummyKey", "dummyVal")
+ .build())
+ .build())
+ .withSpecsById(resp.specsById())
+ .build();
+ } else if (response instanceof FetchPlanningResultResponse resp
+ && PlanStatus.COMPLETED == resp.planStatus()) {
+ return (T)
+ FetchPlanningResultResponse.builder()
+ .withPlanStatus(resp.planStatus())
+ .withFileScanTasks(resp.fileScanTasks())
+ .withPlanTasks(resp.planTasks())
+ .withSpecsById(resp.specsById())
+ .withCredentials(
+ ImmutableList.<Credential>builder()
+ .addAll(resp.credentials())
+ .add(
+ ImmutableCredential.builder()
+ .prefix("dummy")
+ .putConfig("dummyKey", "dummyVal")
+ .build())
+ .build())
+ .build();
+ }
+
+ return response;
+ }
}