This is an automated email from the ASF dual-hosted git repository.
xuba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new 02f7c4c1b [AMORO-4044] Fix partition mapping bug in TableEntriesScan
for tables with evolved PartitionSpec (#4085)
02f7c4c1b is described below
commit 02f7c4c1b4db597b1503487c946e278410e3152a
Author: Jiwon Park <[email protected]>
AuthorDate: Fri Mar 6 21:58:15 2026 +0900
[AMORO-4044] Fix partition mapping bug in TableEntriesScan for tables with
evolved PartitionSpec (#4085)
The entries metadata table returns each file's partition as a unified
super-struct containing the fields of all PartitionSpecs. buildDataFile() and
buildDeleteFile() passed this struct directly to withPartition(), so after
spec evolution the partition values did not line up with the file's own
spec. Fix by projecting the unified partition onto the spec-specific
partition type using StructProjection.
Signed-off-by: Jiwon Park <[email protected]>
---
.../org/apache/amoro/scan/TableEntriesScan.java | 21 ++++++++-
.../apache/amoro/scan/TestTableEntriesScan.java | 50 ++++++++++++++++++++++
2 files changed, 69 insertions(+), 2 deletions(-)
diff --git
a/amoro-format-iceberg/src/main/java/org/apache/amoro/scan/TableEntriesScan.java
b/amoro-format-iceberg/src/main/java/org/apache/amoro/scan/TableEntriesScan.java
index 4649a1736..0030bfea3 100644
---
a/amoro-format-iceberg/src/main/java/org/apache/amoro/scan/TableEntriesScan.java
+++
b/amoro-format-iceberg/src/main/java/org/apache/amoro/scan/TableEntriesScan.java
@@ -37,6 +37,7 @@ import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.MetadataTableUtils;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Partitioning;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
@@ -46,6 +47,7 @@ import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.InclusiveMetricsEvaluator;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.StructProjection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -77,6 +79,7 @@ public class TableEntriesScan {
private InclusiveMetricsEvaluator lazyMetricsEvaluator = null;
private Map<String, Integer> lazyIndexOfDataFileType;
private Map<String, Integer> lazyIndexOfEntryType;
+ private Types.StructType lazyUnifiedPartitionType;
public static Builder builder(Table table) {
return new Builder(table);
@@ -340,7 +343,7 @@ public class TableEntriesScan {
if (spec.isPartitioned()) {
StructLike partition =
fileRecord.get(dataFileFieldIndex(DataFile.PARTITION_NAME),
StructLike.class);
- builder.withPartition(partition);
+ builder.withPartition(projectPartition(spec, partition));
}
return builder.build();
}
@@ -372,7 +375,7 @@ public class TableEntriesScan {
if (spec.isPartitioned()) {
StructLike partition =
fileRecord.get(dataFileFieldIndex(DataFile.PARTITION_NAME),
StructLike.class);
- builder.withPartition(partition);
+ builder.withPartition(projectPartition(spec, partition));
}
if (fileContent == FileContent.EQUALITY_DELETES) {
builder.ofEqualityDeletes();
@@ -429,6 +432,20 @@ public class TableEntriesScan {
return lazyIndexOfDataFileType.get(fieldName);
}
+ private StructLike projectPartition(PartitionSpec spec, StructLike
partition) { // re-map the entries table's unified partition struct onto this spec's partition type
+ StructProjection projected = // NOTE(review): a new projection is built for every file entry; could be cached per spec ID
+ StructProjection.createAllowMissing(unifiedPartitionType(),
spec.partitionType()); // presumably fields missing from the source type read as null (allowMissing) -- verify
+ projected.wrap(partition);
+ return projected; // a view over `partition`, not a copy; NOTE(review): relies on withPartition() copying the values -- confirm
+ }
+
+ private Types.StructType unifiedPartitionType() { // unified partition type: super-struct of the fields of all of the table's specs
+ if (lazyUnifiedPartitionType == null) {
+ lazyUnifiedPartitionType = Partitioning.partitionType(table); // computed on first use, then cached; assumed to match the entries table's partition column type -- verify
+ }
+ return lazyUnifiedPartitionType;
+ }
+
private InclusiveMetricsEvaluator metricsEvaluator() {
if (lazyMetricsEvaluator == null) {
if (dataFilter != null) {
diff --git
a/amoro-format-iceberg/src/test/java/org/apache/amoro/scan/TestTableEntriesScan.java
b/amoro-format-iceberg/src/test/java/org/apache/amoro/scan/TestTableEntriesScan.java
index 3896c2499..f261baa88 100644
---
a/amoro-format-iceberg/src/test/java/org/apache/amoro/scan/TestTableEntriesScan.java
+++
b/amoro-format-iceberg/src/test/java/org/apache/amoro/scan/TestTableEntriesScan.java
@@ -25,10 +25,13 @@ import org.apache.amoro.io.writer.GenericTaskWriters;
import org.apache.amoro.shade.guava32.com.google.common.collect.Iterables;
import org.apache.amoro.utils.ManifestEntryFields;
import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.DeleteFiles;
import org.apache.iceberg.FileContent;
+import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.Record;
@@ -186,6 +189,53 @@ public class TestTableEntriesScan extends
TableDataTestBase {
Assert.assertEquals(1, cnt);
}
+ @Test
+ public void testScanEntriesWithPartitionSpecEvolution() throws IOException {
+ // AMORO-4044 regression: the base table starts with data files under spec 0: day(op_time)
+ Table baseTable = getMixedTable().asKeyedTable().baseTable();
+ PartitionSpec originalSpec = baseTable.spec();
+
+ // evolve the partition spec by adding identity(id); the evolved spec must get a new spec ID
+ baseTable.updateSpec().addField("id").commit();
+ PartitionSpec newSpec = baseTable.spec();
+ Assert.assertNotEquals(originalSpec.specId(), newSpec.specId());
+
+ // append one data file under the new spec (synthetic path; only its metadata is committed)
+ DataFile newSpecFile =
+ DataFiles.builder(newSpec)
+ .withPath("/tmp/test-partition-evolution/data-new-spec.parquet")
+ .withFileSizeInBytes(100)
+ .withRecordCount(1)
+ .withPartitionPath("op_time_day=2022-01-01/id=1")
+ .build();
+ baseTable.newAppend().appendFile(newSpecFile).commit();
+
+ // scan entries of every content type; before the fix the unified partition struct was passed
to withPartition() without projection (see NOTE below)
+ TableEntriesScan entriesScan =
+ TableEntriesScan.builder(baseTable)
+ .includeFileContent(
+ FileContent.DATA, FileContent.POSITION_DELETES,
FileContent.EQUALITY_DELETES)
+ .build();
+
+ int count = 0;
+ try (CloseableIterable<IcebergFileEntry> entries = entriesScan.entries()) {
+ for (IcebergFileEntry entry : entries) {
+ count++;
+ ContentFile<?> file = entry.getFile();
+ Assert.assertNotNull(file.partition());
+
+ // for partitioned files, the first partition field must be populated
+ PartitionSpec fileSpec = baseTable.specs().get(file.specId());
+ if (fileSpec.isPartitioned()) {
+ // op_time_day should be present for all specs. NOTE(review): because id was appended after it, op_time_day is likely also position 0 of the unified struct, so this check may pass even without projection -- consider asserting the new-spec file's id value, or evolving by removing/reordering fields; confirm
+ Assert.assertNotNull(file.partition().get(0, Object.class));
+ }
+ }
+ }
+ // expected entries: 4 data + 1 pos-delete from the existing fixture, plus the 1 new-spec data file = 6
+ Assert.assertEquals(6, count);
+ }
+
private List<DataFile> writeIntoBase() throws IOException {
long transactionId = getMixedTable().asKeyedTable().beginTransaction("");
GenericBaseTaskWriter writer =