This is an automated email from the ASF dual-hosted git repository.
szehon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new b6c949cd86 Core, Spark: Calling rewrite_position_delete_files fails on
tables with more than 1k columns (#10020)
b6c949cd86 is described below
commit b6c949cd86c372a87a4d43a557c19dd310af80d8
Author: Szehon Ho <[email protected]>
AuthorDate: Wed Jun 12 16:50:17 2024 -0700
Core, Spark: Calling rewrite_position_delete_files fails on tables with
more than 1k columns (#10020)
---
.palantir/revapi.yml | 6 ++
.../java/org/apache/iceberg/PartitionSpec.java | 26 ++++++
api/src/main/java/org/apache/iceberg/Schema.java | 70 ++++++++++++++-
.../java/org/apache/iceberg/types/AssignIds.java | 99 ++++++++++++++++++++++
.../java/org/apache/iceberg/types/TypeUtil.java | 16 ++++
.../main/java/org/apache/iceberg/types/Types.java | 4 +
.../java/org/apache/iceberg/BaseMetadataTable.java | 5 +-
.../java/org/apache/iceberg/ManifestReader.java | 2 +-
.../org/apache/iceberg/PositionDeletesTable.java | 37 +++++++-
.../org/apache/iceberg/TestMetadataTableScans.java | 71 +++++++++++++++-
...stMetadataTableScansWithPartitionEvolution.java | 9 +-
.../RewritePositionDeleteFilesSparkAction.java | 11 ++-
.../TestRewritePositionDeleteFilesAction.java | 66 +++++++++++++++
.../RewritePositionDeleteFilesSparkAction.java | 12 +--
.../TestRewritePositionDeleteFilesAction.java | 65 ++++++++++++++
.../RewritePositionDeleteFilesSparkAction.java | 11 ++-
.../TestRewritePositionDeleteFilesAction.java | 65 ++++++++++++++
17 files changed, 541 insertions(+), 34 deletions(-)
diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml
index a41d3ddfb8..808a192990 100644
--- a/.palantir/revapi.yml
+++ b/.palantir/revapi.yml
@@ -1018,6 +1018,12 @@ acceptedBreaks:
old: "method void
org.apache.iceberg.PositionDeletesTable.PositionDeletesBatchScan::<init>(org.apache.iceberg.Table,\
\ org.apache.iceberg.Schema, org.apache.iceberg.TableScanContext)"
justification: "Removing deprecated code"
+ "1.5.0":
+ org.apache.iceberg:iceberg-api:
+ - code: "java.class.defaultSerializationChanged"
+ old: "class org.apache.iceberg.types.Types.NestedField"
+ new: "class org.apache.iceberg.types.Types.NestedField"
+ justification: "new Constructor added"
apache-iceberg-0.14.0:
org.apache.iceberg:iceberg-api:
- code: "java.class.defaultSerializationChanged"
diff --git a/api/src/main/java/org/apache/iceberg/PartitionSpec.java
b/api/src/main/java/org/apache/iceberg/PartitionSpec.java
index 4fcb110db8..8f1df79403 100644
--- a/api/src/main/java/org/apache/iceberg/PartitionSpec.java
+++ b/api/src/main/java/org/apache/iceberg/PartitionSpec.java
@@ -27,6 +27,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -60,6 +61,7 @@ public class PartitionSpec implements Serializable {
private transient volatile ListMultimap<Integer, PartitionField>
fieldsBySourceId = null;
private transient volatile Class<?>[] lazyJavaClasses = null;
private transient volatile StructType lazyPartitionType = null;
+ private transient volatile StructType lazyRawPartitionType = null;
private transient volatile List<PartitionField> fieldList = null;
private final int lastAssignedFieldId;
@@ -140,6 +142,30 @@ public class PartitionSpec implements Serializable {
return lazyPartitionType;
}
+ /**
+ * Returns a struct matching partition information as written into manifest
files. See {@link
+ * #partitionType()} for a struct with field IDs potentially re-assigned to
avoid conflicts.
+ */
+ public StructType rawPartitionType() {
+ if (schema.idsToOriginal().isEmpty()) {
+ // not re-assigned.
+ return partitionType();
+ }
+ if (lazyRawPartitionType == null) {
+ synchronized (this) {
+ if (lazyRawPartitionType == null) {
+ this.lazyRawPartitionType =
+ StructType.of(
+ partitionType().fields().stream()
+ .map(f ->
f.withFieldId(schema.idsToOriginal().get(f.fieldId())))
+ .collect(Collectors.toList()));
+ }
+ }
+ }
+
+ return lazyRawPartitionType;
+ }
+
public Class<?>[] javaClasses() {
if (lazyJavaClasses == null) {
synchronized (this) {
diff --git a/api/src/main/java/org/apache/iceberg/Schema.java
b/api/src/main/java/org/apache/iceberg/Schema.java
index 5e024b7c1c..d5ec3f2509 100644
--- a/api/src/main/java/org/apache/iceberg/Schema.java
+++ b/api/src/main/java/org/apache/iceberg/Schema.java
@@ -21,6 +21,7 @@ package org.apache.iceberg;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.Locale;
@@ -34,6 +35,7 @@ import
org.apache.iceberg.relocated.com.google.common.collect.ImmutableBiMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
import org.apache.iceberg.types.Type;
@@ -65,6 +67,8 @@ public class Schema implements Serializable {
private transient Map<Integer, Accessor<StructLike>> idToAccessor = null;
private transient Map<Integer, String> idToName = null;
private transient Set<Integer> identifierFieldIdSet = null;
+ private transient Map<Integer, Integer> idsToReassigned;
+ private transient Map<Integer, Integer> idsToOriginal;
public Schema(List<NestedField> columns, Map<String, Integer> aliases) {
this(columns, aliases, ImmutableSet.of());
@@ -83,12 +87,24 @@ public class Schema implements Serializable {
this(DEFAULT_SCHEMA_ID, columns, identifierFieldIds);
}
+ public Schema(List<NestedField> columns, Set<Integer> identifierFieldIds,
TypeUtil.GetID getId) {
+ this(DEFAULT_SCHEMA_ID, columns, identifierFieldIds, getId);
+ }
+
public Schema(int schemaId, List<NestedField> columns) {
this(schemaId, columns, ImmutableSet.of());
}
public Schema(int schemaId, List<NestedField> columns, Set<Integer>
identifierFieldIds) {
- this(schemaId, columns, null, identifierFieldIds);
+ this(schemaId, columns, null, identifierFieldIds, null);
+ }
+
+ public Schema(
+ int schemaId,
+ List<NestedField> columns,
+ Set<Integer> identifierFieldIds,
+ TypeUtil.GetID getId) {
+ this(schemaId, columns, null, identifierFieldIds, getId);
}
public Schema(
@@ -96,8 +112,22 @@ public class Schema implements Serializable {
List<NestedField> columns,
Map<String, Integer> aliases,
Set<Integer> identifierFieldIds) {
+ this(schemaId, columns, aliases, identifierFieldIds, null);
+ }
+
+ public Schema(
+ int schemaId,
+ List<NestedField> columns,
+ Map<String, Integer> aliases,
+ Set<Integer> identifierFieldIds,
+ TypeUtil.GetID getID) {
this.schemaId = schemaId;
- this.struct = StructType.of(columns);
+
+ this.idsToOriginal = Maps.newHashMap();
+ this.idsToReassigned = Maps.newHashMap();
+ List<NestedField> finalColumns = reassignIds(columns, getID);
+
+ this.struct = StructType.of(finalColumns);
this.aliasToId = aliases != null ? ImmutableBiMap.copyOf(aliases) : null;
// validate IdentifierField
@@ -507,4 +537,40 @@ public class Schema implements Serializable {
.map(this::identifierFieldToString)
.collect(Collectors.toList())));
}
+
+ /**
+ * The IDs of some fields are re-assigned if a GetID function is specified for the
Schema.
+ *
+ * @return map of original to reassigned field ids
+ */
+ public Map<Integer, Integer> idsToReassigned() {
+ return idsToReassigned != null ? idsToReassigned : Collections.emptyMap();
+ }
+
+ /**
+ * The IDs of some fields are re-assigned if a GetID function is specified for the
Schema.
+ *
+ * @return map of reassigned to original field ids
+ */
+ public Map<Integer, Integer> idsToOriginal() {
+ return idsToOriginal != null ? idsToOriginal : Collections.emptyMap();
+ }
+
+ private List<NestedField> reassignIds(List<NestedField> columns,
TypeUtil.GetID getID) {
+ if (getID == null) {
+ return columns;
+ }
+ Type res =
+ TypeUtil.assignIds(
+ StructType.of(columns),
+ oldId -> {
+ int newId = getID.get(oldId);
+ if (newId != oldId) {
+ idsToReassigned.put(oldId, newId);
+ idsToOriginal.put(newId, oldId);
+ }
+ return newId;
+ });
+ return res.asStructType().fields();
+ }
}
diff --git a/api/src/main/java/org/apache/iceberg/types/AssignIds.java
b/api/src/main/java/org/apache/iceberg/types/AssignIds.java
new file mode 100644
index 0000000000..68588f581a
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/types/AssignIds.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.types;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.Supplier;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+class AssignIds extends TypeUtil.CustomOrderSchemaVisitor<Type> {
+ private final TypeUtil.GetID getID;
+
+ AssignIds(TypeUtil.GetID getID) {
+ this.getID = getID;
+ }
+
+ private int idFor(int id) {
+ return getID.get(id);
+ }
+
+ @Override
+ public Type schema(Schema schema, Supplier<Type> future) {
+ return future.get();
+ }
+
+ @Override
+ public Type struct(Types.StructType struct, Iterable<Type> futures) {
+ List<Types.NestedField> fields = struct.fields();
+ int length = struct.fields().size();
+
+ // assign IDs for this struct's fields first
+ List<Integer> newIds = Lists.newArrayListWithExpectedSize(length);
+ for (Types.NestedField field : fields) {
+ newIds.add(idFor(field.fieldId()));
+ }
+
+ List<Types.NestedField> newFields =
Lists.newArrayListWithExpectedSize(length);
+ Iterator<Type> types = futures.iterator();
+ for (int i = 0; i < length; i += 1) {
+ Types.NestedField field = fields.get(i);
+ Type type = types.next();
+ if (field.isOptional()) {
+ newFields.add(Types.NestedField.optional(newIds.get(i), field.name(),
type, field.doc()));
+ } else {
+ newFields.add(Types.NestedField.required(newIds.get(i), field.name(),
type, field.doc()));
+ }
+ }
+
+ return Types.StructType.of(newFields);
+ }
+
+ @Override
+ public Type field(Types.NestedField field, Supplier<Type> future) {
+ return future.get();
+ }
+
+ @Override
+ public Type list(Types.ListType list, Supplier<Type> future) {
+ int newId = idFor(list.elementId());
+ if (list.isElementOptional()) {
+ return Types.ListType.ofOptional(newId, future.get());
+ } else {
+ return Types.ListType.ofRequired(newId, future.get());
+ }
+ }
+
+ @Override
+ public Type map(Types.MapType map, Supplier<Type> keyFuture, Supplier<Type>
valueFuture) {
+ int newKeyId = idFor(map.keyId());
+ int newValueId = idFor(map.valueId());
+ if (map.isValueOptional()) {
+ return Types.MapType.ofOptional(newKeyId, newValueId, keyFuture.get(),
valueFuture.get());
+ } else {
+ return Types.MapType.ofRequired(newKeyId, newValueId, keyFuture.get(),
valueFuture.get());
+ }
+ }
+
+ @Override
+ public Type primitive(Type.PrimitiveType primitive) {
+ return primitive;
+ }
+}
diff --git a/api/src/main/java/org/apache/iceberg/types/TypeUtil.java
b/api/src/main/java/org/apache/iceberg/types/TypeUtil.java
index 7c13d60940..07d06dcc5a 100644
--- a/api/src/main/java/org/apache/iceberg/types/TypeUtil.java
+++ b/api/src/main/java/org/apache/iceberg/types/TypeUtil.java
@@ -355,6 +355,17 @@ public class TypeUtil {
return new Schema(struct.fields(), refreshIdentifierFields(struct,
schema));
}
+ /**
+ * Assigns fresh ids from the {@link GetID getId function} for all fields in
a type.
+ *
+ * @param type a type
+ * @param getId an id assignment function
+ * @return a structurally identical type with new ids assigned by the getId
function
+ */
+ public static Type assignIds(Type type, GetID getId) {
+ return TypeUtil.visit(type, new AssignIds(getId));
+ }
+
public static Type find(Schema schema, Predicate<Type> predicate) {
return visit(schema, new FindTypeVisitor(predicate));
}
@@ -521,6 +532,11 @@ public class TypeUtil {
int get();
}
+ /** Interface for passing a function that assigns a column ID from the
previous ID. */
+ public interface GetID {
+ int get(int oldId);
+ }
+
public static class SchemaVisitor<T> {
public void beforeField(Types.NestedField field) {}
diff --git a/api/src/main/java/org/apache/iceberg/types/Types.java
b/api/src/main/java/org/apache/iceberg/types/Types.java
index dda842c9e1..ce6caa4721 100644
--- a/api/src/main/java/org/apache/iceberg/types/Types.java
+++ b/api/src/main/java/org/apache/iceberg/types/Types.java
@@ -475,6 +475,10 @@ public class Types {
return new NestedField(false, id, name, type, doc);
}
+ public NestedField withFieldId(int newId) {
+ return new NestedField(isOptional, newId, name, type, doc);
+ }
+
public int fieldId() {
return id;
}
diff --git a/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
index 57a6386093..e1e138109f 100644
--- a/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
+++ b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
@@ -74,8 +74,11 @@ public abstract class BaseMetadataTable extends
BaseReadOnlyTable implements Ser
.withSpecId(spec.specId())
.checkConflicts(false);
+ Map<Integer, Integer> reassignedFields =
metadataTableSchema.idsToReassigned();
+
for (PartitionField field : spec.fields()) {
- builder.add(field.fieldId(), field.fieldId(), field.name(),
Transforms.identity());
+ int newFieldId = reassignedFields.getOrDefault(field.fieldId(),
field.fieldId());
+ builder.add(newFieldId, newFieldId, field.name(), Transforms.identity());
}
return builder.build();
}
diff --git a/core/src/main/java/org/apache/iceberg/ManifestReader.java
b/core/src/main/java/org/apache/iceberg/ManifestReader.java
index 4ee51aa60c..b5f85813dd 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestReader.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestReader.java
@@ -114,7 +114,7 @@ public class ManifestReader<F extends ContentFile<F>>
extends CloseableGroup
this.spec = readPartitionSpec(file);
}
- this.fileSchema = new
Schema(DataFile.getType(spec.partitionType()).fields());
+ this.fileSchema = new
Schema(DataFile.getType(spec.rawPartitionType()).fields());
}
private <T extends ContentFile<T>> PartitionSpec readPartitionSpec(InputFile
inputFile) {
diff --git a/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java
b/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java
index 5627063580..382ad663a8 100644
--- a/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java
+++ b/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java
@@ -24,6 +24,8 @@ import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.iceberg.expressions.Expression;
@@ -33,6 +35,8 @@ import org.apache.iceberg.expressions.Projections;
import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
@@ -107,8 +111,8 @@ public class PositionDeletesTable extends BaseMetadataTable
{
private Schema calculateSchema() {
Types.StructType partitionType = Partitioning.partitionType(table());
- Schema result =
- new Schema(
+ List<Types.NestedField> columns =
+ ImmutableList.of(
MetadataColumns.DELETE_FILE_PATH,
MetadataColumns.DELETE_FILE_POS,
Types.NestedField.optional(
@@ -132,6 +136,35 @@ public class PositionDeletesTable extends
BaseMetadataTable {
Types.StringType.get(),
MetadataColumns.FILE_PATH_COLUMN_DOC));
+ // Collect every field ID already in use, so reassigned IDs cannot collide with them
+ Set<Integer> currentlyUsedIds =
+
Collections.unmodifiableSet(TypeUtil.indexById(Types.StructType.of(columns)).keySet());
+ Set<Integer> allUsedIds =
+ table().schemas().values().stream()
+ .map(currSchema ->
TypeUtil.indexById(currSchema.asStruct()).keySet())
+ .reduce(currentlyUsedIds, Sets::union);
+
+ // Calculate ids to reassign
+ Set<Integer> idsToReassign =
+
partitionType.fields().stream().map(Types.NestedField::fieldId).collect(Collectors.toSet());
+
+ // Reassign the selected IDs to the smallest values not already in use.
+ AtomicInteger nextId = new AtomicInteger();
+ Schema result =
+ new Schema(
+ columns,
+ ImmutableSet.of(),
+ oldId -> {
+ if (!idsToReassign.contains(oldId)) {
+ return oldId;
+ }
+ int candidate = nextId.incrementAndGet();
+ while (allUsedIds.contains(candidate)) {
+ candidate = nextId.incrementAndGet();
+ }
+ return candidate;
+ });
+
if (!partitionType.fields().isEmpty()) {
return result;
} else {
diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
index 4c5f1d240f..df314f6a80 100644
--- a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
+++ b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
@@ -42,6 +42,7 @@ import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Streams;
+import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.StructLikeWrapper;
import org.junit.jupiter.api.TestTemplate;
@@ -1266,7 +1267,7 @@ public class TestMetadataTableScans extends
MetadataTableScanTestBase {
ScanTask task = tasks.get(0);
assertThat(task).isInstanceOf(PositionDeletesScanTask.class);
- Types.StructType partitionType = Partitioning.partitionType(table);
+ Types.StructType partitionType =
positionDeletesTable.spec().partitionType();
PositionDeletesScanTask posDeleteTask = (PositionDeletesScanTask) task;
int filePartition = posDeleteTask.file().partition().get(0, Integer.class);
@@ -1333,7 +1334,7 @@ public class TestMetadataTableScans extends
MetadataTableScanTestBase {
ScanTask task = tasks.get(0);
assertThat(task).isInstanceOf(PositionDeletesScanTask.class);
- Types.StructType partitionType = Partitioning.partitionType(table);
+ Types.StructType partitionType =
positionDeletesTable.spec().partitionType();
PositionDeletesScanTask posDeleteTask = (PositionDeletesScanTask) task;
// base table filter should only be used to evaluate partitions
@@ -1415,7 +1416,7 @@ public class TestMetadataTableScans extends
MetadataTableScanTestBase {
ScanTask task = tasks.get(0);
assertThat(task).isInstanceOf(PositionDeletesScanTask.class);
- Types.StructType partitionType = Partitioning.partitionType(table);
+ Types.StructType partitionType =
positionDeletesTable.spec().partitionType();
PositionDeletesScanTask posDeleteTask = (PositionDeletesScanTask) task;
// base table filter should only be used to evaluate partitions
@@ -1426,7 +1427,7 @@ public class TestMetadataTableScans extends
MetadataTableScanTestBase {
(StructLike)
constantsMap(posDeleteTask,
partitionType).get(MetadataColumns.PARTITION_COLUMN_ID);
int taskPartition =
- taskPartitionStruct.get(1, Integer.class); // new partition field in
position 1
+ taskPartitionStruct.get(0, Integer.class); // new partition field in
position 0
assertThat(filePartition).as("Expected correct partition on task's
file").isEqualTo(1);
assertThat(taskPartition).as("Expected correct partition on task's
column").isEqualTo(1);
@@ -1564,4 +1565,66 @@ public class TestMetadataTableScans extends
MetadataTableScanTestBase {
assertThat(scanTask1Partition).isEqualTo(expected);
assertThat(scanTask2Partition).isEqualTo(expected);
}
+
+ @TestTemplate
+ public void testPositionDeletesManyColumns() {
+ assumeThat(formatVersion).as("Position deletes supported only for v2
tables").isEqualTo(2);
+
+ UpdateSchema updateSchema = table.updateSchema();
+ for (int i = 0; i <= 2000; i++) {
+ updateSchema.addColumn(String.valueOf(i), Types.IntegerType.get());
+ }
+ updateSchema.commit();
+
+ DataFile dataFile1 =
+ DataFiles.builder(table.spec())
+ .withPath("/path/to/data1.parquet")
+ .withFileSizeInBytes(10)
+ .withRecordCount(1)
+ .build();
+ DataFile dataFile2 =
+ DataFiles.builder(table.spec())
+ .withPath("/path/to/data2.parquet")
+ .withFileSizeInBytes(10)
+ .withRecordCount(1)
+ .build();
+ table.newAppend().appendFile(dataFile1).appendFile(dataFile2).commit();
+
+ DeleteFile delete1 =
+ FileMetadata.deleteFileBuilder(table.spec())
+ .ofPositionDeletes()
+ .withPath("/path/to/delete1.parquet")
+ .withFileSizeInBytes(10)
+ .withRecordCount(1)
+ .build();
+ DeleteFile delete2 =
+ FileMetadata.deleteFileBuilder(table.spec())
+ .ofPositionDeletes()
+ .withPath("/path/to/delete2.parquet")
+ .withFileSizeInBytes(10)
+ .withRecordCount(1)
+ .build();
+ table.newRowDelta().addDeletes(delete1).addDeletes(delete2).commit();
+
+ PositionDeletesTable positionDeletesTable = new
PositionDeletesTable(table);
+
assertThat(TypeUtil.indexById(positionDeletesTable.schema().asStruct()).size()).isEqualTo(2010);
+
+ BatchScan scan = positionDeletesTable.newBatchScan();
+
assertThat(scan).isInstanceOf(PositionDeletesTable.PositionDeletesBatchScan.class);
+ PositionDeletesTable.PositionDeletesBatchScan deleteScan =
+ (PositionDeletesTable.PositionDeletesBatchScan) scan;
+
+ List<PositionDeletesScanTask> scanTasks =
+ Lists.newArrayList(
+ Iterators.transform(
+ deleteScan.planFiles().iterator(),
+ task -> {
+ assertThat(task).isInstanceOf(PositionDeletesScanTask.class);
+ return (PositionDeletesScanTask) task;
+ }));
+ assertThat(scanTasks).hasSize(2);
+ scanTasks.sort(Comparator.comparing(f -> f.file().path().toString()));
+
assertThat(scanTasks.get(0).file().path().toString()).isEqualTo("/path/to/delete1.parquet");
+
assertThat(scanTasks.get(1).file().path().toString()).isEqualTo("/path/to/delete2.parquet");
+ }
}
diff --git
a/core/src/test/java/org/apache/iceberg/TestMetadataTableScansWithPartitionEvolution.java
b/core/src/test/java/org/apache/iceberg/TestMetadataTableScansWithPartitionEvolution.java
index faccdcb3dd..a2e5386d29 100644
---
a/core/src/test/java/org/apache/iceberg/TestMetadataTableScansWithPartitionEvolution.java
+++
b/core/src/test/java/org/apache/iceberg/TestMetadataTableScansWithPartitionEvolution.java
@@ -21,7 +21,6 @@ package org.apache.iceberg;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.entry;
import static org.assertj.core.api.Assumptions.assumeThat;
import java.io.File;
@@ -186,7 +185,7 @@ public class TestMetadataTableScansWithPartitionEvolution
extends MetadataTableS
ScanTask task = tasks.get(0);
assertThat(task).isInstanceOf(PositionDeletesScanTask.class);
- Types.StructType partitionType = Partitioning.partitionType(table);
+ Types.StructType partitionType =
positionDeletesTable.spec().partitionType();
PositionDeletesScanTask posDeleteTask = (PositionDeletesScanTask) task;
int filePartition = posDeleteTask.file().partition().get(0, Integer.class);
@@ -196,15 +195,13 @@ public class TestMetadataTableScansWithPartitionEvolution
extends MetadataTableS
int taskConstantPartition =
((StructLike)
constantsMap(posDeleteTask,
partitionType).get(MetadataColumns.PARTITION_COLUMN_ID))
- .get(1, Integer.class);
+ .get(0, Integer.class);
assertThat(taskConstantPartition)
.as("Expected correct partition on constant column")
.isEqualTo(1);
-
assertThat(posDeleteTask.spec().fields().get(0).fieldId())
.as("Expected correct partition field id on task's spec")
-
.isEqualTo(table.ops().current().spec().partitionType().fields().get(0).fieldId());
-
+ .isEqualTo(partitionType.fields().get(0).fieldId());
assertThat(posDeleteTask.file().specId())
.as("Expected correct partition spec id on task")
.isEqualTo(table.ops().current().spec().specId());
diff --git
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java
index f3dfd2dcc3..ea1c529401 100644
---
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java
+++
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java
@@ -137,10 +137,12 @@ public class RewritePositionDeleteFilesSparkAction
}
private StructLikeMap<List<List<PositionDeletesScanTask>>> planFileGroups() {
- CloseableIterable<PositionDeletesScanTask> fileTasks = planFiles();
+ Table deletesTable =
+ MetadataTableUtils.createMetadataTableInstance(table,
MetadataTableType.POSITION_DELETES);
+ CloseableIterable<PositionDeletesScanTask> fileTasks =
planFiles(deletesTable);
try {
- StructType partitionType = Partitioning.partitionType(table);
+ StructType partitionType = Partitioning.partitionType(deletesTable);
StructLikeMap<List<PositionDeletesScanTask>> fileTasksByPartition =
groupByPartition(partitionType, fileTasks);
return fileGroupsByPartition(fileTasksByPartition);
@@ -153,10 +155,7 @@ public class RewritePositionDeleteFilesSparkAction
}
}
- private CloseableIterable<PositionDeletesScanTask> planFiles() {
- Table deletesTable =
- MetadataTableUtils.createMetadataTableInstance(table,
MetadataTableType.POSITION_DELETES);
-
+ private CloseableIterable<PositionDeletesScanTask> planFiles(Table
deletesTable) {
PositionDeletesBatchScan scan = (PositionDeletesBatchScan)
deletesTable.newBatchScan();
return CloseableIterable.transform(
scan.baseTableFilter(filter).ignoreResiduals().planFiles(),
diff --git
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java
index 77800e2ea0..aa2817e875 100644
---
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java
+++
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java
@@ -19,6 +19,8 @@
package org.apache.iceberg.spark.actions;
import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.spark.sql.functions.expr;
+import static org.assertj.core.api.Assertions.assertThat;
import java.io.IOException;
import java.util.Arrays;
@@ -69,6 +71,7 @@ import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.StructLikeMap;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.StructType;
import org.junit.After;
import org.junit.Assert;
import org.junit.Rule;
@@ -581,6 +584,69 @@ public class TestRewritePositionDeleteFilesAction extends
SparkCatalogTestBase {
assertEquals("Rows must match", expectedRecords, actualRecords);
}
+ @Test
+ public void testRewriteManyColumns() throws Exception {
+ List<Types.NestedField> fields =
+ Lists.newArrayList(Types.NestedField.optional(0, "id",
Types.LongType.get()));
+ List<Types.NestedField> additionalCols =
+ IntStream.range(1, 1010)
+ .mapToObj(i -> Types.NestedField.optional(i, "c" + i,
Types.StringType.get()))
+ .collect(Collectors.toList());
+ fields.addAll(additionalCols);
+ Schema schema = new Schema(fields);
+ PartitionSpec spec = PartitionSpec.builderFor(schema).bucket("id",
2).build();
+ Table table =
+ validationCatalog.createTable(
+ TableIdentifier.of("default", TABLE_NAME), schema, spec,
tableProperties());
+
+ Dataset<Row> df =
+ spark
+ .range(4)
+ .withColumns(
+ IntStream.range(1, 1010)
+ .boxed()
+ .collect(Collectors.toMap(i -> "c" + i, i -> expr("CAST(id
as STRING)"))));
+ StructType sparkSchema = spark.table(name(table)).schema();
+ spark
+ .createDataFrame(df.rdd(), sparkSchema)
+ .coalesce(1)
+ .write()
+ .format("iceberg")
+ .mode("append")
+ .save(name(table));
+
+ List<DataFile> dataFiles = TestHelpers.dataFiles(table);
+ writePosDeletesForFiles(table, 1, 1, dataFiles);
+ assertThat(dataFiles).hasSize(2);
+
+ List<DeleteFile> deleteFiles = deleteFiles(table);
+ assertThat(deleteFiles).hasSize(2);
+
+ List<Object[]> expectedRecords = records(table);
+ List<Object[]> expectedDeletes = deleteRecords(table);
+ assertThat(expectedRecords).hasSize(2);
+ assertThat(expectedDeletes).hasSize(2);
+
+ Result result =
+ SparkActions.get(spark)
+ .rewritePositionDeletes(table)
+ .option(SizeBasedFileRewriter.REWRITE_ALL, "true")
+ .option(SizeBasedFileRewriter.TARGET_FILE_SIZE_BYTES,
Long.toString(Long.MAX_VALUE - 1))
+ .execute();
+
+ List<DeleteFile> newDeleteFiles = deleteFiles(table);
+ assertThat(newDeleteFiles).hasSize(2);
+ assertNotContains(deleteFiles, newDeleteFiles);
+ assertLocallySorted(newDeleteFiles);
+ checkResult(result, deleteFiles, newDeleteFiles, 2);
+ checkSequenceNumbers(table, deleteFiles, newDeleteFiles);
+
+ List<Object[]> actualRecords = records(table);
+ List<Object[]> actualDeletes = deleteRecords(table);
+ assertEquals("Rows must match", expectedRecords, actualRecords);
+ assertEquals("Position deletes must match", expectedDeletes,
actualDeletes);
+ }
+
private Table createTablePartitioned(int partitions, int files, int
numRecords) {
PartitionSpec spec =
PartitionSpec.builderFor(SCHEMA).identity("c1").build();
Table table =
diff --git
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java
index f3dfd2dcc3..bdb0ee3527 100644
---
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java
+++
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java
@@ -137,10 +137,12 @@ public class RewritePositionDeleteFilesSparkAction
}
private StructLikeMap<List<List<PositionDeletesScanTask>>> planFileGroups() {
- CloseableIterable<PositionDeletesScanTask> fileTasks = planFiles();
+ Table deletesTable =
+ MetadataTableUtils.createMetadataTableInstance(table,
MetadataTableType.POSITION_DELETES);
+ CloseableIterable<PositionDeletesScanTask> fileTasks =
planFiles(deletesTable);
try {
- StructType partitionType = Partitioning.partitionType(table);
+ StructType partitionType = Partitioning.partitionType(deletesTable);
StructLikeMap<List<PositionDeletesScanTask>> fileTasksByPartition =
groupByPartition(partitionType, fileTasks);
return fileGroupsByPartition(fileTasksByPartition);
@@ -153,11 +155,9 @@ public class RewritePositionDeleteFilesSparkAction
}
}
- private CloseableIterable<PositionDeletesScanTask> planFiles() {
- Table deletesTable =
- MetadataTableUtils.createMetadataTableInstance(table,
MetadataTableType.POSITION_DELETES);
-
+ private CloseableIterable<PositionDeletesScanTask> planFiles(Table
deletesTable) {
PositionDeletesBatchScan scan = (PositionDeletesBatchScan)
deletesTable.newBatchScan();
+
return CloseableIterable.transform(
scan.baseTableFilter(filter).ignoreResiduals().planFiles(),
task -> (PositionDeletesScanTask) task);
diff --git
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java
index db2d42501d..7be300e84f 100644
---
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java
+++
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java
@@ -19,6 +19,7 @@
package org.apache.iceberg.spark.actions;
import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.spark.sql.functions.expr;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.IOException;
@@ -71,6 +72,7 @@ import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.StructLikeMap;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.StructType;
import org.junit.After;
import org.junit.Assert;
import org.junit.Rule;
@@ -618,6 +620,69 @@ public class TestRewritePositionDeleteFilesAction extends
SparkCatalogTestBase {
assertEquals("Rows must match", expectedRecords, actualRecords);
}
+ @Test
+ public void testRewriteManyColumns() throws Exception {
+ List<Types.NestedField> fields =
+ Lists.newArrayList(Types.NestedField.optional(0, "id",
Types.LongType.get()));
+ List<Types.NestedField> additionalCols =
+ IntStream.range(1, 1010)
+ .mapToObj(i -> Types.NestedField.optional(i, "c" + i,
Types.StringType.get()))
+ .collect(Collectors.toList());
+ fields.addAll(additionalCols);
+ Schema schema = new Schema(fields);
+ PartitionSpec spec = PartitionSpec.builderFor(schema).bucket("id",
2).build();
+ Table table =
+ validationCatalog.createTable(
+ TableIdentifier.of("default", TABLE_NAME), schema, spec,
tableProperties());
+
+ Dataset<Row> df =
+ spark
+ .range(4)
+ .withColumns(
+ IntStream.range(1, 1010)
+ .boxed()
+ .collect(Collectors.toMap(i -> "c" + i, i -> expr("CAST(id
as STRING)"))));
+ StructType sparkSchema = spark.table(name(table)).schema();
+ spark
+ .createDataFrame(df.rdd(), sparkSchema)
+ .coalesce(1)
+ .write()
+ .format("iceberg")
+ .mode("append")
+ .save(name(table));
+
+ List<DataFile> dataFiles = TestHelpers.dataFiles(table);
+ writePosDeletesForFiles(table, 1, 1, dataFiles);
+ assertThat(dataFiles).hasSize(2);
+
+ List<DeleteFile> deleteFiles = deleteFiles(table);
+ assertThat(deleteFiles).hasSize(2);
+
+ List<Object[]> expectedRecords = records(table);
+ List<Object[]> expectedDeletes = deleteRecords(table);
+ assertThat(expectedRecords).hasSize(2);
+ assertThat(expectedDeletes).hasSize(2);
+
+ Result result =
+ SparkActions.get(spark)
+ .rewritePositionDeletes(table)
+ .option(SizeBasedFileRewriter.REWRITE_ALL, "true")
+ .option(SizeBasedFileRewriter.TARGET_FILE_SIZE_BYTES,
Long.toString(Long.MAX_VALUE - 1))
+ .execute();
+
+ List<DeleteFile> newDeleteFiles = deleteFiles(table);
+ assertThat(newDeleteFiles).hasSize(2);
+ assertNotContains(deleteFiles, newDeleteFiles);
+ assertLocallySorted(newDeleteFiles);
+ checkResult(result, deleteFiles, newDeleteFiles, 2);
+ checkSequenceNumbers(table, deleteFiles, newDeleteFiles);
+
+ List<Object[]> actualRecords = records(table);
+ List<Object[]> actualDeletes = deleteRecords(table);
+ assertEquals("Rows must match", expectedRecords, actualRecords);
+ assertEquals("Position deletes must match", expectedDeletes,
actualDeletes);
+ }
+
private Table createTablePartitioned(int partitions, int files, int
numRecords) {
PartitionSpec spec =
PartitionSpec.builderFor(SCHEMA).identity("c1").build();
Table table =
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java
index 539f6de920..1166740f44 100644
---
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java
@@ -137,10 +137,12 @@ public class RewritePositionDeleteFilesSparkAction
}
private StructLikeMap<List<List<PositionDeletesScanTask>>> planFileGroups() {
- CloseableIterable<PositionDeletesScanTask> fileTasks = planFiles();
+ Table deletesTable =
+ MetadataTableUtils.createMetadataTableInstance(table,
MetadataTableType.POSITION_DELETES);
+ CloseableIterable<PositionDeletesScanTask> fileTasks =
planFiles(deletesTable);
try {
- StructType partitionType = Partitioning.partitionType(table);
+ StructType partitionType = Partitioning.partitionType(deletesTable);
StructLikeMap<List<PositionDeletesScanTask>> fileTasksByPartition =
groupByPartition(partitionType, fileTasks);
return fileGroupsByPartition(fileTasksByPartition);
@@ -153,10 +155,7 @@ public class RewritePositionDeleteFilesSparkAction
}
}
- private CloseableIterable<PositionDeletesScanTask> planFiles() {
- Table deletesTable =
- MetadataTableUtils.createMetadataTableInstance(table,
MetadataTableType.POSITION_DELETES);
-
+ private CloseableIterable<PositionDeletesScanTask> planFiles(Table
deletesTable) {
PositionDeletesBatchScan scan = (PositionDeletesBatchScan)
deletesTable.newBatchScan();
return CloseableIterable.transform(
scan.baseTableFilter(filter).ignoreResiduals().planFiles(),
diff --git
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java
index 89c44dbfcc..37b6cd86fb 100644
---
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java
+++
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java
@@ -19,6 +19,7 @@
package org.apache.iceberg.spark.actions;
import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.spark.sql.functions.expr;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
@@ -77,6 +78,7 @@ import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.StructLikeMap;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.io.TempDir;
@@ -647,6 +649,69 @@ public class TestRewritePositionDeleteFilesAction extends
CatalogTestBase {
assertThat(table.currentSnapshot().summary()).containsKeys(commitMetricsKeys);
}
+ @TestTemplate
+ public void testRewriteManyColumns() throws Exception {
+ List<Types.NestedField> fields =
+ Lists.newArrayList(Types.NestedField.required(0, "id",
Types.LongType.get()));
+ List<Types.NestedField> additionalCols =
+ IntStream.range(1, 1010)
+ .mapToObj(i -> Types.NestedField.optional(i, "c" + i,
Types.StringType.get()))
+ .collect(Collectors.toList());
+ fields.addAll(additionalCols);
+ Schema schema = new Schema(fields);
+ PartitionSpec spec = PartitionSpec.builderFor(schema).bucket("id",
2).build();
+ Table table =
+ validationCatalog.createTable(
+ TableIdentifier.of("default", TABLE_NAME), schema, spec,
tableProperties());
+
+ Dataset<Row> df =
+ spark
+ .range(4)
+ .withColumns(
+ IntStream.range(1, 1010)
+ .boxed()
+ .collect(Collectors.toMap(i -> "c" + i, i -> expr("CAST(id
as STRING)"))));
+ StructType sparkSchema = spark.table(name(table)).schema();
+ spark
+ .createDataFrame(df.rdd(), sparkSchema)
+ .coalesce(1)
+ .write()
+ .format("iceberg")
+ .mode("append")
+ .save(name(table));
+
+ List<DataFile> dataFiles = TestHelpers.dataFiles(table);
+ writePosDeletesForFiles(table, 1, 1, dataFiles);
+ assertThat(dataFiles).hasSize(2);
+
+ List<DeleteFile> deleteFiles = deleteFiles(table);
+ assertThat(deleteFiles).hasSize(2);
+
+ List<Object[]> expectedRecords = records(table);
+ List<Object[]> expectedDeletes = deleteRecords(table);
+ assertThat(expectedRecords).hasSize(2);
+ assertThat(expectedDeletes).hasSize(2);
+
+ Result result =
+ SparkActions.get(spark)
+ .rewritePositionDeletes(table)
+ .option(SizeBasedFileRewriter.REWRITE_ALL, "true")
+ .option(SizeBasedFileRewriter.TARGET_FILE_SIZE_BYTES,
Long.toString(Long.MAX_VALUE - 1))
+ .execute();
+
+ List<DeleteFile> newDeleteFiles = deleteFiles(table);
+ assertThat(newDeleteFiles).hasSize(2);
+ assertNotContains(deleteFiles, newDeleteFiles);
+ assertLocallySorted(newDeleteFiles);
+ checkResult(result, deleteFiles, newDeleteFiles, 2);
+ checkSequenceNumbers(table, deleteFiles, newDeleteFiles);
+
+ List<Object[]> actualRecords = records(table);
+ List<Object[]> actualDeletes = deleteRecords(table);
+ assertEquals("Rows must match", expectedRecords, actualRecords);
+ assertEquals("Position deletes must match", expectedDeletes,
actualDeletes);
+ }
+
private Table createTablePartitioned(int partitions, int files, int
numRecords) {
PartitionSpec spec =
PartitionSpec.builderFor(SCHEMA).identity("c1").build();
Table table =