This is an automated email from the ASF dual-hosted git repository.

szehon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new b6c949cd86 Core, Spark: Calling rewrite_position_delete_files fails on 
tables with more than 1k columns (#10020)
b6c949cd86 is described below

commit b6c949cd86c372a87a4d43a557c19dd310af80d8
Author: Szehon Ho <[email protected]>
AuthorDate: Wed Jun 12 16:50:17 2024 -0700

    Core, Spark: Calling rewrite_position_delete_files fails on tables with 
more than 1k columns (#10020)
---
 .palantir/revapi.yml                               |  6 ++
 .../java/org/apache/iceberg/PartitionSpec.java     | 26 ++++++
 api/src/main/java/org/apache/iceberg/Schema.java   | 70 ++++++++++++++-
 .../java/org/apache/iceberg/types/AssignIds.java   | 99 ++++++++++++++++++++++
 .../java/org/apache/iceberg/types/TypeUtil.java    | 16 ++++
 .../main/java/org/apache/iceberg/types/Types.java  |  4 +
 .../java/org/apache/iceberg/BaseMetadataTable.java |  5 +-
 .../java/org/apache/iceberg/ManifestReader.java    |  2 +-
 .../org/apache/iceberg/PositionDeletesTable.java   | 37 +++++++-
 .../org/apache/iceberg/TestMetadataTableScans.java | 71 +++++++++++++++-
 ...stMetadataTableScansWithPartitionEvolution.java |  9 +-
 .../RewritePositionDeleteFilesSparkAction.java     | 11 ++-
 .../TestRewritePositionDeleteFilesAction.java      | 66 +++++++++++++++
 .../RewritePositionDeleteFilesSparkAction.java     | 12 +--
 .../TestRewritePositionDeleteFilesAction.java      | 65 ++++++++++++++
 .../RewritePositionDeleteFilesSparkAction.java     | 11 ++-
 .../TestRewritePositionDeleteFilesAction.java      | 65 ++++++++++++++
 17 files changed, 541 insertions(+), 34 deletions(-)

diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml
index a41d3ddfb8..808a192990 100644
--- a/.palantir/revapi.yml
+++ b/.palantir/revapi.yml
@@ -1018,6 +1018,12 @@ acceptedBreaks:
       old: "method void 
org.apache.iceberg.PositionDeletesTable.PositionDeletesBatchScan::<init>(org.apache.iceberg.Table,\
         \ org.apache.iceberg.Schema, org.apache.iceberg.TableScanContext)"
       justification: "Removing deprecated code"
+  "1.5.0":
+    org.apache.iceberg:iceberg-api:
+    - code: "java.class.defaultSerializationChanged"
+      old: "class org.apache.iceberg.types.Types.NestedField"
+      new: "class org.apache.iceberg.types.Types.NestedField"
+      justification: "new Constructor added"
   apache-iceberg-0.14.0:
     org.apache.iceberg:iceberg-api:
     - code: "java.class.defaultSerializationChanged"
diff --git a/api/src/main/java/org/apache/iceberg/PartitionSpec.java 
b/api/src/main/java/org/apache/iceberg/PartitionSpec.java
index 4fcb110db8..8f1df79403 100644
--- a/api/src/main/java/org/apache/iceberg/PartitionSpec.java
+++ b/api/src/main/java/org/apache/iceberg/PartitionSpec.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -60,6 +61,7 @@ public class PartitionSpec implements Serializable {
   private transient volatile ListMultimap<Integer, PartitionField> 
fieldsBySourceId = null;
   private transient volatile Class<?>[] lazyJavaClasses = null;
   private transient volatile StructType lazyPartitionType = null;
+  private transient volatile StructType lazyRawPartitionType = null;
   private transient volatile List<PartitionField> fieldList = null;
   private final int lastAssignedFieldId;
 
@@ -140,6 +142,30 @@ public class PartitionSpec implements Serializable {
     return lazyPartitionType;
   }
 
+  /**
+   * Returns a struct matching partition information as written into manifest 
files. See {@link
+   * #partitionType()} for a struct with field IDs potentially re-assigned to 
avoid conflicts.
+   */
+  public StructType rawPartitionType() {
+    if (schema.idsToOriginal().isEmpty()) {
+      // not re-assigned.
+      return partitionType();
+    }
+    if (lazyRawPartitionType == null) {
+      synchronized (this) {
+        if (lazyRawPartitionType == null) {
+          this.lazyRawPartitionType =
+              StructType.of(
+                  partitionType().fields().stream()
+                      .map(f -> 
f.withFieldId(schema.idsToOriginal().get(f.fieldId())))
+                      .collect(Collectors.toList()));
+        }
+      }
+    }
+
+    return lazyRawPartitionType;
+  }
+
   public Class<?>[] javaClasses() {
     if (lazyJavaClasses == null) {
       synchronized (this) {
diff --git a/api/src/main/java/org/apache/iceberg/Schema.java 
b/api/src/main/java/org/apache/iceberg/Schema.java
index 5e024b7c1c..d5ec3f2509 100644
--- a/api/src/main/java/org/apache/iceberg/Schema.java
+++ b/api/src/main/java/org/apache/iceberg/Schema.java
@@ -21,6 +21,7 @@ package org.apache.iceberg;
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Deque;
 import java.util.List;
 import java.util.Locale;
@@ -34,6 +35,7 @@ import 
org.apache.iceberg.relocated.com.google.common.collect.ImmutableBiMap;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
 import org.apache.iceberg.types.Type;
@@ -65,6 +67,8 @@ public class Schema implements Serializable {
   private transient Map<Integer, Accessor<StructLike>> idToAccessor = null;
   private transient Map<Integer, String> idToName = null;
   private transient Set<Integer> identifierFieldIdSet = null;
+  private transient Map<Integer, Integer> idsToReassigned;
+  private transient Map<Integer, Integer> idsToOriginal;
 
   public Schema(List<NestedField> columns, Map<String, Integer> aliases) {
     this(columns, aliases, ImmutableSet.of());
@@ -83,12 +87,24 @@ public class Schema implements Serializable {
     this(DEFAULT_SCHEMA_ID, columns, identifierFieldIds);
   }
 
+  public Schema(List<NestedField> columns, Set<Integer> identifierFieldIds, 
TypeUtil.GetID getId) {
+    this(DEFAULT_SCHEMA_ID, columns, identifierFieldIds, getId);
+  }
+
   public Schema(int schemaId, List<NestedField> columns) {
     this(schemaId, columns, ImmutableSet.of());
   }
 
   public Schema(int schemaId, List<NestedField> columns, Set<Integer> 
identifierFieldIds) {
-    this(schemaId, columns, null, identifierFieldIds);
+    this(schemaId, columns, null, identifierFieldIds, null);
+  }
+
+  public Schema(
+      int schemaId,
+      List<NestedField> columns,
+      Set<Integer> identifierFieldIds,
+      TypeUtil.GetID getId) {
+    this(schemaId, columns, null, identifierFieldIds, getId);
   }
 
   public Schema(
@@ -96,8 +112,22 @@ public class Schema implements Serializable {
       List<NestedField> columns,
       Map<String, Integer> aliases,
       Set<Integer> identifierFieldIds) {
+    this(schemaId, columns, aliases, identifierFieldIds, null);
+  }
+
+  public Schema(
+      int schemaId,
+      List<NestedField> columns,
+      Map<String, Integer> aliases,
+      Set<Integer> identifierFieldIds,
+      TypeUtil.GetID getID) {
     this.schemaId = schemaId;
-    this.struct = StructType.of(columns);
+
+    this.idsToOriginal = Maps.newHashMap();
+    this.idsToReassigned = Maps.newHashMap();
+    List<NestedField> finalColumns = reassignIds(columns, getID);
+
+    this.struct = StructType.of(finalColumns);
     this.aliasToId = aliases != null ? ImmutableBiMap.copyOf(aliases) : null;
 
     // validate IdentifierField
@@ -507,4 +537,40 @@ public class Schema implements Serializable {
                 .map(this::identifierFieldToString)
                 .collect(Collectors.toList())));
   }
+
+  /**
+   * The IDs of some fields will be re-assigned if GetID is specified for the 
Schema.
+   *
+   * @return map of original to reassigned field ids
+   */
+  public Map<Integer, Integer> idsToReassigned() {
+    return idsToReassigned != null ? idsToReassigned : Collections.emptyMap();
+  }
+
+  /**
+   * The IDs of some fields will be re-assigned if GetID is specified for the 
Schema.
+   *
+   * @return map of reassigned to original field ids
+   */
+  public Map<Integer, Integer> idsToOriginal() {
+    return idsToOriginal != null ? idsToOriginal : Collections.emptyMap();
+  }
+
+  private List<NestedField> reassignIds(List<NestedField> columns, 
TypeUtil.GetID getID) {
+    if (getID == null) {
+      return columns;
+    }
+    Type res =
+        TypeUtil.assignIds(
+            StructType.of(columns),
+            oldId -> {
+              int newId = getID.get(oldId);
+              if (newId != oldId) {
+                idsToReassigned.put(oldId, newId);
+                idsToOriginal.put(newId, oldId);
+              }
+              return newId;
+            });
+    return res.asStructType().fields();
+  }
 }
diff --git a/api/src/main/java/org/apache/iceberg/types/AssignIds.java 
b/api/src/main/java/org/apache/iceberg/types/AssignIds.java
new file mode 100644
index 0000000000..68588f581a
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/types/AssignIds.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.types;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.Supplier;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+class AssignIds extends TypeUtil.CustomOrderSchemaVisitor<Type> {
+  private final TypeUtil.GetID getID;
+
+  AssignIds(TypeUtil.GetID getID) {
+    this.getID = getID;
+  }
+
+  private int idFor(int id) {
+    return getID.get(id);
+  }
+
+  @Override
+  public Type schema(Schema schema, Supplier<Type> future) {
+    return future.get();
+  }
+
+  @Override
+  public Type struct(Types.StructType struct, Iterable<Type> futures) {
+    List<Types.NestedField> fields = struct.fields();
+    int length = struct.fields().size();
+
+    // assign IDs for this struct's fields first
+    List<Integer> newIds = Lists.newArrayListWithExpectedSize(length);
+    for (Types.NestedField field : fields) {
+      newIds.add(idFor(field.fieldId()));
+    }
+
+    List<Types.NestedField> newFields = 
Lists.newArrayListWithExpectedSize(length);
+    Iterator<Type> types = futures.iterator();
+    for (int i = 0; i < length; i += 1) {
+      Types.NestedField field = fields.get(i);
+      Type type = types.next();
+      if (field.isOptional()) {
+        newFields.add(Types.NestedField.optional(newIds.get(i), field.name(), 
type, field.doc()));
+      } else {
+        newFields.add(Types.NestedField.required(newIds.get(i), field.name(), 
type, field.doc()));
+      }
+    }
+
+    return Types.StructType.of(newFields);
+  }
+
+  @Override
+  public Type field(Types.NestedField field, Supplier<Type> future) {
+    return future.get();
+  }
+
+  @Override
+  public Type list(Types.ListType list, Supplier<Type> future) {
+    int newId = idFor(list.elementId());
+    if (list.isElementOptional()) {
+      return Types.ListType.ofOptional(newId, future.get());
+    } else {
+      return Types.ListType.ofRequired(newId, future.get());
+    }
+  }
+
+  @Override
+  public Type map(Types.MapType map, Supplier<Type> keyFuture, Supplier<Type> 
valueFuture) {
+    int newKeyId = idFor(map.keyId());
+    int newValueId = idFor(map.valueId());
+    if (map.isValueOptional()) {
+      return Types.MapType.ofOptional(newKeyId, newValueId, keyFuture.get(), 
valueFuture.get());
+    } else {
+      return Types.MapType.ofRequired(newKeyId, newValueId, keyFuture.get(), 
valueFuture.get());
+    }
+  }
+
+  @Override
+  public Type primitive(Type.PrimitiveType primitive) {
+    return primitive;
+  }
+}
diff --git a/api/src/main/java/org/apache/iceberg/types/TypeUtil.java 
b/api/src/main/java/org/apache/iceberg/types/TypeUtil.java
index 7c13d60940..07d06dcc5a 100644
--- a/api/src/main/java/org/apache/iceberg/types/TypeUtil.java
+++ b/api/src/main/java/org/apache/iceberg/types/TypeUtil.java
@@ -355,6 +355,17 @@ public class TypeUtil {
     return new Schema(struct.fields(), refreshIdentifierFields(struct, 
schema));
   }
 
+  /**
+   * Assigns fresh ids from the {@link GetID getId function} for all fields in 
a type.
+   *
+   * @param type a type
+   * @param getId an id assignment function
+   * @return a structurally identical type with new ids assigned by the getId 
function
+   */
+  public static Type assignIds(Type type, GetID getId) {
+    return TypeUtil.visit(type, new AssignIds(getId));
+  }
+
   public static Type find(Schema schema, Predicate<Type> predicate) {
     return visit(schema, new FindTypeVisitor(predicate));
   }
@@ -521,6 +532,11 @@ public class TypeUtil {
     int get();
   }
 
+  /** Interface for passing a function that assigns column IDs from the 
previous ID. */
+  public interface GetID {
+    int get(int oldId);
+  }
+
   public static class SchemaVisitor<T> {
     public void beforeField(Types.NestedField field) {}
 
diff --git a/api/src/main/java/org/apache/iceberg/types/Types.java 
b/api/src/main/java/org/apache/iceberg/types/Types.java
index dda842c9e1..ce6caa4721 100644
--- a/api/src/main/java/org/apache/iceberg/types/Types.java
+++ b/api/src/main/java/org/apache/iceberg/types/Types.java
@@ -475,6 +475,10 @@ public class Types {
       return new NestedField(false, id, name, type, doc);
     }
 
+    public NestedField withFieldId(int newId) {
+      return new NestedField(isOptional, newId, name, type, doc);
+    }
+
     public int fieldId() {
       return id;
     }
diff --git a/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java 
b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
index 57a6386093..e1e138109f 100644
--- a/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
+++ b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
@@ -74,8 +74,11 @@ public abstract class BaseMetadataTable extends 
BaseReadOnlyTable implements Ser
             .withSpecId(spec.specId())
             .checkConflicts(false);
 
+    Map<Integer, Integer> reassignedFields = 
metadataTableSchema.idsToReassigned();
+
     for (PartitionField field : spec.fields()) {
-      builder.add(field.fieldId(), field.fieldId(), field.name(), 
Transforms.identity());
+      int newFieldId = reassignedFields.getOrDefault(field.fieldId(), 
field.fieldId());
+      builder.add(newFieldId, newFieldId, field.name(), Transforms.identity());
     }
     return builder.build();
   }
diff --git a/core/src/main/java/org/apache/iceberg/ManifestReader.java 
b/core/src/main/java/org/apache/iceberg/ManifestReader.java
index 4ee51aa60c..b5f85813dd 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestReader.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestReader.java
@@ -114,7 +114,7 @@ public class ManifestReader<F extends ContentFile<F>> 
extends CloseableGroup
       this.spec = readPartitionSpec(file);
     }
 
-    this.fileSchema = new 
Schema(DataFile.getType(spec.partitionType()).fields());
+    this.fileSchema = new 
Schema(DataFile.getType(spec.rawPartitionType()).fields());
   }
 
   private <T extends ContentFile<T>> PartitionSpec readPartitionSpec(InputFile 
inputFile) {
diff --git a/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java 
b/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java
index 5627063580..382ad663a8 100644
--- a/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java
+++ b/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java
@@ -24,6 +24,8 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.iceberg.expressions.Expression;
@@ -33,6 +35,8 @@ import org.apache.iceberg.expressions.Projections;
 import org.apache.iceberg.expressions.ResidualEvaluator;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
@@ -107,8 +111,8 @@ public class PositionDeletesTable extends BaseMetadataTable 
{
 
   private Schema calculateSchema() {
     Types.StructType partitionType = Partitioning.partitionType(table());
-    Schema result =
-        new Schema(
+    List<Types.NestedField> columns =
+        ImmutableList.of(
             MetadataColumns.DELETE_FILE_PATH,
             MetadataColumns.DELETE_FILE_POS,
             Types.NestedField.optional(
@@ -132,6 +136,35 @@ public class PositionDeletesTable extends 
BaseMetadataTable {
                 Types.StringType.get(),
                 MetadataColumns.FILE_PATH_COLUMN_DOC));
 
+    // Calculate used ids (for de-conflict)
+    Set<Integer> currentlyUsedIds =
+        
Collections.unmodifiableSet(TypeUtil.indexById(Types.StructType.of(columns)).keySet());
+    Set<Integer> allUsedIds =
+        table().schemas().values().stream()
+            .map(currSchema -> 
TypeUtil.indexById(currSchema.asStruct()).keySet())
+            .reduce(currentlyUsedIds, Sets::union);
+
+    // Calculate ids to reassign
+    Set<Integer> idsToReassign =
+        
partitionType.fields().stream().map(Types.NestedField::fieldId).collect(Collectors.toSet());
+
+    // Reassign selected ids to de-conflict with used ids.
+    AtomicInteger nextId = new AtomicInteger();
+    Schema result =
+        new Schema(
+            columns,
+            ImmutableSet.of(),
+            oldId -> {
+              if (!idsToReassign.contains(oldId)) {
+                return oldId;
+              }
+              int candidate = nextId.incrementAndGet();
+              while (allUsedIds.contains(candidate)) {
+                candidate = nextId.incrementAndGet();
+              }
+              return candidate;
+            });
+
     if (!partitionType.fields().isEmpty()) {
       return result;
     } else {
diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java 
b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
index 4c5f1d240f..df314f6a80 100644
--- a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
+++ b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
@@ -42,6 +42,7 @@ import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Streams;
+import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.StructLikeWrapper;
 import org.junit.jupiter.api.TestTemplate;
@@ -1266,7 +1267,7 @@ public class TestMetadataTableScans extends 
MetadataTableScanTestBase {
     ScanTask task = tasks.get(0);
     assertThat(task).isInstanceOf(PositionDeletesScanTask.class);
 
-    Types.StructType partitionType = Partitioning.partitionType(table);
+    Types.StructType partitionType = 
positionDeletesTable.spec().partitionType();
     PositionDeletesScanTask posDeleteTask = (PositionDeletesScanTask) task;
 
     int filePartition = posDeleteTask.file().partition().get(0, Integer.class);
@@ -1333,7 +1334,7 @@ public class TestMetadataTableScans extends 
MetadataTableScanTestBase {
     ScanTask task = tasks.get(0);
     assertThat(task).isInstanceOf(PositionDeletesScanTask.class);
 
-    Types.StructType partitionType = Partitioning.partitionType(table);
+    Types.StructType partitionType = 
positionDeletesTable.spec().partitionType();
     PositionDeletesScanTask posDeleteTask = (PositionDeletesScanTask) task;
 
     // base table filter should only be used to evaluate partitions
@@ -1415,7 +1416,7 @@ public class TestMetadataTableScans extends 
MetadataTableScanTestBase {
     ScanTask task = tasks.get(0);
     assertThat(task).isInstanceOf(PositionDeletesScanTask.class);
 
-    Types.StructType partitionType = Partitioning.partitionType(table);
+    Types.StructType partitionType = 
positionDeletesTable.spec().partitionType();
     PositionDeletesScanTask posDeleteTask = (PositionDeletesScanTask) task;
 
     // base table filter should only be used to evaluate partitions
@@ -1426,7 +1427,7 @@ public class TestMetadataTableScans extends 
MetadataTableScanTestBase {
         (StructLike)
             constantsMap(posDeleteTask, 
partitionType).get(MetadataColumns.PARTITION_COLUMN_ID);
     int taskPartition =
-        taskPartitionStruct.get(1, Integer.class); // new partition field in 
position 1
+        taskPartitionStruct.get(0, Integer.class); // new partition field in 
position 0
     assertThat(filePartition).as("Expected correct partition on task's 
file").isEqualTo(1);
     assertThat(taskPartition).as("Expected correct partition on task's 
column").isEqualTo(1);
 
@@ -1564,4 +1565,66 @@ public class TestMetadataTableScans extends 
MetadataTableScanTestBase {
     assertThat(scanTask1Partition).isEqualTo(expected);
     assertThat(scanTask2Partition).isEqualTo(expected);
   }
+
+  @TestTemplate
+  public void testPositionDeletesManyColumns() {
+    assumeThat(formatVersion).as("Position deletes supported only for v2 
tables").isEqualTo(2);
+
+    UpdateSchema updateSchema = table.updateSchema();
+    for (int i = 0; i <= 2000; i++) {
+      updateSchema.addColumn(String.valueOf(i), Types.IntegerType.get());
+    }
+    updateSchema.commit();
+
+    DataFile dataFile1 =
+        DataFiles.builder(table.spec())
+            .withPath("/path/to/data1.parquet")
+            .withFileSizeInBytes(10)
+            .withRecordCount(1)
+            .build();
+    DataFile dataFile2 =
+        DataFiles.builder(table.spec())
+            .withPath("/path/to/data2.parquet")
+            .withFileSizeInBytes(10)
+            .withRecordCount(1)
+            .build();
+    table.newAppend().appendFile(dataFile1).appendFile(dataFile2).commit();
+
+    DeleteFile delete1 =
+        FileMetadata.deleteFileBuilder(table.spec())
+            .ofPositionDeletes()
+            .withPath("/path/to/delete1.parquet")
+            .withFileSizeInBytes(10)
+            .withRecordCount(1)
+            .build();
+    DeleteFile delete2 =
+        FileMetadata.deleteFileBuilder(table.spec())
+            .ofPositionDeletes()
+            .withPath("/path/to/delete2.parquet")
+            .withFileSizeInBytes(10)
+            .withRecordCount(1)
+            .build();
+    table.newRowDelta().addDeletes(delete1).addDeletes(delete2).commit();
+
+    PositionDeletesTable positionDeletesTable = new 
PositionDeletesTable(table);
+    
assertThat(TypeUtil.indexById(positionDeletesTable.schema().asStruct()).size()).isEqualTo(2010);
+
+    BatchScan scan = positionDeletesTable.newBatchScan();
+    
assertThat(scan).isInstanceOf(PositionDeletesTable.PositionDeletesBatchScan.class);
+    PositionDeletesTable.PositionDeletesBatchScan deleteScan =
+        (PositionDeletesTable.PositionDeletesBatchScan) scan;
+
+    List<PositionDeletesScanTask> scanTasks =
+        Lists.newArrayList(
+            Iterators.transform(
+                deleteScan.planFiles().iterator(),
+                task -> {
+                  assertThat(task).isInstanceOf(PositionDeletesScanTask.class);
+                  return (PositionDeletesScanTask) task;
+                }));
+    assertThat(scanTasks).hasSize(2);
+    scanTasks.sort(Comparator.comparing(f -> f.file().path().toString()));
+    
assertThat(scanTasks.get(0).file().path().toString()).isEqualTo("/path/to/delete1.parquet");
+    
assertThat(scanTasks.get(1).file().path().toString()).isEqualTo("/path/to/delete2.parquet");
+  }
 }
diff --git 
a/core/src/test/java/org/apache/iceberg/TestMetadataTableScansWithPartitionEvolution.java
 
b/core/src/test/java/org/apache/iceberg/TestMetadataTableScansWithPartitionEvolution.java
index faccdcb3dd..a2e5386d29 100644
--- 
a/core/src/test/java/org/apache/iceberg/TestMetadataTableScansWithPartitionEvolution.java
+++ 
b/core/src/test/java/org/apache/iceberg/TestMetadataTableScansWithPartitionEvolution.java
@@ -21,7 +21,6 @@ package org.apache.iceberg;
 import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.apache.iceberg.types.Types.NestedField.required;
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.entry;
 import static org.assertj.core.api.Assumptions.assumeThat;
 
 import java.io.File;
@@ -186,7 +185,7 @@ public class TestMetadataTableScansWithPartitionEvolution 
extends MetadataTableS
     ScanTask task = tasks.get(0);
     assertThat(task).isInstanceOf(PositionDeletesScanTask.class);
 
-    Types.StructType partitionType = Partitioning.partitionType(table);
+    Types.StructType partitionType = 
positionDeletesTable.spec().partitionType();
     PositionDeletesScanTask posDeleteTask = (PositionDeletesScanTask) task;
 
     int filePartition = posDeleteTask.file().partition().get(0, Integer.class);
@@ -196,15 +195,13 @@ public class TestMetadataTableScansWithPartitionEvolution 
extends MetadataTableS
     int taskConstantPartition =
         ((StructLike)
                 constantsMap(posDeleteTask, 
partitionType).get(MetadataColumns.PARTITION_COLUMN_ID))
-            .get(1, Integer.class);
+            .get(0, Integer.class);
     assertThat(taskConstantPartition)
         .as("Expected correct partition on constant column")
         .isEqualTo(1);
-
     assertThat(posDeleteTask.spec().fields().get(0).fieldId())
         .as("Expected correct partition field id on task's spec")
-        
.isEqualTo(table.ops().current().spec().partitionType().fields().get(0).fieldId());
-
+        .isEqualTo(partitionType.fields().get(0).fieldId());
     assertThat(posDeleteTask.file().specId())
         .as("Expected correct partition spec id on task")
         .isEqualTo(table.ops().current().spec().specId());
diff --git 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java
 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java
index f3dfd2dcc3..ea1c529401 100644
--- 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java
+++ 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java
@@ -137,10 +137,12 @@ public class RewritePositionDeleteFilesSparkAction
   }
 
   private StructLikeMap<List<List<PositionDeletesScanTask>>> planFileGroups() {
-    CloseableIterable<PositionDeletesScanTask> fileTasks = planFiles();
+    Table deletesTable =
+        MetadataTableUtils.createMetadataTableInstance(table, 
MetadataTableType.POSITION_DELETES);
+    CloseableIterable<PositionDeletesScanTask> fileTasks = 
planFiles(deletesTable);
 
     try {
-      StructType partitionType = Partitioning.partitionType(table);
+      StructType partitionType = Partitioning.partitionType(deletesTable);
       StructLikeMap<List<PositionDeletesScanTask>> fileTasksByPartition =
           groupByPartition(partitionType, fileTasks);
       return fileGroupsByPartition(fileTasksByPartition);
@@ -153,10 +155,7 @@ public class RewritePositionDeleteFilesSparkAction
     }
   }
 
-  private CloseableIterable<PositionDeletesScanTask> planFiles() {
-    Table deletesTable =
-        MetadataTableUtils.createMetadataTableInstance(table, 
MetadataTableType.POSITION_DELETES);
-
+  private CloseableIterable<PositionDeletesScanTask> planFiles(Table 
deletesTable) {
     PositionDeletesBatchScan scan = (PositionDeletesBatchScan) 
deletesTable.newBatchScan();
     return CloseableIterable.transform(
         scan.baseTableFilter(filter).ignoreResiduals().planFiles(),
diff --git 
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java
 
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java
index 77800e2ea0..aa2817e875 100644
--- 
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java
+++ 
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java
@@ -19,6 +19,8 @@
 package org.apache.iceberg.spark.actions;
 
 import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.spark.sql.functions.expr;
+import static org.assertj.core.api.Assertions.assertThat;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -69,6 +71,7 @@ import org.apache.iceberg.util.Pair;
 import org.apache.iceberg.util.StructLikeMap;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.StructType;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -581,6 +584,69 @@ public class TestRewritePositionDeleteFilesAction extends SparkCatalogTestBase {
     assertEquals("Rows must match", expectedRecords, actualRecords);
   }
 
+  @Test
+  public void testRewriteManyColumns() throws Exception {
+    List<Types.NestedField> fields =
+        Lists.newArrayList(Types.NestedField.optional(0, "id", Types.LongType.get()));
+    List<Types.NestedField> additionalCols =
+        IntStream.range(1, 1010)
+            .mapToObj(i -> Types.NestedField.optional(i, "c" + i, Types.StringType.get()))
+            .collect(Collectors.toList());
+    fields.addAll(additionalCols);
+    Schema schema = new Schema(fields);
+    PartitionSpec spec = PartitionSpec.builderFor(schema).bucket("id", 2).build();
+    Table table =
+        validationCatalog.createTable(
+            TableIdentifier.of("default", TABLE_NAME), schema, spec, tableProperties());
+
+    Dataset<Row> df =
+        spark
+            .range(4)
+            .withColumns(
+                IntStream.range(1, 1010)
+                    .boxed()
+                    .collect(Collectors.toMap(i -> "c" + i, i -> expr("CAST(id as STRING)"))));
+    StructType sparkSchema = spark.table(name(table)).schema();
+    spark
+        .createDataFrame(df.rdd(), sparkSchema)
+        .coalesce(1)
+        .write()
+        .format("iceberg")
+        .mode("append")
+        .save(name(table));
+
+    List<DataFile> dataFiles = TestHelpers.dataFiles(table);
+    writePosDeletesForFiles(table, 1, 1, dataFiles);
+    assertThat(dataFiles).hasSize(2);
+
+    List<DeleteFile> deleteFiles = deleteFiles(table);
+    assertThat(deleteFiles).hasSize(2);
+
+    List<Object[]> expectedRecords = records(table);
+    List<Object[]> expectedDeletes = deleteRecords(table);
+    assertThat(expectedRecords).hasSize(2);
+    assertThat(expectedDeletes).hasSize(2);
+
+    Result result =
+        SparkActions.get(spark)
+            .rewritePositionDeletes(table)
+            .option(SizeBasedFileRewriter.REWRITE_ALL, "true")
+            .option(SizeBasedFileRewriter.TARGET_FILE_SIZE_BYTES, Long.toString(Long.MAX_VALUE - 1))
+            .execute();
+
+    List<DeleteFile> newDeleteFiles = deleteFiles(table);
+    assertThat(newDeleteFiles).hasSize(2);
+    assertNotContains(deleteFiles, newDeleteFiles);
+    assertLocallySorted(newDeleteFiles);
+    checkResult(result, deleteFiles, newDeleteFiles, 2);
+    checkSequenceNumbers(table, deleteFiles, newDeleteFiles);
+
+    List<Object[]> actualRecords = records(table);
+    List<Object[]> actualDeletes = deleteRecords(table);
+    assertEquals("Rows must match", expectedRecords, actualRecords);
+    assertEquals("Position deletes must match", expectedDeletes, actualDeletes);
+  }
+
   private Table createTablePartitioned(int partitions, int files, int numRecords) {
     PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c1").build();
     Table table =
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java
index f3dfd2dcc3..bdb0ee3527 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java
@@ -137,10 +137,12 @@ public class RewritePositionDeleteFilesSparkAction
   }
 
   private StructLikeMap<List<List<PositionDeletesScanTask>>> planFileGroups() {
-    CloseableIterable<PositionDeletesScanTask> fileTasks = planFiles();
+    Table deletesTable =
+        MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.POSITION_DELETES);
+    CloseableIterable<PositionDeletesScanTask> fileTasks = planFiles(deletesTable);
 
     try {
-      StructType partitionType = Partitioning.partitionType(table);
+      StructType partitionType = Partitioning.partitionType(deletesTable);
       StructLikeMap<List<PositionDeletesScanTask>> fileTasksByPartition =
           groupByPartition(partitionType, fileTasks);
       return fileGroupsByPartition(fileTasksByPartition);
@@ -153,11 +155,9 @@ public class RewritePositionDeleteFilesSparkAction
     }
   }
 
-  private CloseableIterable<PositionDeletesScanTask> planFiles() {
-    Table deletesTable =
-        MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.POSITION_DELETES);
-
+  private CloseableIterable<PositionDeletesScanTask> planFiles(Table deletesTable) {
     PositionDeletesBatchScan scan = (PositionDeletesBatchScan) deletesTable.newBatchScan();
+
     return CloseableIterable.transform(
         scan.baseTableFilter(filter).ignoreResiduals().planFiles(),
         task -> (PositionDeletesScanTask) task);
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java
index db2d42501d..7be300e84f 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java
@@ -19,6 +19,7 @@
 package org.apache.iceberg.spark.actions;
 
 import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.spark.sql.functions.expr;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import java.io.IOException;
@@ -71,6 +72,7 @@ import org.apache.iceberg.util.Pair;
 import org.apache.iceberg.util.StructLikeMap;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.StructType;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -618,6 +620,69 @@ public class TestRewritePositionDeleteFilesAction extends SparkCatalogTestBase {
     assertEquals("Rows must match", expectedRecords, actualRecords);
   }
 
+  @Test
+  public void testRewriteManyColumns() throws Exception {
+    List<Types.NestedField> fields =
+        Lists.newArrayList(Types.NestedField.optional(0, "id", Types.LongType.get()));
+    List<Types.NestedField> additionalCols =
+        IntStream.range(1, 1010)
+            .mapToObj(i -> Types.NestedField.optional(i, "c" + i, Types.StringType.get()))
+            .collect(Collectors.toList());
+    fields.addAll(additionalCols);
+    Schema schema = new Schema(fields);
+    PartitionSpec spec = PartitionSpec.builderFor(schema).bucket("id", 2).build();
+    Table table =
+        validationCatalog.createTable(
+            TableIdentifier.of("default", TABLE_NAME), schema, spec, tableProperties());
+
+    Dataset<Row> df =
+        spark
+            .range(4)
+            .withColumns(
+                IntStream.range(1, 1010)
+                    .boxed()
+                    .collect(Collectors.toMap(i -> "c" + i, i -> expr("CAST(id as STRING)"))));
+    StructType sparkSchema = spark.table(name(table)).schema();
+    spark
+        .createDataFrame(df.rdd(), sparkSchema)
+        .coalesce(1)
+        .write()
+        .format("iceberg")
+        .mode("append")
+        .save(name(table));
+
+    List<DataFile> dataFiles = TestHelpers.dataFiles(table);
+    writePosDeletesForFiles(table, 1, 1, dataFiles);
+    assertThat(dataFiles).hasSize(2);
+
+    List<DeleteFile> deleteFiles = deleteFiles(table);
+    assertThat(deleteFiles).hasSize(2);
+
+    List<Object[]> expectedRecords = records(table);
+    List<Object[]> expectedDeletes = deleteRecords(table);
+    assertThat(expectedRecords).hasSize(2);
+    assertThat(expectedDeletes).hasSize(2);
+
+    Result result =
+        SparkActions.get(spark)
+            .rewritePositionDeletes(table)
+            .option(SizeBasedFileRewriter.REWRITE_ALL, "true")
+            .option(SizeBasedFileRewriter.TARGET_FILE_SIZE_BYTES, Long.toString(Long.MAX_VALUE - 1))
+            .execute();
+
+    List<DeleteFile> newDeleteFiles = deleteFiles(table);
+    assertThat(newDeleteFiles).hasSize(2);
+    assertNotContains(deleteFiles, newDeleteFiles);
+    assertLocallySorted(newDeleteFiles);
+    checkResult(result, deleteFiles, newDeleteFiles, 2);
+    checkSequenceNumbers(table, deleteFiles, newDeleteFiles);
+
+    List<Object[]> actualRecords = records(table);
+    List<Object[]> actualDeletes = deleteRecords(table);
+    assertEquals("Rows must match", expectedRecords, actualRecords);
+    assertEquals("Position deletes must match", expectedDeletes, actualDeletes);
+  }
+
   private Table createTablePartitioned(int partitions, int files, int numRecords) {
     PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c1").build();
     Table table =
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java
index 539f6de920..1166740f44 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java
@@ -137,10 +137,12 @@ public class RewritePositionDeleteFilesSparkAction
   }
 
   private StructLikeMap<List<List<PositionDeletesScanTask>>> planFileGroups() {
-    CloseableIterable<PositionDeletesScanTask> fileTasks = planFiles();
+    Table deletesTable =
+        MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.POSITION_DELETES);
+    CloseableIterable<PositionDeletesScanTask> fileTasks = planFiles(deletesTable);
 
     try {
-      StructType partitionType = Partitioning.partitionType(table);
+      StructType partitionType = Partitioning.partitionType(deletesTable);
       StructLikeMap<List<PositionDeletesScanTask>> fileTasksByPartition =
           groupByPartition(partitionType, fileTasks);
       return fileGroupsByPartition(fileTasksByPartition);
@@ -153,10 +155,7 @@ public class RewritePositionDeleteFilesSparkAction
     }
   }
 
-  private CloseableIterable<PositionDeletesScanTask> planFiles() {
-    Table deletesTable =
-        MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.POSITION_DELETES);
-
+  private CloseableIterable<PositionDeletesScanTask> planFiles(Table deletesTable) {
     PositionDeletesBatchScan scan = (PositionDeletesBatchScan) deletesTable.newBatchScan();
     return CloseableIterable.transform(
         scan.baseTableFilter(filter).ignoreResiduals().planFiles(),
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java
index 89c44dbfcc..37b6cd86fb 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java
@@ -19,6 +19,7 @@
 package org.apache.iceberg.spark.actions;
 
 import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.spark.sql.functions.expr;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.fail;
 
@@ -77,6 +78,7 @@ import org.apache.iceberg.util.Pair;
 import org.apache.iceberg.util.StructLikeMap;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.StructType;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.TestTemplate;
 import org.junit.jupiter.api.io.TempDir;
@@ -647,6 +649,69 @@ public class TestRewritePositionDeleteFilesAction extends CatalogTestBase {
     assertThat(table.currentSnapshot().summary()).containsKeys(commitMetricsKeys);
   }
 
+  @TestTemplate
+  public void testRewriteManyColumns() throws Exception {
+    List<Types.NestedField> fields =
+        Lists.newArrayList(Types.NestedField.required(0, "id", Types.LongType.get()));
+    List<Types.NestedField> additionalCols =
+        IntStream.range(1, 1010)
+            .mapToObj(i -> Types.NestedField.optional(i, "c" + i, Types.StringType.get()))
+            .collect(Collectors.toList());
+    fields.addAll(additionalCols);
+    Schema schema = new Schema(fields);
+    PartitionSpec spec = PartitionSpec.builderFor(schema).bucket("id", 2).build();
+    Table table =
+        validationCatalog.createTable(
+            TableIdentifier.of("default", TABLE_NAME), schema, spec, tableProperties());
+
+    Dataset<Row> df =
+        spark
+            .range(4)
+            .withColumns(
+                IntStream.range(1, 1010)
+                    .boxed()
+                    .collect(Collectors.toMap(i -> "c" + i, i -> expr("CAST(id as STRING)"))));
+    StructType sparkSchema = spark.table(name(table)).schema();
+    spark
+        .createDataFrame(df.rdd(), sparkSchema)
+        .coalesce(1)
+        .write()
+        .format("iceberg")
+        .mode("append")
+        .save(name(table));
+
+    List<DataFile> dataFiles = TestHelpers.dataFiles(table);
+    writePosDeletesForFiles(table, 1, 1, dataFiles);
+    assertThat(dataFiles).hasSize(2);
+
+    List<DeleteFile> deleteFiles = deleteFiles(table);
+    assertThat(deleteFiles).hasSize(2);
+
+    List<Object[]> expectedRecords = records(table);
+    List<Object[]> expectedDeletes = deleteRecords(table);
+    assertThat(expectedRecords).hasSize(2);
+    assertThat(expectedDeletes).hasSize(2);
+
+    Result result =
+        SparkActions.get(spark)
+            .rewritePositionDeletes(table)
+            .option(SizeBasedFileRewriter.REWRITE_ALL, "true")
+            .option(SizeBasedFileRewriter.TARGET_FILE_SIZE_BYTES, Long.toString(Long.MAX_VALUE - 1))
+            .execute();
+
+    List<DeleteFile> newDeleteFiles = deleteFiles(table);
+    assertThat(newDeleteFiles).hasSize(2);
+    assertNotContains(deleteFiles, newDeleteFiles);
+    assertLocallySorted(newDeleteFiles);
+    checkResult(result, deleteFiles, newDeleteFiles, 2);
+    checkSequenceNumbers(table, deleteFiles, newDeleteFiles);
+
+    List<Object[]> actualRecords = records(table);
+    List<Object[]> actualDeletes = deleteRecords(table);
+    assertEquals("Rows must match", expectedRecords, actualRecords);
+    assertEquals("Position deletes must match", expectedDeletes, actualDeletes);
+  }
+
   private Table createTablePartitioned(int partitions, int files, int numRecords) {
     PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c1").build();
     Table table =


Reply via email to