This is an automated email from the ASF dual-hosted git repository.
singhpk234 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 0a73da119f Core: Fix useSnapshotSchema logic and projection in
RESTTableScan (#15609)
0a73da119f is described below
commit 0a73da119ff38ee3a98f248b42180caa51001cec
Author: Prashant Singh <[email protected]>
AuthorDate: Thu Mar 19 14:56:21 2026 -0700
Core: Fix useSnapshotSchema logic and projection in RESTTableScan (#15609)
* Core: Fix useSnapshotSchema logic and projection in RESTTableScan
The useSnapshotSchema determination previously compared the scan's
snapshot ID with the table's current snapshot ID, which was incorrect.
The real distinction is whether the scan targets a branch (false) or a
tag/direct snapshot ID (true). This is now tracked via a field that is
set in the overridden useRef() and useSnapshot() methods.
The projection code previously only selected top-level field names,
missing nested fields. It now uses TypeUtil.getProjectedIds(), matching
the pattern in SnapshotScan.
* Core: Add tests for useSnapshotSchema and nested projection in
RESTTableScan
* Core: Avoid unnecessary casts in useRef and useSnapshot
Set useSnapshotSchema on this before calling super, letting
newRefinedScan propagate the value to the new instance.
* Remove unnecessary Mockito.clearInvocations calls
---------
Co-authored-by: Prashant Singh <[email protected]>
---
.../org/apache/iceberg/rest/RESTTableScan.java | 51 +++++++----
.../apache/iceberg/rest/TestRESTScanPlanning.java | 98 ++++++++++++++++++++++
2 files changed, 133 insertions(+), 16 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTTableScan.java
b/core/src/main/java/org/apache/iceberg/rest/RESTTableScan.java
index 1adfb17c9f..74fe9ebd7d 100644
--- a/core/src/main/java/org/apache/iceberg/rest/RESTTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTTableScan.java
@@ -32,6 +32,7 @@ import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.DataTableScan;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.TableScan;
@@ -43,11 +44,12 @@ import org.apache.iceberg.io.StorageCredential;
import
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.rest.credentials.Credential;
import org.apache.iceberg.rest.requests.PlanTableScanRequest;
import org.apache.iceberg.rest.responses.FetchPlanningResultResponse;
import org.apache.iceberg.rest.responses.PlanTableScanResponse;
-import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.Tasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -83,6 +85,7 @@ class RESTTableScan extends DataTableScan {
private final Object hadoopConf;
private String planId = null;
private FileIO scanFileIO = null;
+ private boolean useSnapshotSchema = false;
RESTTableScan(
Table table,
@@ -115,18 +118,34 @@ class RESTTableScan extends DataTableScan {
@Override
protected TableScan newRefinedScan(
Table refinedTable, Schema refinedSchema, TableScanContext
refinedContext) {
- return new RESTTableScan(
- refinedTable,
- refinedSchema,
- refinedContext,
- client,
- headers,
- operations,
- tableIdentifier,
- resourcePaths,
- supportedEndpoints,
- catalogProperties,
- hadoopConf);
+ RESTTableScan scan =
+ new RESTTableScan(
+ refinedTable,
+ refinedSchema,
+ refinedContext,
+ client,
+ headers,
+ operations,
+ tableIdentifier,
+ resourcePaths,
+ supportedEndpoints,
+ catalogProperties,
+ hadoopConf);
+ scan.useSnapshotSchema = useSnapshotSchema;
+ return scan;
+ }
+
+ @Override
+ public TableScan useRef(String name) {
+ SnapshotRef ref = table().refs().get(name);
+ this.useSnapshotSchema = ref != null && ref.isTag();
+ return super.useRef(name);
+ }
+
+ @Override
+ public TableScan useSnapshot(long snapshotId) {
+ this.useSnapshotSchema = true;
+ return super.useSnapshot(snapshotId);
}
@Override
@@ -143,8 +162,9 @@ class RESTTableScan extends DataTableScan {
Long startSnapshotId = context().fromSnapshotId();
Long endSnapshotId = context().toSnapshotId();
Long snapshotId = snapshotId();
+ List<Integer> projectedFieldIds =
Lists.newArrayList(TypeUtil.getProjectedIds(schema()));
List<String> selectedColumns =
-
schema().columns().stream().map(Types.NestedField::name).collect(Collectors.toList());
+
projectedFieldIds.stream().map(schema()::findColumnName).collect(Collectors.toList());
List<String> statsFields = null;
if (columnsToKeepStats() != null) {
@@ -168,8 +188,7 @@ class RESTTableScan extends DataTableScan {
.withEndSnapshotId(endSnapshotId)
.withUseSnapshotSchema(true);
} else if (snapshotId != null) {
- boolean useSnapShotSchema = snapshotId !=
table().currentSnapshot().snapshotId();
-
builder.withSnapshotId(snapshotId).withUseSnapshotSchema(useSnapShotSchema);
+
builder.withSnapshotId(snapshotId).withUseSnapshotSchema(useSnapshotSchema);
}
return planTableScan(builder.build());
diff --git
a/core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java
b/core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java
index 206ff61945..a7fbe43463 100644
--- a/core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java
+++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java
@@ -31,6 +31,7 @@ import static org.apache.iceberg.TestBase.SPEC;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.atLeastOnce;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectReader;
@@ -50,6 +51,7 @@ import org.apache.iceberg.MetadataTableUtils;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Scan;
import org.apache.iceberg.ScanTask;
+import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.catalog.SessionCatalog;
@@ -68,6 +70,7 @@ import org.apache.iceberg.rest.responses.ErrorResponse;
import org.apache.iceberg.rest.responses.FetchPlanningResultResponse;
import org.apache.iceberg.rest.responses.LoadTableResponse;
import org.apache.iceberg.rest.responses.PlanTableScanResponse;
+import org.apache.iceberg.types.Types;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
@@ -849,6 +852,101 @@ public class TestRESTScanPlanning extends
TestBaseWithRESTServer {
restTableScanFor(table);
}
+ // ==================== useSnapshotSchema and Projection Tests
====================
+
+ private PlanTableScanRequest captureLastPlanRequest() {
+ ArgumentCaptor<HTTPRequest> captor =
ArgumentCaptor.forClass(HTTPRequest.class);
+ Mockito.verify(adapterForRESTServer, atLeastOnce())
+ .execute(captor.capture(), any(), any(), any());
+ return captor.getAllValues().stream()
+ .filter(req -> req.body() instanceof PlanTableScanRequest)
+ .map(req -> (PlanTableScanRequest) req.body())
+ .reduce((first, second) -> second)
+ .orElseThrow(() -> new AssertionError("No PlanTableScanRequest
captured"));
+ }
+
+ @Test
+ void useSnapshotSchemaSetCorrectlyForSnapshotAndBranchAndTag() throws
IOException {
+ configurePlanningBehavior(TestPlanningBehavior.Builder::synchronous);
+ Table table = restTableFor(restCatalog, "use_snapshot_schema_test");
+ setParserContext(table);
+
+ table.newAppend().appendFile(FILE_B).commit();
+ table.refresh();
+ long snapshotId = table.currentSnapshot().snapshotId();
+
+ // Create a tag and a branch pointing to the current snapshot
+ table.manageSnapshots().createTag("test-tag", snapshotId).commit();
+ table.manageSnapshots().createBranch("test-branch", snapshotId).commit();
+
+ // Test 1: Scanning current snapshot without time travel should NOT set
useSnapshotSchema
+ try (CloseableIterable<FileScanTask> ignored =
table.newScan().planFiles()) {
+ PlanTableScanRequest request = captureLastPlanRequest();
+ assertThat(request.useSnapshotSchema())
+ .as("Default scan should not use snapshot schema")
+ .isFalse();
+ }
+
+ // Test 2: useSnapshot() should set useSnapshotSchema=true
+ try (CloseableIterable<FileScanTask> ignored =
+ table.newScan().useSnapshot(snapshotId).planFiles()) {
+ PlanTableScanRequest request = captureLastPlanRequest();
+ assertThat(request.useSnapshotSchema())
+ .as("useSnapshot() should set useSnapshotSchema=true")
+ .isTrue();
+ }
+
+ // Test 3: useRef() with a tag should set useSnapshotSchema=true
+ try (CloseableIterable<FileScanTask> ignored =
table.newScan().useRef("test-tag").planFiles()) {
+ PlanTableScanRequest request = captureLastPlanRequest();
+ assertThat(request.useSnapshotSchema())
+ .as("useRef() with a tag should set useSnapshotSchema=true")
+ .isTrue();
+ }
+
+ // Test 4: useRef() with a branch should NOT set useSnapshotSchema
+ try (CloseableIterable<FileScanTask> ignored =
+ table.newScan().useRef("test-branch").planFiles()) {
+ PlanTableScanRequest request = captureLastPlanRequest();
+ assertThat(request.useSnapshotSchema())
+ .as("useRef() with a branch should not use snapshot schema")
+ .isFalse();
+ }
+ }
+
+ @Test
+ void selectWithNestedFieldsSendsFullyQualifiedNames() throws IOException {
+ configurePlanningBehavior(TestPlanningBehavior.Builder::synchronous);
+
+ Schema nestedSchema =
+ new Schema(
+ Types.NestedField.required(1, "id", Types.IntegerType.get()),
+ Types.NestedField.required(
+ 2,
+ "address",
+ Types.StructType.of(
+ Types.NestedField.required(3, "city",
Types.StringType.get()),
+ Types.NestedField.required(4, "zip",
Types.StringType.get()))));
+
+ restCatalog.createNamespace(NS);
+ Table table =
+ restCatalog
+ .buildTable(TableIdentifier.of(NS, "nested_projection_test"),
nestedSchema)
+ .create();
+
+ setParserContext(table);
+
+ // Select a nested field — the server needs the fully qualified name
"address.city"
+ try (CloseableIterable<FileScanTask> ignored =
+ table.newScan().select("address.city").planFiles()) {
+ PlanTableScanRequest request = captureLastPlanRequest();
+ assertThat(request.select())
+ .as("Nested field projection should send fully qualified column
names")
+ .contains("address.city")
+ .doesNotContain("address.zip");
+ }
+ }
+
// ==================== Endpoint Support Tests ====================
/** Helper class to hold catalog and adapter for endpoint support tests. */