This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 867835e282 Core: Use time-travel schema when resolving partition spec
in scan (#13301)
867835e282 is described below
commit 867835e2829f5ee5ecbd84ae8098d484cf9eed0e
Author: chenjian2664 <[email protected]>
AuthorDate: Mon Oct 27 21:39:14 2025 +0800
Core: Use time-travel schema when resolving partition spec in scan (#13301)
---
.../apache/iceberg/BaseDistributedDataScan.java | 2 +-
.../src/main/java/org/apache/iceberg/DataScan.java | 2 +-
.../java/org/apache/iceberg/DataTableScan.java | 2 +-
.../main/java/org/apache/iceberg/SnapshotScan.java | 22 ++++
.../iceberg/TestScansAndSchemaEvolution.java | 142 ++++++++++++++++++++-
5 files changed, 163 insertions(+), 7 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/BaseDistributedDataScan.java
b/core/src/main/java/org/apache/iceberg/BaseDistributedDataScan.java
index 89c7f0b606..c69f71f2fd 100644
--- a/core/src/main/java/org/apache/iceberg/BaseDistributedDataScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseDistributedDataScan.java
@@ -300,7 +300,7 @@ abstract class BaseDistributedDataScan
}
return builder
- .specsById(table().specs())
+ .specsById(specs())
.filterData(filter())
.caseSensitive(isCaseSensitive())
.scanMetrics(scanMetrics())
diff --git a/core/src/main/java/org/apache/iceberg/DataScan.java
b/core/src/main/java/org/apache/iceberg/DataScan.java
index 1c48042f52..1acbbbf682 100644
--- a/core/src/main/java/org/apache/iceberg/DataScan.java
+++ b/core/src/main/java/org/apache/iceberg/DataScan.java
@@ -53,7 +53,7 @@ abstract class DataScan<ThisT, T extends ScanTask, G extends
ScanTaskGroup<T>>
.caseSensitive(isCaseSensitive())
.select(withColumnStats ? SCAN_WITH_STATS_COLUMNS : SCAN_COLUMNS)
.filterData(filter())
- .specsById(table().specs())
+ .specsById(specs())
.scanMetrics(scanMetrics())
.ignoreDeleted()
.columnsToKeepStats(columnsToKeepStats());
diff --git a/core/src/main/java/org/apache/iceberg/DataTableScan.java
b/core/src/main/java/org/apache/iceberg/DataTableScan.java
index 8463112b7a..4d23dd525e 100644
--- a/core/src/main/java/org/apache/iceberg/DataTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/DataTableScan.java
@@ -74,7 +74,7 @@ public class DataTableScan extends BaseTableScan {
.caseSensitive(isCaseSensitive())
.select(scanColumns())
.filterData(filter())
- .specsById(table().specs())
+ .specsById(specs())
.scanMetrics(scanMetrics())
.ignoreDeleted()
.columnsToKeepStats(columnsToKeepStats());
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotScan.java
b/core/src/main/java/org/apache/iceberg/SnapshotScan.java
index a98a8c9f13..8a836b634e 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotScan.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotScan.java
@@ -33,6 +33,7 @@ import org.apache.iceberg.metrics.ScanReport;
import org.apache.iceberg.metrics.Timer;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.TypeUtil;
@@ -79,6 +80,27 @@ public abstract class SnapshotScan<ThisT, T extends
ScanTask, G extends ScanTask
return scanMetrics;
}
+ protected Map<Integer, PartitionSpec> specs() {
+ Map<Integer, PartitionSpec> specs = table().specs();
+ // requires latest schema
+ if (!useSnapshotSchema()
+ || snapshotId() == null
+ || table().currentSnapshot() == null
+ || snapshotId().equals(table().currentSnapshot().snapshotId())) {
+ return specs;
+ }
+
+ // this is a time travel request
+ Schema snapshotSchema = tableSchema();
+ ImmutableMap.Builder<Integer, PartitionSpec> newSpecs =
+ ImmutableMap.builderWithExpectedSize(specs.size());
+ for (Map.Entry<Integer, PartitionSpec> entry : specs.entrySet()) {
+ newSpecs.put(entry.getKey(),
entry.getValue().toUnbound().bind(snapshotSchema));
+ }
+
+ return newSpecs.build();
+ }
+
public ThisT useSnapshot(long scanSnapshotId) {
Preconditions.checkArgument(
snapshotId() == null, "Cannot override snapshot, already set snapshot
id=%s", snapshotId());
diff --git
a/core/src/test/java/org/apache/iceberg/TestScansAndSchemaEvolution.java
b/core/src/test/java/org/apache/iceberg/TestScansAndSchemaEvolution.java
index 790f7c7152..3df370fe6f 100644
--- a/core/src/test/java/org/apache/iceberg/TestScansAndSchemaEvolution.java
+++ b/core/src/test/java/org/apache/iceberg/TestScansAndSchemaEvolution.java
@@ -28,6 +28,7 @@ import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.UUID;
+import java.util.stream.Collectors;
import org.apache.avro.generic.GenericData;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.avro.RandomAvroData;
@@ -65,21 +66,26 @@ public class TestScansAndSchemaEvolution {
@TempDir private File temp;
private DataFile createDataFile(String partValue) throws IOException {
- List<GenericData.Record> expected = RandomAvroData.generate(SCHEMA, 100,
0L);
+ return createDataFile(partValue, SCHEMA, SPEC);
+ }
+
+ private DataFile createDataFile(String partValue, Schema schema,
PartitionSpec spec)
+ throws IOException {
+ List<GenericData.Record> expected = RandomAvroData.generate(schema, 100,
0L);
OutputFile dataFile =
new
InMemoryOutputFile(FileFormat.AVRO.addExtension(UUID.randomUUID().toString()));
try (FileAppender<GenericData.Record> writer =
- Avro.write(dataFile).schema(SCHEMA).named("test").build()) {
+ Avro.write(dataFile).schema(schema).named("test").build()) {
for (GenericData.Record rec : expected) {
rec.put("part", partValue); // create just one partition
writer.add(rec);
}
}
- PartitionData partition = new PartitionData(SPEC.partitionType());
+ PartitionData partition = new PartitionData(spec.partitionType());
partition.set(0, partValue);
- return DataFiles.builder(SPEC)
+ return DataFiles.builder(spec)
.withInputFile(dataFile.toInputFile())
.withPartition(partition)
.withRecordCount(100)
@@ -99,6 +105,7 @@ public class TestScansAndSchemaEvolution {
DataFile fileTwo = createDataFile("two");
table.newAppend().appendFile(fileOne).appendFile(fileTwo).commit();
+ long firstSnapshotId = table.currentSnapshot().snapshotId();
List<FileScanTask> tasks =
Lists.newArrayList(table.newScan().filter(Expressions.equal("part",
"one")).planFiles());
@@ -111,6 +118,133 @@ public class TestScansAndSchemaEvolution {
tasks = Lists.newArrayList(table.newScan().filter(Expressions.equal("p",
"one")).planFiles());
assertThat(tasks).hasSize(1);
+
+ // create a new commit
+ table.newAppend().appendFile(createDataFile("three")).commit();
+
+ // use filter with previous partition name
+ tasks =
+ Lists.newArrayList(
+ table
+ .newScan()
+ .useSnapshot(firstSnapshotId)
+ .filter(Expressions.equal("part", "one"))
+ .planFiles());
+
+ assertThat(tasks).hasSize(1);
+ }
+
+ @TestTemplate
+ public void testPartitionSourceDrop() throws IOException {
+ Table table = TestTables.create(temp, "test", SCHEMA, SPEC, formatVersion);
+
+ DataFile fileOne = createDataFile("one");
+ DataFile fileTwo = createDataFile("two");
+
+ table.newAppend().appendFile(fileOne).appendFile(fileTwo).commit();
+ long firstSnapshotId = table.currentSnapshot().snapshotId();
+
+ table.updateSpec().addField("id").commit();
+
+ List<FileScanTask> tasks =
+ Lists.newArrayList(
+
table.newScan().filter(Expressions.not(Expressions.isNull("id"))).planFiles());
+
+ assertThat(tasks).hasSize(2);
+
+ DataFile fileThree = createDataFile("three", table.schema(), table.spec());
+ table.newAppend().appendFile(fileThree).commit();
+
+ // remove one field from spec and drop the column
+ table.updateSpec().removeField("id").commit();
+ table.updateSchema().deleteColumn("id").commit();
+
+ List<FileScanTask> tasksAtFirstSnapshotId =
+ Lists.newArrayList(
+ table
+ .newScan()
+ .useSnapshot(firstSnapshotId)
+ .filter(Expressions.not(Expressions.isNull("id")))
+ .planFiles());
+
+ assertThat(
+ tasksAtFirstSnapshotId.stream()
+ .map(ContentScanTask::file)
+ .map(ContentFile::location)
+ .collect(Collectors.toList()))
+ .isEqualTo(
+ tasks.stream()
+ .map(ContentScanTask::file)
+ .map(ContentFile::location)
+ .collect(Collectors.toList()));
+ }
+
+ @TestTemplate
+ public void testColumnRename() throws IOException {
+ Table table = TestTables.create(temp, "test", SCHEMA, SPEC, formatVersion);
+
+ DataFile fileOne = createDataFile("one");
+ DataFile fileTwo = createDataFile("two");
+
+ table.newAppend().appendFile(fileOne).appendFile(fileTwo).commit();
+ long firstSnapshotId = table.currentSnapshot().snapshotId();
+
+ table.updateSchema().renameColumn("data", "renamed_data").commit();
+
+ DataFile fileThree = createDataFile("three", table.schema(), table.spec());
+ table.newAppend().appendFile(fileThree).commit();
+ long secondSnapshotId = table.currentSnapshot().snapshotId();
+
+ // generate a new commit
+ DataFile fileFour = createDataFile("four", table.schema(), table.spec());
+ table.newAppend().appendFile(fileFour).commit();
+
+ // running successfully with the new filter on previous column name
+ List<FileScanTask> tasks =
+ Lists.newArrayList(
+ table
+ .newScan()
+ .useSnapshot(firstSnapshotId)
+ .filter(Expressions.equal("data", "xyz"))
+ .planFiles());
+ assertThat(tasks).hasSize(2);
+
+ // running successfully with the new filter on renamed column name
+ tasks =
+ Lists.newArrayList(
+ table
+ .newScan()
+ .useSnapshot(secondSnapshotId)
+ .filter(Expressions.equal("renamed_data", "xyz"))
+ .planFiles());
+ assertThat(tasks).hasSize(3);
+ }
+
+ @TestTemplate
+ public void testColumnDrop() throws IOException {
+ Table table = TestTables.create(temp, "test", SCHEMA, SPEC, formatVersion);
+
+ DataFile fileOne = createDataFile("one");
+ DataFile fileTwo = createDataFile("two");
+
+ table.newAppend().appendFile(fileOne).appendFile(fileTwo).commit();
+ long firstSnapshotId = table.currentSnapshot().snapshotId();
+
+ table.updateSchema().deleteColumn("data").commit();
+
+ // make sure to generate a new commit after dropping a column
+ DataFile fileThree = createDataFile("three", table.schema(), table.spec());
+ table.newAppend().appendFile(fileThree).commit();
+
+ // running successfully with the new filter on previous column name
+ List<FileScanTask> tasks =
+ Lists.newArrayList(
+ table
+ .newScan()
+ .useSnapshot(firstSnapshotId)
+ .filter(Expressions.equal("data", "xyz"))
+ .planFiles());
+ assertThat(tasks).hasSize(2);
}
@TestTemplate