This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new fa162de015 Core: PartitionsTable#partitions returns incomplete list in case of partition evolution and NULL partition values (#12528)
fa162de015 is described below

commit fa162de0151121adbec33cbbdd029e6f61da762b
Author: Denys Kuzmenko <dkuzme...@cloudera.com>
AuthorDate: Wed Jun 18 20:02:11 2025 +0200

    Core: PartitionsTable#partitions returns incomplete list in case of partition evolution and NULL partition values (#12528)
---
 .../org/apache/iceberg/util/StructProjection.java  |  5 ++
 .../java/org/apache/iceberg/PartitionsTable.java   | 63 ++++++++--------
 .../org/apache/iceberg/util/StructLikeMap.java     | 31 +++++++-
 .../org/apache/iceberg/util/StructLikeWrapper.java | 13 ++--
 .../apache/iceberg/MetadataTableScanTestBase.java  |  2 +-
 .../src/test/java/org/apache/iceberg/TestBase.java | 12 ++-
 ...stMetadataTableScansWithPartitionEvolution.java | 86 ++++++++++++++++++++++
 .../test/java/org/apache/iceberg/TestTables.java   | 41 +++++++++--
 8 files changed, 201 insertions(+), 52 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/util/StructProjection.java 
b/api/src/main/java/org/apache/iceberg/util/StructProjection.java
index 8e2fc7a141..08dedf0fe1 100644
--- a/api/src/main/java/org/apache/iceberg/util/StructProjection.java
+++ b/api/src/main/java/org/apache/iceberg/util/StructProjection.java
@@ -23,6 +23,7 @@ import java.util.Set;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.types.Types.ListType;
@@ -173,6 +174,10 @@ public class StructProjection implements StructLike {
     }
   }
 
+  public int projectedFields() {
+    return (int) Ints.asList(positionMap).stream().filter(val -> val != 
-1).count();
+  }
+
   public StructProjection wrap(StructLike newStruct) {
     this.struct = newStruct;
     return this;
diff --git a/core/src/main/java/org/apache/iceberg/PartitionsTable.java 
b/core/src/main/java/org/apache/iceberg/PartitionsTable.java
index 6d0fc8c235..09c6e7893b 100644
--- a/core/src/main/java/org/apache/iceberg/PartitionsTable.java
+++ b/core/src/main/java/org/apache/iceberg/PartitionsTable.java
@@ -22,14 +22,17 @@ import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.LoadingCache;
 import java.io.IOException;
 import java.io.UncheckedIOException;
+import java.util.Comparator;
 import java.util.List;
 import org.apache.iceberg.expressions.ManifestEvaluator;
 import org.apache.iceberg.io.CloseableIterable;
 import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.types.Comparators;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.ParallelIterable;
 import org.apache.iceberg.util.PartitionUtil;
 import org.apache.iceberg.util.StructLikeMap;
+import org.apache.iceberg.util.StructProjection;
 
 /** A {@link Table} implementation that exposes a table's partitions as rows. 
*/
 public class PartitionsTable extends BaseMetadataTable {
@@ -165,21 +168,26 @@ public class PartitionsTable extends BaseMetadataTable {
 
   private static Iterable<Partition> partitions(Table table, StaticTableScan 
scan) {
     Types.StructType partitionType = Partitioning.partitionType(table);
-    PartitionMap partitions = new PartitionMap(partitionType);
+
+    StructLikeMap<Partition> partitions =
+        StructLikeMap.create(partitionType, new 
PartitionComparator(partitionType));
+
     try (CloseableIterable<ManifestEntry<? extends ContentFile<?>>> entries = 
planEntries(scan)) {
       for (ManifestEntry<? extends ContentFile<?>> entry : entries) {
         Snapshot snapshot = table.snapshot(entry.snapshotId());
         ContentFile<?> file = entry.file();
-        StructLike partition =
+        StructLike key =
             PartitionUtil.coercePartition(
                 partitionType, table.specs().get(file.specId()), 
file.partition());
-        partitions.get(partition).update(file, snapshot);
+        partitions
+            .computeIfAbsent(key, () -> new Partition(key, partitionType))
+            .update(file, snapshot);
       }
     } catch (IOException e) {
       throw new UncheckedIOException(e);
     }
 
-    return partitions.all();
+    return partitions.values();
   }
 
   @VisibleForTesting
@@ -238,26 +246,26 @@ public class PartitionsTable extends BaseMetadataTable {
     }
   }
 
-  static class PartitionMap {
-    private final StructLikeMap<Partition> partitions;
-    private final Types.StructType keyType;
+  private static class PartitionComparator implements Comparator<StructLike> {
+    private Comparator<StructLike> comparator;
 
-    PartitionMap(Types.StructType type) {
-      this.partitions = StructLikeMap.create(type);
-      this.keyType = type;
+    private PartitionComparator(Types.StructType struct) {
+      this.comparator = Comparators.forType(struct);
     }
 
-    Partition get(StructLike key) {
-      Partition partition = partitions.get(key);
-      if (partition == null) {
-        partition = new Partition(key, keyType);
-        partitions.put(key, partition);
+    @Override
+    public int compare(StructLike o1, StructLike o2) {
+      if (o1 instanceof StructProjection && o2 instanceof StructProjection) {
+        int cmp =
+            Integer.compare(
+                ((StructProjection) o1).projectedFields(),
+                ((StructProjection) o2).projectedFields());
+        if (cmp != 0) {
+          return cmp;
+        }
       }
-      return partition;
-    }
 
-    Iterable<Partition> all() {
-      return partitions.values();
+      return comparator.compare(o1, o2);
     }
   }
 
@@ -290,6 +298,8 @@ public class PartitionsTable extends BaseMetadataTable {
       if (snapshot != null) {
         long snapshotCommitTime = snapshot.timestampMillis() * 1000;
         if (this.lastUpdatedAt == null || snapshotCommitTime > 
this.lastUpdatedAt) {
+          this.specId = file.specId();
+
           this.lastUpdatedAt = snapshotCommitTime;
           this.lastUpdatedSnapshotId = snapshot.snapshotId();
         }
@@ -299,18 +309,15 @@ public class PartitionsTable extends BaseMetadataTable {
         case DATA:
           this.dataRecordCount += file.recordCount();
           this.dataFileCount += 1;
-          this.specId = file.specId();
           this.dataFileSizeInBytes += file.fileSizeInBytes();
           break;
         case POSITION_DELETES:
           this.posDeleteRecordCount += file.recordCount();
           this.posDeleteFileCount += 1;
-          this.specId = file.specId();
           break;
         case EQUALITY_DELETES:
           this.eqDeleteRecordCount += file.recordCount();
           this.eqDeleteFileCount += 1;
-          this.specId = file.specId();
           break;
         default:
           throw new UnsupportedOperationException(
@@ -319,15 +326,9 @@ public class PartitionsTable extends BaseMetadataTable {
     }
 
     /** Needed because StructProjection is not serializable */
-    private PartitionData toPartitionData(StructLike key, Types.StructType 
keyType) {
-      PartitionData data = new PartitionData(keyType);
-      for (int i = 0; i < keyType.fields().size(); i++) {
-        Object val = key.get(i, 
keyType.fields().get(i).type().typeId().javaClass());
-        if (val != null) {
-          data.set(i, val);
-        }
-      }
-      return data;
+    private static PartitionData toPartitionData(StructLike key, 
Types.StructType keyType) {
+      PartitionData keyTemplate = new PartitionData(keyType);
+      return keyTemplate.copyFor(key);
     }
   }
 }
diff --git a/core/src/main/java/org/apache/iceberg/util/StructLikeMap.java 
b/core/src/main/java/org/apache/iceberg/util/StructLikeMap.java
index 2bb5fa1c9d..e0d5c0c6f5 100644
--- a/core/src/main/java/org/apache/iceberg/util/StructLikeMap.java
+++ b/core/src/main/java/org/apache/iceberg/util/StructLikeMap.java
@@ -20,28 +20,49 @@ package org.apache.iceberg.util;
 
 import java.util.AbstractMap;
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.Map;
 import java.util.Set;
 import java.util.function.Function;
+import java.util.function.Supplier;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Comparators;
 import org.apache.iceberg.types.Types;
 
 public class StructLikeMap<T> extends AbstractMap<StructLike, T> implements 
Map<StructLike, T> {
 
+  /**
+   * Creates a new StructLikeMap with the specified type and comparator.
+   *
+   * @param type the struct type for the keys
+   * @param comparator the comparator for comparing struct keys
+   * @return a new StructLikeMap instance
+   */
+  public static <T> StructLikeMap<T> create(
+      Types.StructType type, Comparator<StructLike> comparator) {
+    return new StructLikeMap<>(type, comparator);
+  }
+
+  /**
+   * Creates a new StructLikeMap with the specified type using the default 
comparator for the type.
+   *
+   * @param type the struct type for the keys
+   * @return a new StructLikeMap instance
+   */
   public static <T> StructLikeMap<T> create(Types.StructType type) {
-    return new StructLikeMap<>(type);
+    return create(type, Comparators.forType(type));
   }
 
   private final Types.StructType type;
   private final Map<StructLikeWrapper, T> wrapperMap;
   private final ThreadLocal<StructLikeWrapper> wrappers;
 
-  private StructLikeMap(Types.StructType type) {
+  private StructLikeMap(Types.StructType type, Comparator<StructLike> 
comparator) {
     this.type = type;
     this.wrapperMap = Maps.newHashMap();
-    this.wrappers = ThreadLocal.withInitial(() -> 
StructLikeWrapper.forType(type));
+    this.wrappers = ThreadLocal.withInitial(() -> 
StructLikeWrapper.forType(type, comparator));
   }
 
   @Override
@@ -125,6 +146,10 @@ public class StructLikeMap<T> extends 
AbstractMap<StructLike, T> implements Map<
     return entrySet;
   }
 
+  public T computeIfAbsent(StructLike struct, Supplier<T> valueSupplier) {
+    return wrapperMap.computeIfAbsent(wrappers.get().copyFor(struct), key -> 
valueSupplier.get());
+  }
+
   private static class StructLikeEntry<R> implements Entry<StructLike, R> {
 
     private final Entry<StructLikeWrapper, R> inner;
diff --git a/core/src/main/java/org/apache/iceberg/util/StructLikeWrapper.java 
b/core/src/main/java/org/apache/iceberg/util/StructLikeWrapper.java
index e8cf0a8db7..28629706bf 100644
--- a/core/src/main/java/org/apache/iceberg/util/StructLikeWrapper.java
+++ b/core/src/main/java/org/apache/iceberg/util/StructLikeWrapper.java
@@ -27,8 +27,13 @@ import org.apache.iceberg.types.Types;
 /** Wrapper to adapt StructLike for use in maps and sets by implementing 
equals and hashCode. */
 public class StructLikeWrapper {
 
-  public static StructLikeWrapper forType(Types.StructType struct) {
-    return new StructLikeWrapper(struct);
+  public static StructLikeWrapper forType(
+      Types.StructType type, Comparator<StructLike> comparator) {
+    return new StructLikeWrapper(comparator, JavaHash.forType(type));
+  }
+
+  public static StructLikeWrapper forType(Types.StructType type) {
+    return forType(type, Comparators.forType(type));
   }
 
   private final Comparator<StructLike> comparator;
@@ -36,10 +41,6 @@ public class StructLikeWrapper {
   private Integer hashCode;
   private StructLike struct;
 
-  private StructLikeWrapper(Types.StructType type) {
-    this(Comparators.forType(type), JavaHash.forType(type));
-  }
-
   private StructLikeWrapper(Comparator<StructLike> comparator, 
JavaHash<StructLike> structHash) {
     this.comparator = comparator;
     this.structHash = structHash;
diff --git 
a/core/src/test/java/org/apache/iceberg/MetadataTableScanTestBase.java 
b/core/src/test/java/org/apache/iceberg/MetadataTableScanTestBase.java
index ff9dfd1afc..7eb2b9cefa 100644
--- a/core/src/test/java/org/apache/iceberg/MetadataTableScanTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/MetadataTableScanTestBase.java
@@ -79,7 +79,7 @@ public abstract class MetadataTableScanTestBase extends 
TestBase {
   protected void validatePartition(
       CloseableIterable<ManifestEntry<? extends ContentFile<?>>> entries,
       int position,
-      int partitionValue) {
+      Object partitionValue) {
     assertThat(entries)
         .as("File scan tasks do not include correct file")
         .anyMatch(
diff --git a/core/src/test/java/org/apache/iceberg/TestBase.java 
b/core/src/test/java/org/apache/iceberg/TestBase.java
index 5d0919c568..51a976612e 100644
--- a/core/src/test/java/org/apache/iceberg/TestBase.java
+++ b/core/src/test/java/org/apache/iceberg/TestBase.java
@@ -655,13 +655,19 @@ public class TestBase {
     }
   }
 
+  protected DataFile newDataFile(StructLike partition) {
+    return newDataFileBuilder(table).withPartition(partition).build();
+  }
+
   protected DataFile newDataFile(String partitionPath) {
+    return newDataFileBuilder(table).withPartitionPath(partitionPath).build();
+  }
+
+  private static DataFiles.Builder newDataFileBuilder(Table table) {
     return DataFiles.builder(table.spec())
         .withPath("/path/to/data-" + UUID.randomUUID() + ".parquet")
         .withFileSizeInBytes(10)
-        .withPartitionPath(partitionPath)
-        .withRecordCount(1)
-        .build();
+        .withRecordCount(1);
   }
 
   protected DeleteFile fileADeletes() {
diff --git 
a/core/src/test/java/org/apache/iceberg/TestMetadataTableScansWithPartitionEvolution.java
 
b/core/src/test/java/org/apache/iceberg/TestMetadataTableScansWithPartitionEvolution.java
index 03338804d8..fe3a7c2686 100644
--- 
a/core/src/test/java/org/apache/iceberg/TestMetadataTableScansWithPartitionEvolution.java
+++ 
b/core/src/test/java/org/apache/iceberg/TestMetadataTableScansWithPartitionEvolution.java
@@ -31,9 +31,11 @@ import java.util.stream.Stream;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Streams;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.StructProjection;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.TestTemplate;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -243,6 +245,90 @@ public class TestMetadataTableScansWithPartitionEvolution 
extends MetadataTableS
     }
   }
 
+  @TestTemplate
+  public void testPartitionSpecEvolutionNullValues() throws IOException {
+    Schema schema =
+        new Schema(
+            required(1, "company_id", Types.IntegerType.get()),
+            required(2, "dept_id", Types.IntegerType.get()),
+            required(3, "team_id", Types.IntegerType.get()));
+
+    table =
+        TestTables.create(
+            tableDir,
+            metadataDir,
+            "nulltest",
+            schema,
+            PartitionSpec.builderFor(schema).identity("company_id").build(),
+            SortOrder.unsorted(),
+            formatVersion);
+    table.newFastAppend().appendFile(newDataFile(TestHelpers.Row.of(new 
Object[] {null}))).commit();
+
+    table.updateSpec().addField("dept_id").commit();
+    table.newFastAppend().appendFile(newDataFile(TestHelpers.Row.of(null, 
null))).commit();
+
+    table.updateSpec().addField("team_id").commit();
+    table.newFastAppend().appendFile(newDataFile(TestHelpers.Row.of(null, 
null, null))).commit();
+
+    assertPartitions(
+        "company_id=null",
+        "company_id=null/dept_id=null",
+        "company_id=null/dept_id=null/team_id=null");
+  }
+
+  @TestTemplate
+  public void testPartitionSpecRenameFields() throws IOException {
+    Schema schema =
+        new Schema(
+            required(1, "data", Types.StringType.get()),
+            required(2, "category", Types.StringType.get()));
+
+    table =
+        TestTables.create(
+            tableDir,
+            metadataDir,
+            "renametest",
+            schema,
+            
PartitionSpec.builderFor(schema).identity("data").identity("category").build(),
+            SortOrder.unsorted(),
+            formatVersion);
+    table
+        .newFastAppend()
+        .appendFile(newDataFile(TestHelpers.Row.of("c1", "d1")))
+        .appendFile(newDataFile(TestHelpers.Row.of("c2", "d2")))
+        .commit();
+
+    table.updateSpec().renameField("category", 
"category_another_name").commit();
+    table
+        .newFastAppend()
+        .appendFile(newDataFile(TestHelpers.Row.of("c1", "d1")))
+        .appendFile(newDataFile(TestHelpers.Row.of("c2", "d2")))
+        .commit();
+
+    assertPartitions("data=c1/category_another_name=d1", 
"data=c2/category_another_name=d2");
+  }
+
+  private void assertPartitions(String... expected) throws IOException {
+    PartitionsTable partitionsTable = new PartitionsTable(table);
+
+    try (CloseableIterable<FileScanTask> fileScanTasks = 
partitionsTable.newScan().planFiles()) {
+      List<String> partitions =
+          FluentIterable.from(fileScanTasks)
+              .transformAndConcat(task -> task.asDataTask().rows())
+              .transform(
+                  row -> {
+                    StructLike data = row.get(0, StructProjection.class);
+                    PartitionSpec spec = table.specs().get(row.get(1, 
Integer.class));
+
+                    PartitionData keyTemplate = new 
PartitionData(spec.partitionType());
+                    return spec.partitionToPath(keyTemplate.copyFor((data)));
+                  })
+              .toList();
+
+      assertThat(partitions).containsExactlyInAnyOrder(expected);
+    }
+  }
+
   private Stream<StructLike> allRows(Iterable<FileScanTask> tasks) {
     return Streams.stream(tasks).flatMap(task -> 
Streams.stream(task.asDataTask().rows()));
   }
diff --git a/core/src/test/java/org/apache/iceberg/TestTables.java 
b/core/src/test/java/org/apache/iceberg/TestTables.java
index ad5369ea5e..073a95fca2 100644
--- a/core/src/test/java/org/apache/iceberg/TestTables.java
+++ b/core/src/test/java/org/apache/iceberg/TestTables.java
@@ -59,10 +59,21 @@ public class TestTables {
       PartitionSpec spec,
       SortOrder sortOrder,
       int formatVersion) {
+    return create(temp, null, name, schema, spec, sortOrder, formatVersion);
+  }
+
+  public static TestTable create(
+      File temp,
+      File metaTemp,
+      String name,
+      Schema schema,
+      PartitionSpec spec,
+      SortOrder sortOrder,
+      int formatVersion) {
     TestTableOperations ops = new TestTableOperations(name, temp);
 
     return createTable(
-        temp, name, schema, spec, formatVersion, ImmutableMap.of(), sortOrder, 
null, ops);
+        temp, metaTemp, name, schema, spec, formatVersion, ImmutableMap.of(), 
sortOrder, null, ops);
   }
 
   public static TestTable create(
@@ -74,7 +85,7 @@ public class TestTables {
       int formatVersion,
       TestTableOperations ops) {
     return createTable(
-        temp, name, schema, spec, formatVersion, ImmutableMap.of(), sortOrder, 
null, ops);
+        temp, null, name, schema, spec, formatVersion, ImmutableMap.of(), 
sortOrder, null, ops);
   }
 
   public static TestTable create(
@@ -88,7 +99,7 @@ public class TestTables {
     TestTableOperations ops = new TestTableOperations(name, temp);
 
     return createTable(
-        temp, name, schema, spec, formatVersion, ImmutableMap.of(), sortOrder, 
reporter, ops);
+        temp, null, name, schema, spec, formatVersion, ImmutableMap.of(), 
sortOrder, reporter, ops);
   }
 
   public static TestTable create(
@@ -101,11 +112,12 @@ public class TestTables {
     TestTableOperations ops = new TestTableOperations(name, temp);
 
     return createTable(
-        temp, name, schema, spec, formatVersion, properties, 
SortOrder.unsorted(), null, ops);
+        temp, null, name, schema, spec, formatVersion, properties, 
SortOrder.unsorted(), null, ops);
   }
 
   private static TestTable createTable(
       File temp,
+      File metaTemp,
       String name,
       Schema schema,
       PartitionSpec spec,
@@ -118,9 +130,18 @@ public class TestTables {
       throw new AlreadyExistsException("Table %s already exists at location: 
%s", name, temp);
     }
 
-    ops.commit(
-        null,
-        newTableMetadata(schema, spec, sortOrder, temp.toString(), properties, 
formatVersion));
+    TableMetadata metadata =
+        newTableMetadata(schema, spec, sortOrder, temp.toString(), properties, 
formatVersion);
+
+    if (metaTemp != null) {
+      metadata =
+          TableMetadata.buildFrom(metadata)
+              .discardChanges()
+              .withMetadataLocation(metaTemp.toString())
+              .build();
+    }
+
+    ops.commit(null, metadata);
 
     if (reporter != null) {
       return new TestTable(ops, reporter);
@@ -307,7 +328,11 @@ public class TestTables {
           }
           Integer version = VERSIONS.get(tableName);
           // remove changes from the committed metadata
-          this.current = 
TableMetadata.buildFrom(updatedMetadata).discardChanges().build();
+          this.current =
+              TableMetadata.buildFrom(updatedMetadata)
+                  .discardChanges()
+                  .withMetadataLocation((current != null) ? 
current.metadataFileLocation() : null)
+                  .build();
           VERSIONS.put(tableName, version == null ? 0 : version + 1);
           METADATA.put(tableName, current);
         } else {

Reply via email to