This is an automated email from the ASF dual-hosted git repository. pvary pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push: new fa162de015 Core: PartitionsTable#partitions returns incomplete list in case of partition evolution and NULL partition values (#12528) fa162de015 is described below commit fa162de0151121adbec33cbbdd029e6f61da762b Author: Denys Kuzmenko <dkuzme...@cloudera.com> AuthorDate: Wed Jun 18 20:02:11 2025 +0200 Core: PartitionsTable#partitions returns incomplete list in case of partition evolution and NULL partition values (#12528) --- .../org/apache/iceberg/util/StructProjection.java | 5 ++ .../java/org/apache/iceberg/PartitionsTable.java | 63 ++++++++-------- .../org/apache/iceberg/util/StructLikeMap.java | 31 +++++++- .../org/apache/iceberg/util/StructLikeWrapper.java | 13 ++-- .../apache/iceberg/MetadataTableScanTestBase.java | 2 +- .../src/test/java/org/apache/iceberg/TestBase.java | 12 ++- ...stMetadataTableScansWithPartitionEvolution.java | 86 ++++++++++++++++++++++ .../test/java/org/apache/iceberg/TestTables.java | 41 +++++++++-- 8 files changed, 201 insertions(+), 52 deletions(-) diff --git a/api/src/main/java/org/apache/iceberg/util/StructProjection.java b/api/src/main/java/org/apache/iceberg/util/StructProjection.java index 8e2fc7a141..08dedf0fe1 100644 --- a/api/src/main/java/org/apache/iceberg/util/StructProjection.java +++ b/api/src/main/java/org/apache/iceberg/util/StructProjection.java @@ -23,6 +23,7 @@ import java.util.Set; import org.apache.iceberg.Schema; import org.apache.iceberg.StructLike; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.primitives.Ints; import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.types.Types; import org.apache.iceberg.types.Types.ListType; @@ -173,6 +174,10 @@ public class StructProjection implements StructLike { } } + public int projectedFields() { + return (int) Ints.asList(positionMap).stream().filter(val -> val != -1).count(); + } + public StructProjection wrap(StructLike newStruct) { this.struct = newStruct; return this; diff --git a/core/src/main/java/org/apache/iceberg/PartitionsTable.java b/core/src/main/java/org/apache/iceberg/PartitionsTable.java index 6d0fc8c235..09c6e7893b 100644 --- a/core/src/main/java/org/apache/iceberg/PartitionsTable.java +++ b/core/src/main/java/org/apache/iceberg/PartitionsTable.java @@ -22,14 +22,17 @@ import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.LoadingCache; import java.io.IOException; import java.io.UncheckedIOException; +import java.util.Comparator; import java.util.List; import org.apache.iceberg.expressions.ManifestEvaluator; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.types.Comparators; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.ParallelIterable; import org.apache.iceberg.util.PartitionUtil; import org.apache.iceberg.util.StructLikeMap; +import org.apache.iceberg.util.StructProjection; /** A {@link Table} implementation that exposes a table's partitions as rows. */ public class PartitionsTable extends BaseMetadataTable { @@ -165,21 +168,26 @@ public class PartitionsTable extends BaseMetadataTable { private static Iterable<Partition> partitions(Table table, StaticTableScan scan) { Types.StructType partitionType = Partitioning.partitionType(table); - PartitionMap partitions = new PartitionMap(partitionType); + + StructLikeMap<Partition> partitions = + StructLikeMap.create(partitionType, new PartitionComparator(partitionType)); + try (CloseableIterable<ManifestEntry<? extends ContentFile<?>>> entries = planEntries(scan)) { for (ManifestEntry<? extends ContentFile<?>> entry : entries) { Snapshot snapshot = table.snapshot(entry.snapshotId()); ContentFile<?> file = entry.file(); - StructLike partition = + StructLike key = PartitionUtil.coercePartition( partitionType, table.specs().get(file.specId()), file.partition()); - partitions.get(partition).update(file, snapshot); + partitions + .computeIfAbsent(key, () -> new Partition(key, partitionType)) + .update(file, snapshot); } } catch (IOException e) { throw new UncheckedIOException(e); } - return partitions.all(); + return partitions.values(); } @VisibleForTesting @@ -238,26 +246,26 @@ public class PartitionsTable extends BaseMetadataTable { } } - static class PartitionMap { - private final StructLikeMap<Partition> partitions; - private final Types.StructType keyType; + private static class PartitionComparator implements Comparator<StructLike> { + private Comparator<StructLike> comparator; - PartitionMap(Types.StructType type) { - this.partitions = StructLikeMap.create(type); - this.keyType = type; + private PartitionComparator(Types.StructType struct) { + this.comparator = Comparators.forType(struct); } - Partition get(StructLike key) { - Partition partition = partitions.get(key); - if (partition == null) { - partition = new Partition(key, keyType); - partitions.put(key, partition); + @Override + public int compare(StructLike o1, StructLike o2) { + if (o1 instanceof StructProjection && o2 instanceof StructProjection) { + int cmp = + Integer.compare( + ((StructProjection) o1).projectedFields(), + ((StructProjection) o2).projectedFields()); + if (cmp != 0) { + return cmp; + } } - return partition; - } - Iterable<Partition> all() { - return partitions.values(); + return comparator.compare(o1, o2); } } @@ -290,6 +298,8 @@ public class PartitionsTable extends BaseMetadataTable { if (snapshot != null) { long snapshotCommitTime = snapshot.timestampMillis() * 1000; if (this.lastUpdatedAt == null || snapshotCommitTime > this.lastUpdatedAt) { + this.specId = file.specId(); + this.lastUpdatedAt = snapshotCommitTime; this.lastUpdatedSnapshotId = snapshot.snapshotId(); } @@ -299,18 +309,15 @@ public class PartitionsTable extends BaseMetadataTable { case DATA: this.dataRecordCount += file.recordCount(); this.dataFileCount += 1; - this.specId = file.specId(); this.dataFileSizeInBytes += file.fileSizeInBytes(); break; case POSITION_DELETES: this.posDeleteRecordCount += file.recordCount(); this.posDeleteFileCount += 1; - this.specId = file.specId(); break; case EQUALITY_DELETES: this.eqDeleteRecordCount += file.recordCount(); this.eqDeleteFileCount += 1; - this.specId = file.specId(); break; default: throw new UnsupportedOperationException( @@ -319,15 +326,9 @@ public class PartitionsTable extends BaseMetadataTable { } /** Needed because StructProjection is not serializable */ - private PartitionData toPartitionData(StructLike key, Types.StructType keyType) { - PartitionData data = new PartitionData(keyType); - for (int i = 0; i < keyType.fields().size(); i++) { - Object val = key.get(i, keyType.fields().get(i).type().typeId().javaClass()); - if (val != null) { - data.set(i, val); - } - } - return data; + private static PartitionData toPartitionData(StructLike key, Types.StructType keyType) { + PartitionData keyTemplate = new PartitionData(keyType); + return keyTemplate.copyFor(key); } } } diff --git a/core/src/main/java/org/apache/iceberg/util/StructLikeMap.java b/core/src/main/java/org/apache/iceberg/util/StructLikeMap.java index 2bb5fa1c9d..e0d5c0c6f5 100644 --- a/core/src/main/java/org/apache/iceberg/util/StructLikeMap.java +++ b/core/src/main/java/org/apache/iceberg/util/StructLikeMap.java @@ -20,28 +20,49 @@ package org.apache.iceberg.util; import java.util.AbstractMap; import java.util.Collection; +import java.util.Comparator; import java.util.Map; import java.util.Set; import java.util.function.Function; +import java.util.function.Supplier; import org.apache.iceberg.StructLike; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.Comparators; import org.apache.iceberg.types.Types; public class StructLikeMap<T> extends AbstractMap<StructLike, T> implements Map<StructLike, T> { + /** + * Creates a new StructLikeMap with the specified type and comparator. + * + * @param type the struct type for the keys + * @param comparator the comparator for comparing struct keys + * @return a new StructLikeMap instance + */ + public static <T> StructLikeMap<T> create( + Types.StructType type, Comparator<StructLike> comparator) { + return new StructLikeMap<>(type, comparator); + } + + /** + * Creates a new StructLikeMap with the specified type using the default comparator for the type. + * + * @param type the struct type for the keys + * @return a new StructLikeMap instance + */ public static <T> StructLikeMap<T> create(Types.StructType type) { - return new StructLikeMap<>(type); + return create(type, Comparators.forType(type)); } private final Types.StructType type; private final Map<StructLikeWrapper, T> wrapperMap; private final ThreadLocal<StructLikeWrapper> wrappers; - private StructLikeMap(Types.StructType type) { + private StructLikeMap(Types.StructType type, Comparator<StructLike> comparator) { this.type = type; this.wrapperMap = Maps.newHashMap(); - this.wrappers = ThreadLocal.withInitial(() -> StructLikeWrapper.forType(type)); + this.wrappers = ThreadLocal.withInitial(() -> StructLikeWrapper.forType(type, comparator)); } @Override @@ -125,6 +146,10 @@ public class StructLikeMap<T> extends AbstractMap<StructLike, T> implements Map< return entrySet; } + public T computeIfAbsent(StructLike struct, Supplier<T> valueSupplier) { + return wrapperMap.computeIfAbsent(wrappers.get().copyFor(struct), key -> valueSupplier.get()); + } + private static class StructLikeEntry<R> implements Entry<StructLike, R> { private final Entry<StructLikeWrapper, R> inner; diff --git a/core/src/main/java/org/apache/iceberg/util/StructLikeWrapper.java b/core/src/main/java/org/apache/iceberg/util/StructLikeWrapper.java index e8cf0a8db7..28629706bf 100644 --- a/core/src/main/java/org/apache/iceberg/util/StructLikeWrapper.java +++ b/core/src/main/java/org/apache/iceberg/util/StructLikeWrapper.java @@ -27,8 +27,13 @@ import org.apache.iceberg.types.Types; /** Wrapper to adapt StructLike for use in maps and sets by implementing equals and hashCode. */ public class StructLikeWrapper { - public static StructLikeWrapper forType(Types.StructType struct) { - return new StructLikeWrapper(struct); + public static StructLikeWrapper forType( + Types.StructType type, Comparator<StructLike> comparator) { + return new StructLikeWrapper(comparator, JavaHash.forType(type)); + } + + public static StructLikeWrapper forType(Types.StructType type) { + return forType(type, Comparators.forType(type)); } private final Comparator<StructLike> comparator; @@ -36,10 +41,6 @@ public class StructLikeWrapper { private Integer hashCode; private StructLike struct; - private StructLikeWrapper(Types.StructType type) { - this(Comparators.forType(type), JavaHash.forType(type)); - } - private StructLikeWrapper(Comparator<StructLike> comparator, JavaHash<StructLike> structHash) { this.comparator = comparator; this.structHash = structHash; diff --git a/core/src/test/java/org/apache/iceberg/MetadataTableScanTestBase.java b/core/src/test/java/org/apache/iceberg/MetadataTableScanTestBase.java index ff9dfd1afc..7eb2b9cefa 100644 --- a/core/src/test/java/org/apache/iceberg/MetadataTableScanTestBase.java +++ b/core/src/test/java/org/apache/iceberg/MetadataTableScanTestBase.java @@ -79,7 +79,7 @@ public abstract class MetadataTableScanTestBase extends TestBase { protected void validatePartition( CloseableIterable<ManifestEntry<? extends ContentFile<?>>> entries, int position, - int partitionValue) { + Object partitionValue) { assertThat(entries) .as("File scan tasks do not include correct file") .anyMatch( diff --git a/core/src/test/java/org/apache/iceberg/TestBase.java b/core/src/test/java/org/apache/iceberg/TestBase.java index 5d0919c568..51a976612e 100644 --- a/core/src/test/java/org/apache/iceberg/TestBase.java +++ b/core/src/test/java/org/apache/iceberg/TestBase.java @@ -655,13 +655,19 @@ public class TestBase { } } + protected DataFile newDataFile(StructLike partition) { + return newDataFileBuilder(table).withPartition(partition).build(); + } + protected DataFile newDataFile(String partitionPath) { + return newDataFileBuilder(table).withPartitionPath(partitionPath).build(); + } + + private static DataFiles.Builder newDataFileBuilder(Table table) { return DataFiles.builder(table.spec()) .withPath("/path/to/data-" + UUID.randomUUID() + ".parquet") .withFileSizeInBytes(10) - .withPartitionPath(partitionPath) - .withRecordCount(1) - .build(); + .withRecordCount(1); } protected DeleteFile fileADeletes() { diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataTableScansWithPartitionEvolution.java b/core/src/test/java/org/apache/iceberg/TestMetadataTableScansWithPartitionEvolution.java index 03338804d8..fe3a7c2686 100644 --- a/core/src/test/java/org/apache/iceberg/TestMetadataTableScansWithPartitionEvolution.java +++ b/core/src/test/java/org/apache/iceberg/TestMetadataTableScansWithPartitionEvolution.java @@ -31,9 +31,11 @@ import java.util.stream.Stream; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Streams; import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.StructProjection; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.extension.ExtendWith; @@ -243,6 +245,90 @@ public class TestMetadataTableScansWithPartitionEvolution extends MetadataTableS } } + @TestTemplate + public void testPartitionSpecEvolutionNullValues() throws IOException { + Schema schema = + new Schema( + required(1, "company_id", Types.IntegerType.get()), + required(2, "dept_id", Types.IntegerType.get()), + required(3, "team_id", Types.IntegerType.get())); + + table = + TestTables.create( + tableDir, + metadataDir, + "nulltest", + schema, + PartitionSpec.builderFor(schema).identity("company_id").build(), + SortOrder.unsorted(), + formatVersion); + table.newFastAppend().appendFile(newDataFile(TestHelpers.Row.of(new Object[] {null}))).commit(); + + table.updateSpec().addField("dept_id").commit(); + table.newFastAppend().appendFile(newDataFile(TestHelpers.Row.of(null, null))).commit(); + + table.updateSpec().addField("team_id").commit(); + table.newFastAppend().appendFile(newDataFile(TestHelpers.Row.of(null, null, null))).commit(); + + assertPartitions( + "company_id=null", + "company_id=null/dept_id=null", + "company_id=null/dept_id=null/team_id=null"); + } + + @TestTemplate + public void testPartitionSpecRenameFields() throws IOException { + Schema schema = + new Schema( + required(1, "data", Types.StringType.get()), + required(2, "category", Types.StringType.get())); + + table = + TestTables.create( + tableDir, + metadataDir, + "renametest", + schema, + PartitionSpec.builderFor(schema).identity("data").identity("category").build(), + SortOrder.unsorted(), + formatVersion); + table + .newFastAppend() + .appendFile(newDataFile(TestHelpers.Row.of("c1", "d1"))) + .appendFile(newDataFile(TestHelpers.Row.of("c2", "d2"))) + .commit(); + + table.updateSpec().renameField("category", "category_another_name").commit(); + table + .newFastAppend() + .appendFile(newDataFile(TestHelpers.Row.of("c1", "d1"))) + .appendFile(newDataFile(TestHelpers.Row.of("c2", "d2"))) + .commit(); + + assertPartitions("data=c1/category_another_name=d1", "data=c2/category_another_name=d2"); + } + + private void assertPartitions(String... expected) throws IOException { + PartitionsTable partitionsTable = new PartitionsTable(table); + + try (CloseableIterable<FileScanTask> fileScanTasks = partitionsTable.newScan().planFiles()) { + List<String> partitions = + FluentIterable.from(fileScanTasks) + .transformAndConcat(task -> task.asDataTask().rows()) + .transform( + row -> { + StructLike data = row.get(0, StructProjection.class); + PartitionSpec spec = table.specs().get(row.get(1, Integer.class)); + + PartitionData keyTemplate = new PartitionData(spec.partitionType()); + return spec.partitionToPath(keyTemplate.copyFor((data))); + }) + .toList(); + + assertThat(partitions).containsExactlyInAnyOrder(expected); + } + } + private Stream<StructLike> allRows(Iterable<FileScanTask> tasks) { return Streams.stream(tasks).flatMap(task -> Streams.stream(task.asDataTask().rows())); } diff --git a/core/src/test/java/org/apache/iceberg/TestTables.java b/core/src/test/java/org/apache/iceberg/TestTables.java index ad5369ea5e..073a95fca2 100644 --- a/core/src/test/java/org/apache/iceberg/TestTables.java +++ b/core/src/test/java/org/apache/iceberg/TestTables.java @@ -59,10 +59,21 @@ public class TestTables { PartitionSpec spec, SortOrder sortOrder, int formatVersion) { + return create(temp, null, name, schema, spec, sortOrder, formatVersion); + } + + public static TestTable create( + File temp, + File metaTemp, + String name, + Schema schema, + PartitionSpec spec, + SortOrder sortOrder, + int formatVersion) { TestTableOperations ops = new TestTableOperations(name, temp); return createTable( - temp, name, schema, spec, formatVersion, ImmutableMap.of(), sortOrder, null, ops); + temp, metaTemp, name, schema, spec, formatVersion, ImmutableMap.of(), sortOrder, null, ops); } public static TestTable create( @@ -74,7 +85,7 @@ public class TestTables { int formatVersion, TestTableOperations ops) { return createTable( - temp, name, schema, spec, formatVersion, ImmutableMap.of(), sortOrder, null, ops); + temp, null, name, schema, spec, formatVersion, ImmutableMap.of(), sortOrder, null, ops); } public static TestTable create( @@ -88,7 +99,7 @@ public class TestTables { TestTableOperations ops = new TestTableOperations(name, temp); return createTable( - temp, name, schema, spec, formatVersion, ImmutableMap.of(), sortOrder, reporter, ops); + temp, null, name, schema, spec, formatVersion, ImmutableMap.of(), sortOrder, reporter, ops); } public static TestTable create( @@ -101,11 +112,12 @@ public class TestTables { TestTableOperations ops = new TestTableOperations(name, temp); return createTable( - temp, name, schema, spec, formatVersion, properties, SortOrder.unsorted(), null, ops); + temp, null, name, schema, spec, formatVersion, properties, SortOrder.unsorted(), null, ops); } private static TestTable createTable( File temp, + File metaTemp, String name, Schema schema, PartitionSpec spec, @@ -118,9 +130,18 @@ public class TestTables { throw new AlreadyExistsException("Table %s already exists at location: %s", name, temp); } - ops.commit( - null, - newTableMetadata(schema, spec, sortOrder, temp.toString(), properties, formatVersion)); + TableMetadata metadata = + newTableMetadata(schema, spec, sortOrder, temp.toString(), properties, formatVersion); + + if (metaTemp != null) { + metadata = + TableMetadata.buildFrom(metadata) + .discardChanges() + .withMetadataLocation(metaTemp.toString()) + .build(); + } + + ops.commit(null, metadata); if (reporter != null) { return new TestTable(ops, reporter); @@ -307,7 +328,11 @@ public class TestTables { } Integer version = VERSIONS.get(tableName); // remove changes from the committed metadata - this.current = TableMetadata.buildFrom(updatedMetadata).discardChanges().build(); + this.current = + TableMetadata.buildFrom(updatedMetadata) + .discardChanges() + .withMetadataLocation((current != null) ? current.metadataFileLocation() : null) + .build(); VERSIONS.put(tableName, version == null ? 0 : version + 1); METADATA.put(tableName, current); } else {