This is an automated email from the ASF dual-hosted git repository.
dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 3b4396f619a HIVE-28837: Iceberg: PartitionsTable#partitions returns incomplete list in case of partition evolution and NULL partition values (Denys Kuzmenko, reviewed by Dmitriy Fingerman)
3b4396f619a is described below
commit 3b4396f619a96c15339920f311c0ba7b90d5fc94
Author: Denys Kuzmenko <[email protected]>
AuthorDate: Fri Mar 21 09:38:30 2025 +0100
HIVE-28837: Iceberg: PartitionsTable#partitions returns incomplete list in case of partition evolution and NULL partition values (Denys Kuzmenko, reviewed by Dmitriy Fingerman)
Closes #5698
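
The gist of the change: PartitionsTable#partitions aggregates one row per partition in a map keyed by partition values coerced onto the table's unified partition type. After partition evolution, keys projected from older specs carry fewer values, and partition values can be NULL, so keying the map without an evolution- and NULL-aware comparator can merge or drop entries and yield the incomplete listing named in the subject. The patch therefore lets StructLikeMap accept an explicit comparator and keys it with a PartitionComparator that compares the number of projected fields before comparing field values. The minimal, self-contained Java sketch below illustrates that idea; the names (PartitionKeySketch, Key, compareNullsFirst) are hypothetical and plain JDK collections stand in for the Iceberg/Hive classes touched by this patch.

import java.util.Comparator;
import java.util.Map;
import java.util.TreeMap;

public class PartitionKeySketch {

  // Hypothetical stand-in for a partition key coerced onto the unified partition
  // type; fields missing from an older spec, or NULL partition values, are null.
  static class Key {
    final Object[] values;

    Key(Object... values) {
      this.values = values;
    }

    int projectedFields() {
      int count = 0;
      for (Object value : values) {
        if (value != null) {
          count++;
        }
      }
      return count;
    }
  }

  // Compare the projected-field count first (partition evolution), then compare each
  // field null-safely, so all-NULL keys stay distinct from fully populated ones.
  static final Comparator<Key> PARTITION_COMPARATOR =
      Comparator.comparingInt(Key::projectedFields)
          .thenComparing((left, right) -> {
            for (int i = 0; i < left.values.length; i++) {
              int cmp = compareNullsFirst(left.values[i], right.values[i]);
              if (cmp != 0) {
                return cmp;
              }
            }
            return 0;
          });

  @SuppressWarnings({"unchecked", "rawtypes"})
  static int compareNullsFirst(Object left, Object right) {
    if (left == null || right == null) {
      return left == right ? 0 : (left == null ? -1 : 1);
    }
    return ((Comparable) left).compareTo(right);
  }

  public static void main(String[] args) {
    Map<Key, Long> recordCounts = new TreeMap<>(PARTITION_COMPARATOR);
    recordCounts.merge(new Key(100, 1), 4L, Long::sum);     // company_id=100/dept_id=1
    recordCounts.merge(new Key(100, 2), 4L, Long::sum);     // company_id=100/dept_id=2
    recordCounts.merge(new Key(null, null), 2L, Long::sum); // pre-evolution rows, NULL values

    // All three partitions survive as separate entries (prints 3); a comparator that
    // cannot tell the all-NULL key apart would silently fold it into another row.
    System.out.println(recordCounts.size());
  }
}

In the patch itself the same role is carried by StructProjection#projectedFields, PartitionsTable.PartitionComparator, and the comparator-aware StructLikeMap/StructLikeWrapper copies added under patched-iceberg-api and patched-iceberg-core.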
---
iceberg/iceberg-catalog/pom.xml | 6 +
iceberg/iceberg-handler/pom.xml | 6 +
.../iceberg_major_compaction_partition_evolution.q | 1 +
...berg_major_compaction_partition_evolution.q.out | 1 +
iceberg/iceberg-shading/pom.xml | 2 +
iceberg/patched-iceberg-api/pom.xml | 6 +
.../org/apache/iceberg/util/StructProjection.java | 225 +++++++++++++++++++++
iceberg/patched-iceberg-core/pom.xml | 2 +
.../java/org/apache/iceberg/PartitionsTable.java | 67 +++---
.../org/apache/iceberg/util/StructLikeMap.java | 187 +++++++++++++++++
.../org/apache/iceberg/util/StructLikeWrapper.java | 103 ++++++++++
11 files changed, 571 insertions(+), 35 deletions(-)
diff --git a/iceberg/iceberg-catalog/pom.xml b/iceberg/iceberg-catalog/pom.xml
index dd6848d43c3..f9cf711b581 100644
--- a/iceberg/iceberg-catalog/pom.xml
+++ b/iceberg/iceberg-catalog/pom.xml
@@ -107,6 +107,12 @@
<artifactId>iceberg-core</artifactId>
<classifier>tests</classifier>
<scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-api</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
</dependencies>
</project>
diff --git a/iceberg/iceberg-handler/pom.xml b/iceberg/iceberg-handler/pom.xml
index f8984fc741b..00836836ff3 100644
--- a/iceberg/iceberg-handler/pom.xml
+++ b/iceberg/iceberg-handler/pom.xml
@@ -118,6 +118,12 @@
<artifactId>iceberg-core</artifactId>
<classifier>tests</classifier>
<scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-api</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
</dependencies>
<build>
diff --git a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partition_evolution.q b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partition_evolution.q
index 32542719623..454ea83732f 100644
--- a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partition_evolution.q
+++ b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partition_evolution.q
@@ -12,6 +12,7 @@
-- Mask current-snapshot-timestamp-ms
--! qt:replace:/(\s+current-snapshot-timestamp-ms\s+)\S+(\s*)/$1#Masked#$2/
--! qt:replace:/(MAJOR\s+succeeded\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/
+--! qt:replace:/(MAJOR\s+refused\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/
-- Mask compaction id as they will be allocated in parallel threads
--! qt:replace:/^[0-9]/#Masked#/
-- Mask removed file size
diff --git a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution.q.out b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution.q.out
index 1497963d94a..a430c7455ef 100644
--- a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution.q.out
+++ b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution.q.out
@@ -447,6 +447,7 @@ POSTHOOK: type: SHOW COMPACTIONS
CompactionId Database Table Partition Type State Worker host Worker Enqueue Time Start Time Duration(ms) HadoopJobId Error message Initiator host Initiator Pool name TxnId Next TxnId Commit Time Highest WriteId
#Masked# default ice_orc company_id=100/dept_id=1 MAJOR succeeded #Masked# manual iceberg 0 0 0 ---
#Masked# default ice_orc company_id=100/dept_id=2 MAJOR succeeded #Masked# manual iceberg 0 0 0 ---
+#Masked# default ice_orc company_id=null/dept_id=null MAJOR refused #Masked# manual iceberg 0 0 0 ---
#Masked# default ice_orc --- MAJOR succeeded #Masked# manual iceberg 0 0 0 ---
PREHOOK: query: select `partition`, spec_id, content, record_count from default.ice_orc.files
diff --git a/iceberg/iceberg-shading/pom.xml b/iceberg/iceberg-shading/pom.xml
index aa451b2574e..af53f0f7536 100644
--- a/iceberg/iceberg-shading/pom.xml
+++ b/iceberg/iceberg-shading/pom.xml
@@ -108,9 +108,11 @@
<include>com.google*:*</include>
<include>com.fasterxml*:*</include>
<include>com.github.ben-manes*:*</include>
+ <include>org.apache.hive:patched-iceberg-api</include>
<include>org.apache.hive:patched-iceberg-core</include>
</includes>
<excludes>
+ <exclude>org.apache.iceberg:iceberg-api</exclude>
<exclude>org.apache.iceberg:iceberg-core</exclude>
</excludes>
</artifactSet>
diff --git a/iceberg/patched-iceberg-api/pom.xml b/iceberg/patched-iceberg-api/pom.xml
index 6ef47815960..f642eec50eb 100644
--- a/iceberg/patched-iceberg-api/pom.xml
+++ b/iceberg/patched-iceberg-api/pom.xml
@@ -36,6 +36,11 @@
<version>${iceberg.version}</version>
<optional>true</optional>
</dependency>
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-bundled-guava</artifactId>
+ <version>${iceberg.version}</version>
+ </dependency>
</dependencies>
<build>
<plugins>
@@ -59,6 +64,7 @@
<overWrite>true</overWrite>
<outputDirectory>${project.build.directory}/classes</outputDirectory>
<excludes>
+ **/StructProjection.class
</excludes>
</artifactItem>
</artifactItems>
diff --git a/iceberg/patched-iceberg-api/src/main/java/org/apache/iceberg/util/StructProjection.java b/iceberg/patched-iceberg-api/src/main/java/org/apache/iceberg/util/StructProjection.java
new file mode 100644
index 00000000000..15f5d965206
--- /dev/null
+++ b/iceberg/patched-iceberg-api/src/main/java/org/apache/iceberg/util/StructProjection.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.util;
+
+import java.util.List;
+import java.util.Set;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.ListType;
+import org.apache.iceberg.types.Types.MapType;
+import org.apache.iceberg.types.Types.StructType;
+
+public class StructProjection implements StructLike {
+ /**
+ * Creates a projecting wrapper for {@link StructLike} rows.
+ *
+ * <p>This projection does not work with repeated types like lists and maps.
+ *
+ * @param schema schema of rows wrapped by this projection
+ * @param ids field ids from the row schema to project
+ * @return a wrapper to project rows
+ */
+ public static StructProjection create(Schema schema, Set<Integer> ids) {
+ StructType structType = schema.asStruct();
+ return new StructProjection(structType, TypeUtil.project(structType, ids));
+ }
+
+ /**
+ * Creates a projecting wrapper for {@link StructLike} rows.
+ *
+ * <p>This projection does not work with repeated types like lists and maps.
+ *
+ * @param dataSchema schema of rows wrapped by this projection
+ * @param projectedSchema result schema of the projected rows
+ * @return a wrapper to project rows
+ */
+  public static StructProjection create(Schema dataSchema, Schema projectedSchema) {
+    return new StructProjection(dataSchema.asStruct(), projectedSchema.asStruct());
+ }
+
+ /**
+ * Creates a projecting wrapper for {@link StructLike} rows.
+ *
+ * <p>This projection does not work with repeated types like lists and maps.
+ *
+ * @param structType type of rows wrapped by this projection
+ * @param projectedStructType result type of the projected rows
+ * @return a wrapper to project rows
+ */
+  public static StructProjection create(StructType structType, StructType projectedStructType) {
+ return new StructProjection(structType, projectedStructType);
+ }
+
+ /**
+ * Creates a projecting wrapper for {@link StructLike} rows.
+ *
+   * <p>This projection allows missing fields and does not work with repeated types like lists and
+   * maps.
+ *
+ * @param structType type of rows wrapped by this projection
+ * @param projectedStructType result type of the projected rows
+ * @return a wrapper to project rows
+ */
+ public static StructProjection createAllowMissing(
+ StructType structType, StructType projectedStructType) {
+ return new StructProjection(structType, projectedStructType, true);
+ }
+
+ private final StructType type;
+ private final int[] positionMap;
+ private final StructProjection[] nestedProjections;
+ private StructLike struct;
+
+ private StructProjection(
+      StructType type, int[] positionMap, StructProjection[] nestedProjections) {
+ this.type = type;
+ this.positionMap = positionMap;
+ this.nestedProjections = nestedProjections;
+ }
+
+ private StructProjection(StructType structType, StructType projection) {
+ this(structType, projection, false);
+ }
+
+ @SuppressWarnings("checkstyle:CyclomaticComplexity")
+  private StructProjection(StructType structType, StructType projection, boolean allowMissing) {
+ this.type = projection;
+ this.positionMap = new int[projection.fields().size()];
+ this.nestedProjections = new StructProjection[projection.fields().size()];
+
+    // set up the projection positions and any nested projections that are needed
+ List<Types.NestedField> dataFields = structType.fields();
+ for (int pos = 0; pos < positionMap.length; pos += 1) {
+ Types.NestedField projectedField = projection.fields().get(pos);
+
+ boolean found = false;
+ for (int i = 0; !found && i < dataFields.size(); i += 1) {
+ Types.NestedField dataField = dataFields.get(i);
+ if (projectedField.fieldId() == dataField.fieldId()) {
+ found = true;
+ positionMap[pos] = i;
+ switch (projectedField.type().typeId()) {
+ case STRUCT:
+ nestedProjections[pos] =
+ new StructProjection(
+                      dataField.type().asStructType(), projectedField.type().asStructType());
+ break;
+ case MAP:
+ MapType projectedMap = projectedField.type().asMapType();
+ MapType originalMap = dataField.type().asMapType();
+
+ boolean keyProjectable =
+                  !projectedMap.keyType().isNestedType() || projectedMap.keyType().equals(originalMap.keyType());
+ boolean valueProjectable =
+                  !projectedMap.valueType().isNestedType() || projectedMap.valueType().equals(originalMap.valueType());
+ Preconditions.checkArgument(
+ keyProjectable && valueProjectable,
+                  "Cannot project a partial map key or value struct. Trying to project %s out of %s",
+ projectedField,
+ dataField);
+
+ nestedProjections[pos] = null;
+ break;
+ case LIST:
+ ListType projectedList = projectedField.type().asListType();
+ ListType originalList = dataField.type().asListType();
+
+ boolean elementProjectable =
+                  !projectedList.elementType().isNestedType() ||
+                      projectedList.elementType().equals(originalList.elementType());
+ Preconditions.checkArgument(
+ elementProjectable,
+                  "Cannot project a partial list element struct. Trying to project %s out of %s",
+ projectedField,
+ dataField);
+
+ nestedProjections[pos] = null;
+ break;
+ default:
+ nestedProjections[pos] = null;
+ }
+ }
+ }
+
+ if (!found && projectedField.isOptional() && allowMissing) {
+ positionMap[pos] = -1;
+ nestedProjections[pos] = null;
+ } else if (!found) {
+ throw new IllegalArgumentException(
+            String.format("Cannot find field %s in %s", projectedField, structType));
+ }
+ }
+ }
+
+ public int projectedFields() {
+    return (int) Ints.asList(positionMap).stream().filter(val -> val != -1).count();
+ }
+
+ public StructProjection wrap(StructLike newStruct) {
+ this.struct = newStruct;
+ return this;
+ }
+
+ public StructProjection copyFor(StructLike newStruct) {
+    return new StructProjection(type, positionMap, nestedProjections).wrap(newStruct);
+ }
+
+ @Override
+ public int size() {
+ return type.fields().size();
+ }
+
+ @Override
+ public <T> T get(int pos, Class<T> javaClass) {
+ // struct can be null if wrap is not called first before the get call
+ // or if a null struct is wrapped.
+ if (struct == null) {
+ return null;
+ }
+
+ int structPos = positionMap[pos];
+ if (nestedProjections[pos] != null) {
+ StructLike nestedStruct = struct.get(structPos, StructLike.class);
+ if (nestedStruct == null) {
+ return null;
+ }
+
+ return javaClass.cast(nestedProjections[pos].wrap(nestedStruct));
+ }
+
+ if (structPos != -1) {
+ return struct.get(structPos, javaClass);
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public <T> void set(int pos, T value) {
+    throw new UnsupportedOperationException("Cannot set fields in a TypeProjection");
+ }
+}
diff --git a/iceberg/patched-iceberg-core/pom.xml b/iceberg/patched-iceberg-core/pom.xml
index a894ef254a5..5180036b7cf 100644
--- a/iceberg/patched-iceberg-core/pom.xml
+++ b/iceberg/patched-iceberg-core/pom.xml
@@ -94,6 +94,8 @@
<excludes>
**/HadoopInputFile.class
**/HadoopTableOperations.class
+ **/StructLikeMap.class
+ **/StructLikeWrapper.class
org.apache.iceberg.avro.ValueReaders.class
org.apache.iceberg.avro.ValueWriters.class
org.apache.iceberg.BaseScan.class
diff --git a/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/PartitionsTable.java b/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/PartitionsTable.java
index 654b82faf09..a1f31e28443 100644
--- a/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/PartitionsTable.java
+++ b/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/PartitionsTable.java
@@ -23,16 +23,17 @@
import com.github.benmanes.caffeine.cache.LoadingCache;
import java.io.IOException;
import java.io.UncheckedIOException;
+import java.util.Comparator;
import java.util.List;
import org.apache.iceberg.expressions.ManifestEvaluator;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.types.Comparators;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ParallelIterable;
import org.apache.iceberg.util.PartitionUtil;
import org.apache.iceberg.util.StructLikeMap;
-
-// TODO: remove class once upgraded to Iceberg v1.7.0
+import org.apache.iceberg.util.StructProjection;
/** A {@link Table} implementation that exposes a table's partitions as rows. */
public class PartitionsTable extends BaseMetadataTable {
@@ -168,21 +169,26 @@ private static StaticDataTask.Row convertPartition(Partition partition) {
private static Iterable<Partition> partitions(Table table, StaticTableScan scan) {
Types.StructType partitionType = Partitioning.partitionType(table);
- PartitionMap partitions = new PartitionMap(partitionType);
+
+ StructLikeMap<Partition> partitions =
+        StructLikeMap.create(partitionType, new PartitionComparator(partitionType));
+
try (CloseableIterable<ManifestEntry<? extends ContentFile<?>>> entries = planEntries(scan)) {
for (ManifestEntry<? extends ContentFile<?>> entry : entries) {
Snapshot snapshot = table.snapshot(entry.snapshotId());
ContentFile<?> file = entry.file();
- StructLike partition =
+ StructLike key =
PartitionUtil.coercePartition(
partitionType, table.specs().get(file.specId()), file.partition());
- partitions.get(partition).update(file, snapshot);
+ partitions
+ .computeIfAbsent(key, () -> new Partition(key, partitionType))
+ .update(file, snapshot);
}
} catch (IOException e) {
throw new UncheckedIOException(e);
}
- return partitions.all();
+ return partitions.values();
}
@VisibleForTesting
@@ -241,26 +247,25 @@ private class PartitionsScan extends StaticTableScan {
}
}
- static class PartitionMap {
- private final StructLikeMap<Partition> partitions;
- private final Types.StructType keyType;
+ private static class PartitionComparator implements Comparator<StructLike> {
+ private Comparator<StructLike> comparator;
- PartitionMap(Types.StructType type) {
- this.partitions = StructLikeMap.create(type);
- this.keyType = type;
+ private PartitionComparator(Types.StructType struct) {
+ this.comparator = Comparators.forType(struct);
}
- Partition get(StructLike key) {
- Partition partition = partitions.get(key);
- if (partition == null) {
- partition = new Partition(key, keyType);
- partitions.put(key, partition);
+ @Override
+ public int compare(StructLike o1, StructLike o2) {
+ if (o1 instanceof StructProjection && o2 instanceof StructProjection) {
+ int cmp =
+ Integer.compare(
+ ((StructProjection) o1).projectedFields(),
+ ((StructProjection) o2).projectedFields());
+ if (cmp != 0) {
+ return cmp;
+ }
}
- return partition;
- }
-
- Iterable<Partition> all() {
- return partitions.values();
+ return comparator.compare(o1, o2);
}
}
@@ -293,27 +298,25 @@ void update(ContentFile<?> file, Snapshot snapshot) {
if (snapshot != null) {
long snapshotCommitTime = snapshot.timestampMillis() * 1000;
if (this.lastUpdatedAt == null || snapshotCommitTime > this.lastUpdatedAt) {
+ this.specId = file.specId();
+
this.lastUpdatedAt = snapshotCommitTime;
this.lastUpdatedSnapshotId = snapshot.snapshotId();
}
}
-
switch (file.content()) {
case DATA:
this.dataRecordCount += file.recordCount();
this.dataFileCount += 1;
- this.specId = file.specId();
this.dataFileSizeInBytes += file.fileSizeInBytes();
break;
case POSITION_DELETES:
this.posDeleteRecordCount += file.recordCount();
this.posDeleteFileCount += 1;
- this.specId = file.specId();
break;
case EQUALITY_DELETES:
this.eqDeleteRecordCount += file.recordCount();
this.eqDeleteFileCount += 1;
- this.specId = file.specId();
break;
default:
throw new UnsupportedOperationException(
@@ -322,15 +325,9 @@ void update(ContentFile<?> file, Snapshot snapshot) {
}
/** Needed because StructProjection is not serializable */
-  private PartitionData toPartitionData(StructLike key, Types.StructType keyType) {
- PartitionData data = new PartitionData(keyType);
- for (int i = 0; i < keyType.fields().size(); i++) {
-      Object val = key.get(i, keyType.fields().get(i).type().typeId().javaClass());
- if (val != null) {
- data.set(i, val);
- }
- }
- return data;
+  private static PartitionData toPartitionData(StructLike key, Types.StructType keyType) {
+ PartitionData keyTemplate = new PartitionData(keyType);
+ return keyTemplate.copyFor(key);
}
}
}
diff --git a/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/util/StructLikeMap.java b/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/util/StructLikeMap.java
new file mode 100644
index 00000000000..0efc9e44680
--- /dev/null
+++ b/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/util/StructLikeMap.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.util;
+
+import java.util.AbstractMap;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.types.Types;
+
+public class StructLikeMap<T> extends AbstractMap<StructLike, T> implements Map<StructLike, T> {
+
+ public static <T> StructLikeMap<T> create(
+ Types.StructType type, Comparator<StructLike> comparator) {
+ return new StructLikeMap<>(type, comparator);
+ }
+
+ public static <T> StructLikeMap<T> create(Types.StructType type) {
+ return create(type, Comparators.forType(type));
+ }
+
+ private final Types.StructType type;
+ private final Map<StructLikeWrapper, T> wrapperMap;
+ private final ThreadLocal<StructLikeWrapper> wrappers;
+
+  private StructLikeMap(Types.StructType type, Comparator<StructLike> comparator) {
+ this.type = type;
+ this.wrapperMap = Maps.newHashMap();
+    this.wrappers = ThreadLocal.withInitial(() -> StructLikeWrapper.forType(type, comparator));
+ }
+
+ @Override
+ public int size() {
+ return wrapperMap.size();
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return wrapperMap.isEmpty();
+ }
+
+ @Override
+ public boolean containsKey(Object key) {
+ if (key instanceof StructLike || key == null) {
+ StructLikeWrapper wrapper = wrappers.get();
+ boolean result = wrapperMap.containsKey(wrapper.set((StructLike) key));
+ wrapper.set(null); // don't hold a reference to the key.
+ return result;
+ }
+ return false;
+ }
+
+ @Override
+ public boolean containsValue(Object value) {
+ return wrapperMap.containsValue(value);
+ }
+
+ @Override
+ public T get(Object key) {
+ if (key instanceof StructLike || key == null) {
+ StructLikeWrapper wrapper = wrappers.get();
+ T value = wrapperMap.get(wrapper.set((StructLike) key));
+ wrapper.set(null); // don't hold a reference to the key.
+ return value;
+ }
+ return null;
+ }
+
+ @Override
+ public T put(StructLike key, T value) {
+ return wrapperMap.put(wrappers.get().copyFor(key), value);
+ }
+
+ @Override
+ public T remove(Object key) {
+ if (key instanceof StructLike || key == null) {
+ StructLikeWrapper wrapper = wrappers.get();
+ T value = wrapperMap.remove(wrapper.set((StructLike) key));
+ wrapper.set(null); // don't hold a reference to the key.
+ return value;
+ }
+ return null;
+ }
+
+ @Override
+ public void clear() {
+ wrapperMap.clear();
+ }
+
+ @Override
+ public Set<StructLike> keySet() {
+ StructLikeSet keySet = StructLikeSet.create(type);
+ for (StructLikeWrapper wrapper : wrapperMap.keySet()) {
+ keySet.add(wrapper.get());
+ }
+ return keySet;
+ }
+
+ @Override
+ public Collection<T> values() {
+ return wrapperMap.values();
+ }
+
+ @Override
+ public Set<Entry<StructLike, T>> entrySet() {
+ Set<Entry<StructLike, T>> entrySet = Sets.newHashSet();
+ for (Entry<StructLikeWrapper, T> entry : wrapperMap.entrySet()) {
+ entrySet.add(new StructLikeEntry<>(entry));
+ }
+ return entrySet;
+ }
+
+ public T computeIfAbsent(StructLike struct, Supplier<T> valueSupplier) {
+ return wrapperMap.computeIfAbsent(wrappers.get().copyFor(struct), key ->
valueSupplier.get());
+ }
+
+ private static class StructLikeEntry<R> implements Entry<StructLike, R> {
+
+ private final Entry<StructLikeWrapper, R> inner;
+
+ private StructLikeEntry(Entry<StructLikeWrapper, R> inner) {
+ this.inner = inner;
+ }
+
+ @Override
+ public StructLike getKey() {
+ return inner.getKey().get();
+ }
+
+ @Override
+ public R getValue() {
+ return inner.getValue();
+ }
+
+ @Override
+ public int hashCode() {
+ return inner.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ } else if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ StructLikeEntry<?> that = (StructLikeEntry<?>) o;
+ return inner.equals(that.inner);
+ }
+
+ @Override
+ public R setValue(R value) {
+ throw new UnsupportedOperationException("Does not support setValue.");
+ }
+ }
+
+ public <U> StructLikeMap<U> transformValues(Function<T, U> func) {
+ StructLikeMap<U> result = create(type);
+    wrapperMap.forEach((key, value) -> result.put(key.get(), func.apply(value)));
+ return result;
+ }
+}
diff --git a/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/util/StructLikeWrapper.java b/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/util/StructLikeWrapper.java
new file mode 100644
index 00000000000..3dbb91a43ce
--- /dev/null
+++ b/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/util/StructLikeWrapper.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.util;
+
+import java.util.Comparator;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.types.JavaHash;
+import org.apache.iceberg.types.Types;
+
+/** Wrapper to adapt StructLike for use in maps and sets by implementing equals and hashCode. */
+public class StructLikeWrapper {
+
+ public static StructLikeWrapper forType(
+ Types.StructType type, Comparator<StructLike> comparator) {
+ return new StructLikeWrapper(comparator, JavaHash.forType(type));
+ }
+
+ public static StructLikeWrapper forType(Types.StructType type) {
+ return forType(type, Comparators.forType(type));
+ }
+
+ private final Comparator<StructLike> comparator;
+ private final JavaHash<StructLike> structHash;
+ private Integer hashCode;
+ private StructLike struct;
+
+  private StructLikeWrapper(Comparator<StructLike> comparator, JavaHash<StructLike> structHash) {
+ this.comparator = comparator;
+ this.structHash = structHash;
+ this.hashCode = null;
+ }
+
+ /**
+ * Creates a copy of this wrapper that wraps a struct.
+ *
+   * <p>This is equivalent to {@code new StructLikeWrapper(type).set(newStruct)} but is cheaper
+ * because no analysis of the type is necessary.
+ *
+ * @param newStruct a {@link StructLike} row
+   * @return a copy of this wrapper wrapping the given struct
+ */
+ public StructLikeWrapper copyFor(StructLike newStruct) {
+ return new StructLikeWrapper(comparator, structHash).set(newStruct);
+ }
+
+ public StructLikeWrapper set(StructLike newStruct) {
+ this.struct = newStruct;
+ this.hashCode = null;
+ return this;
+ }
+
+ public StructLike get() {
+ return struct;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ } else if (!(other instanceof StructLikeWrapper)) {
+ return false;
+ }
+
+ StructLikeWrapper that = (StructLikeWrapper) other;
+
+ if (this.struct == that.struct) {
+ return true;
+ }
+
+ if (this.struct == null ^ that.struct == null) {
+ return false;
+ }
+
+ return comparator.compare(this.struct, that.struct) == 0;
+ }
+
+ @Override
+ public int hashCode() {
+ if (hashCode == null) {
+ this.hashCode = structHash.hash(struct);
+ }
+
+ return hashCode;
+ }
+}