This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new c751cb24b6 Core: Simplify partition coercion code in PartitionsTable (#7503)
c751cb24b6 is described below
commit c751cb24b6f59084f473395745a4778f64aaca75
Author: Szehon Ho <[email protected]>
AuthorDate: Thu May 4 07:37:06 2023 -0700
Core: Simplify partition coercion code in PartitionsTable (#7503)
---
.../java/org/apache/iceberg/PartitionsTable.java | 107 +++++++--------------
.../org/apache/iceberg/util/PartitionUtil.java | 2 +-
2 files changed, 37 insertions(+), 72 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/PartitionsTable.java b/core/src/main/java/org/apache/iceberg/PartitionsTable.java
index 97c82c432f..6c22a6dbf4 100644
--- a/core/src/main/java/org/apache/iceberg/PartitionsTable.java
+++ b/core/src/main/java/org/apache/iceberg/PartitionsTable.java
@@ -20,13 +20,13 @@ package org.apache.iceberg;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
-import java.util.Map;
import org.apache.iceberg.expressions.ManifestEvaluator;
import org.apache.iceberg.io.CloseableIterable;
import
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ParallelIterable;
+import org.apache.iceberg.util.PartitionUtil;
+import org.apache.iceberg.util.StructLikeMap;
/** A {@link Table} implementation that exposes a table's partitions as rows.
*/
public class PartitionsTable extends BaseMetadataTable {
@@ -90,28 +90,22 @@ public class PartitionsTable extends BaseMetadataTable {
private static StaticDataTask.Row convertPartition(Partition partition) {
return StaticDataTask.Row.of(
- partition.key, partition.specId, partition.dataRecordCount,
partition.dataFileCount);
+ partition.partitionData,
+ partition.specId,
+ partition.dataRecordCount,
+ partition.dataFileCount);
}
private static Iterable<Partition> partitions(Table table, StaticTableScan
scan) {
- Types.StructType normalizedPartitionType =
Partitioning.partitionType(table);
- PartitionMap partitions = new PartitionMap();
-
- // cache a position map needed by each partition spec to normalize
partitions to final schema
- Map<Integer, int[]> normalizedPositionsBySpec =
- Maps.newHashMapWithExpectedSize(table.specs().size());
+ Types.StructType partitionType = Partitioning.partitionType(table);
+ PartitionMap partitions = new PartitionMap(partitionType);
CloseableIterable<DataFile> datafiles = planDataFiles(scan);
for (DataFile dataFile : datafiles) {
- PartitionData original = (PartitionData) dataFile.partition();
- int[] normalizedPositions =
- normalizedPositionsBySpec.computeIfAbsent(
- dataFile.specId(),
- specId -> normalizedPositions(table, specId,
normalizedPartitionType));
-
- PartitionData normalized =
- normalizePartition(original, normalizedPartitionType,
normalizedPositions);
- partitions.get(normalized).update(dataFile);
+ StructLike partition =
+ PartitionUtil.coercePartition(
+ partitionType, table.specs().get(dataFile.specId()),
dataFile.partition());
+ partitions.get(partition).update(dataFile);
}
return partitions.all();
@@ -150,53 +144,6 @@ public class PartitionsTable extends BaseMetadataTable {
return new ParallelIterable<>(tasks, scan.planExecutor());
}
- /**
- * Builds an integer array for a specific partition type to map its
partitions to the final
- * normalized type.
- *
- * <p>The array represents fields in the original partition type, with the
index being the field's
- * index in the original partition type, and the value being the field's
index in the normalized
- * partition type.
- *
- * @param table iceberg table
- * @param specId spec id where original partition type is written
- * @param normalizedType normalized partition type
- */
- private static int[] normalizedPositions(
- Table table, int specId, Types.StructType normalizedType) {
- Types.StructType originalType = table.specs().get(specId).partitionType();
- int[] normalizedPositions = new int[originalType.fields().size()];
- for (int originalIndex = 0; originalIndex < originalType.fields().size();
originalIndex++) {
- Types.NestedField normalizedField =
-
normalizedType.field(originalType.fields().get(originalIndex).fieldId());
- normalizedPositions[originalIndex] =
normalizedType.fields().indexOf(normalizedField);
- }
- return normalizedPositions;
- }
-
- /**
- * Convert a partition data written by an old spec, to table's normalized
partition type, which is
- * a common partition type for all specs of the table.
- *
- * @param originalPartition un-normalized partition data
- * @param normalizedPartitionType table's normalized partition type {@link
- * Partitioning#partitionType(Table)}
- * @param normalizedPositions field positions in the normalized partition
type indexed by field
- * position in the original partition type
- * @return the normalized partition data
- */
- private static PartitionData normalizePartition(
- PartitionData originalPartition,
- Types.StructType normalizedPartitionType,
- int[] normalizedPositions) {
- PartitionData normalizedPartition = new
PartitionData(normalizedPartitionType);
- for (int originalIndex = 0; originalIndex < originalPartition.size();
originalIndex++) {
- normalizedPartition.put(
- normalizedPositions[originalIndex],
originalPartition.get(originalIndex));
- }
- return normalizedPartition;
- }
-
private class PartitionsScan extends StaticTableScan {
PartitionsScan(Table table) {
super(
@@ -208,12 +155,18 @@ public class PartitionsTable extends BaseMetadataTable {
}
static class PartitionMap {
- private final Map<PartitionData, Partition> partitions = Maps.newHashMap();
+ private final StructLikeMap<Partition> partitions;
+ private final Types.StructType keyType;
+
+ PartitionMap(Types.StructType type) {
+ this.partitions = StructLikeMap.create(type);
+ this.keyType = type;
+ }
- Partition get(PartitionData key) {
+ Partition get(StructLike key) {
Partition partition = partitions.get(key);
if (partition == null) {
- partition = new Partition(key);
+ partition = new Partition(key, keyType);
partitions.put(key, partition);
}
return partition;
@@ -225,13 +178,13 @@ public class PartitionsTable extends BaseMetadataTable {
}
static class Partition {
- private final StructLike key;
+ private final PartitionData partitionData;
private int specId;
private long dataRecordCount;
private int dataFileCount;
- Partition(StructLike key) {
- this.key = key;
+ Partition(StructLike key, Types.StructType keyType) {
+ this.partitionData = toPartitionData(key, keyType);
this.specId = 0;
this.dataRecordCount = 0;
this.dataFileCount = 0;
@@ -242,5 +195,17 @@ public class PartitionsTable extends BaseMetadataTable {
this.dataFileCount += 1;
this.specId = file.specId();
}
+
+ /** Needed because StructProjection is not serializable */
+ private PartitionData toPartitionData(StructLike key, Types.StructType
keyType) {
+ PartitionData data = new PartitionData(keyType);
+ for (int i = 0; i < keyType.fields().size(); i++) {
+ Object val = key.get(i,
keyType.fields().get(i).type().typeId().javaClass());
+ if (val != null) {
+ data.set(i, val);
+ }
+ }
+ return data;
+ }
}
}
diff --git a/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java b/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java
index af2f79c3c6..b3a5e85d33 100644
--- a/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java
@@ -91,7 +91,7 @@ public class PartitionUtil {
}
// adapts the provided partition data to match the table partition type
- private static StructLike coercePartition(
+ public static StructLike coercePartition(
Types.StructType partitionType, PartitionSpec spec, StructLike
partition) {
StructProjection projection =
StructProjection.createAllowMissing(spec.partitionType(),
partitionType);