aokolnychyi commented on code in PR #7389:
URL: https://github.com/apache/iceberg/pull/7389#discussion_r1185131471


##########
api/src/main/java/org/apache/iceberg/actions/RewritePositionDeleteFiles.java:
##########
@@ -41,11 +93,80 @@
   RewritePositionDeleteFiles filter(Expression expression);
 
   /** The action result that contains a summary of the execution. */
+  @Value.Immutable
   interface Result {
-    /** Returns the count of the position deletes that been rewritten. */
+    List<FileGroupRewriteResult> rewriteResults();
+
+    /** Returns the count of the position delete files that been rewritten. */

Review Comment:
   typo: `been rewritten` -> `have been rewritten`?



##########
core/src/main/java/org/apache/iceberg/actions/RewritePositionDeleteGroup.java:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.PositionDeletesScanTask;
+import org.apache.iceberg.RewriteJobOrder;
+import org.apache.iceberg.actions.RewritePositionDeleteFiles.FileGroupInfo;
+import 
org.apache.iceberg.actions.RewritePositionDeleteFiles.FileGroupRewriteResult;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * Container class representing a set of position delete files to be rewritten 
by a {@link
+ * RewritePositionDeleteFiles} and the new files which have been written by 
the action.
+ */
+public class RewritePositionDeleteGroup {
+  private final FileGroupInfo info;
+  private final List<PositionDeletesScanTask> tasks;
+
+  private Set<DeleteFile> addedDeleteFiles = Collections.emptySet();
+
+  public RewritePositionDeleteGroup(
+      FileGroupInfo info, List<PositionDeletesScanTask> fileScanTasks) {

Review Comment:
   minor: `fileScanTasks` -> `tasks` to stay on one line?



##########
core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesCommitManager.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Functionality used by {@link RewritePositionDeleteFiles} from different 
platforms to handle
+ * commits.
+ */
+public class RewritePositionDeletesCommitManager {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RewritePositionDeletesCommitManager.class);
+
+  private final Table table;
+  private final long startingSnapshotId;
+
+  public RewritePositionDeletesCommitManager(Table table) {
+    this.table = table;
+    this.startingSnapshotId = table.currentSnapshot().snapshotId();
+  }
+
+  /**
+   * Perform a commit operation on the table adding and removing files as 
required for this set of
+   * file groups

Review Comment:
   minor: Missing `.` at the end?



##########
core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesCommitManager.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Functionality used by {@link RewritePositionDeleteFiles} from different 
platforms to handle
+ * commits.
+ */
+public class RewritePositionDeletesCommitManager {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RewritePositionDeletesCommitManager.class);
+
+  private final Table table;
+  private final long startingSnapshotId;
+
+  public RewritePositionDeletesCommitManager(Table table) {
+    this.table = table;
+    this.startingSnapshotId = table.currentSnapshot().snapshotId();
+  }
+
+  /**
+   * Perform a commit operation on the table adding and removing files as 
required for this set of
+   * file groups
+   *
+   * @param fileGroups file sets to commit
+   */
+  public void commitFileGroups(Set<RewritePositionDeleteGroup> fileGroups) {

Review Comment:
   optional: The method naming is inconsistent. I'd either call these methods `commit` and `abort`, or rename `commitOrClean` to include `FileGroup`. Up to you; I know we use this naming when rewriting data files.



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackDataRewriter.java:
##########
@@ -69,14 +69,4 @@ private DistributionMode distributionMode(List<FileScanTask> 
group) {
     boolean requiresRepartition = !group.get(0).spec().equals(table().spec());
     return requiresRepartition ? DistributionMode.RANGE : 
DistributionMode.NONE;
   }
-
-  /**
-   * Returns the smallest of our max write file threshold and our estimated 
split size based on the
-   * number of output files we want to generate. Add an overhead onto the 
estimated split size to
-   * try to avoid small errors in size creating brand-new files.
-   */
-  private long splitSize(long inputSize) {

Review Comment:
   Shall we drop `SPLIT_OVERHEAD` in this class as well?



##########
core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesCommitManager.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Functionality used by {@link RewritePositionDeleteFiles} from different 
platforms to handle
+ * commits.
+ */
+public class RewritePositionDeletesCommitManager {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RewritePositionDeletesCommitManager.class);
+
+  private final Table table;
+  private final long startingSnapshotId;
+
+  public RewritePositionDeletesCommitManager(Table table) {
+    this.table = table;
+    this.startingSnapshotId = table.currentSnapshot().snapshotId();
+  }
+
+  /**
+   * Perform a commit operation on the table adding and removing files as 
required for this set of
+   * file groups
+   *
+   * @param fileGroups file sets to commit
+   */
+  public void commitFileGroups(Set<RewritePositionDeleteGroup> fileGroups) {
+    Set<DeleteFile> rewrittenDeleteFiles = Sets.newHashSet();
+    Set<DeleteFile> addedDeleteFiles = Sets.newHashSet();
+    for (RewritePositionDeleteGroup group : fileGroups) {
+      rewrittenDeleteFiles.addAll(group.rewrittenDeleteFiles());
+      addedDeleteFiles.addAll(group.addedDeleteFiles());
+    }
+
+    table
+        .newRewrite()
+        .validateFromSnapshot(startingSnapshotId)
+        .rewriteFiles(ImmutableSet.of(), rewrittenDeleteFiles, 
ImmutableSet.of(), addedDeleteFiles)
+        .commit();
+  }
+
+  /**
+   * Clean up a specified file set by removing any files created for that 
operation, should not
+   * throw any exceptions.
+   *
+   * @param fileGroup group of files which has already been rewritten
+   */
+  public void abortFileGroup(RewritePositionDeleteGroup fileGroup) {
+    Preconditions.checkState(
+        fileGroup.addedDeleteFiles() != null, "Cannot abort a fileGroup that 
was not rewritten");
+
+    Set<String> filePaths =

Review Comment:
   minor: We could use `Iterables.transform()` to avoid an eager set 
materialization.



##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeletesSparkAction.java:
##########
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import java.io.IOException;
+import java.math.RoundingMode;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.MetadataTableUtils;
+import org.apache.iceberg.Partitioning;
+import org.apache.iceberg.PositionDeletesScanTask;
+import org.apache.iceberg.RewriteJobOrder;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.ImmutableRewritePositionDeleteFiles;
+import org.apache.iceberg.actions.RewritePositionDeleteFiles;
+import org.apache.iceberg.actions.RewritePositionDeleteGroup;
+import org.apache.iceberg.actions.RewritePositionDeletesCommitManager;
+import 
org.apache.iceberg.actions.RewritePositionDeletesCommitManager.CommitService;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.io.CloseableIterable;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Queues;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.relocated.com.google.common.math.IntMath;
+import 
org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
+import 
org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.iceberg.types.Types.StructType;
+import org.apache.iceberg.util.PartitionUtil;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.StructLikeMap;
+import org.apache.iceberg.util.Tasks;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Spark implementation of {@link RewritePositionDeleteFiles}. */
+public class RewritePositionDeletesSparkAction
+    extends BaseSnapshotUpdateSparkAction<RewritePositionDeletesSparkAction>
+    implements RewritePositionDeleteFiles {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RewritePositionDeletesSparkAction.class);
+  private static final Set<String> VALID_OPTIONS =
+      ImmutableSet.of(
+          MAX_CONCURRENT_FILE_GROUP_REWRITES,
+          PARTIAL_PROGRESS_ENABLED,
+          PARTIAL_PROGRESS_MAX_COMMITS,
+          REWRITE_JOB_ORDER);
+  private static final Result EMPTY = 
ImmutableRewritePositionDeleteFiles.Result.builder().build();
+
+  private final Table table;
+  private final SparkBinPackPositionDeletesRewriter rewriter;
+
+  private int maxConcurrentFileGroupRewrites;
+  private int maxCommits;
+  private boolean partialProgressEnabled;
+  private RewriteJobOrder rewriteJobOrder;
+
+  RewritePositionDeletesSparkAction(SparkSession spark, Table table) {
+    super(spark);
+    this.table = table;
+    this.rewriter = new SparkBinPackPositionDeletesRewriter(spark(), table);
+  }
+
+  @Override
+  protected RewritePositionDeletesSparkAction self() {
+    return this;
+  }
+
+  @Override
+  public RewritePositionDeletesSparkAction filter(Expression expression) {
+    throw new UnsupportedOperationException("Regular filters not supported 
yet.");
+  }
+
+  @Override
+  public RewritePositionDeleteFiles.Result execute() {
+    if (table.currentSnapshot() == null) {
+      LOG.info("Nothing found to rewrite in empty table {}", table.name());
+      return EMPTY;
+    }
+
+    validateAndInitOptions();
+
+    Map<StructLike, List<List<PositionDeletesScanTask>>> fileGroupsByPartition 
= planFileGroups();
+    RewriteExecutionContext ctx = new 
RewriteExecutionContext(fileGroupsByPartition);
+
+    if (ctx.totalGroupCount() == 0) {
+      LOG.info("Nothing found to rewrite in {}", table.name());
+      return EMPTY;
+    }
+
+    Stream<RewritePositionDeleteGroup> groupStream = toGroupStream(ctx, 
fileGroupsByPartition);
+
+    RewritePositionDeletesCommitManager commitManager = commitManager();
+    if (partialProgressEnabled) {
+      return doExecuteWithPartialProgress(ctx, groupStream, commitManager);
+    } else {
+      return doExecute(ctx, groupStream, commitManager);
+    }
+  }
+
+  private Map<StructLike, List<List<PositionDeletesScanTask>>> 
planFileGroups() {
+    Table deletesTable =
+        MetadataTableUtils.createMetadataTableInstance(table, 
MetadataTableType.POSITION_DELETES);
+    CloseableIterable<PositionDeletesScanTask> scanTasks =
+        CloseableIterable.transform(
+            deletesTable.newBatchScan().ignoreResiduals().planFiles(),
+            t -> (PositionDeletesScanTask) t);
+
+    try {
+      StructType partitionType = Partitioning.partitionType(table);
+      StructLikeMap<List<PositionDeletesScanTask>> filesByPartition =
+          StructLikeMap.create(partitionType);
+
+      for (PositionDeletesScanTask task : scanTasks) {
+        StructLike taskPartition = task.file().partition();
+        StructLike coerced =
+            PartitionUtil.coercePartition(partitionType, task.spec(), 
taskPartition);
+
+        List<PositionDeletesScanTask> partitionTasks = 
filesByPartition.get(coerced);
+        if (partitionTasks == null) {
+          partitionTasks = Lists.newArrayList();
+        }
+        partitionTasks.add(task);
+        filesByPartition.put(coerced, partitionTasks);
+      }
+
+      StructLikeMap<List<List<PositionDeletesScanTask>>> fileGroupsByPartition 
=
+          StructLikeMap.create(partitionType);
+
+      filesByPartition.forEach(
+          (partition, partitionTasks) -> {
+            Iterable<List<PositionDeletesScanTask>> plannedFileGroups =
+                rewriter.planFileGroups(partitionTasks);
+            List<List<PositionDeletesScanTask>> groups = 
ImmutableList.copyOf(plannedFileGroups);
+            if (groups.size() > 0) {
+              // use coerced partition for map key uniqueness, but return 
original partition

Review Comment:
   Hm, shouldn't we use the coerced partition here as well?



##########
api/src/main/java/org/apache/iceberg/actions/RewritePositionDeleteFiles.java:
##########
@@ -18,16 +18,68 @@
  */
 package org.apache.iceberg.actions;
 
+import java.util.List;
+import org.apache.iceberg.RewriteJobOrder;
+import org.apache.iceberg.StructLike;
 import org.apache.iceberg.expressions.Expression;
+import org.immutables.value.Value;
 
 /**
  * An action for rewriting position delete files.
  *
  * <p>Generally used for optimizing the size and layout of position delete 
files within a table.
  */
+@Value.Enclosing
 public interface RewritePositionDeleteFiles

Review Comment:
   This interface looks good to me. Shall we deprecate 
`RewritePositionDeleteStrategy`, btw?



##########
core/src/main/java/org/apache/iceberg/actions/RewritePositionDeleteGroup.java:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.PositionDeletesScanTask;
+import org.apache.iceberg.RewriteJobOrder;
+import org.apache.iceberg.actions.RewritePositionDeleteFiles.FileGroupInfo;
+import 
org.apache.iceberg.actions.RewritePositionDeleteFiles.FileGroupRewriteResult;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * Container class representing a set of position delete files to be rewritten 
by a {@link
+ * RewritePositionDeleteFiles} and the new files which have been written by 
the action.
+ */
+public class RewritePositionDeleteGroup {

Review Comment:
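   What's the naming convention here? This class uses the singular `RewritePositionDelete...` prefix, while other new classes use the plural `RewritePositionDeletesXXX` (e.g. `RewritePositionDeletesCommitManager`).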



##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeletesSparkAction.java:
##########
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import java.io.IOException;
+import java.math.RoundingMode;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.MetadataTableUtils;
+import org.apache.iceberg.Partitioning;
+import org.apache.iceberg.PositionDeletesScanTask;
+import org.apache.iceberg.RewriteJobOrder;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.ImmutableRewritePositionDeleteFiles;
+import org.apache.iceberg.actions.RewritePositionDeleteFiles;
+import org.apache.iceberg.actions.RewritePositionDeleteGroup;
+import org.apache.iceberg.actions.RewritePositionDeletesCommitManager;
+import 
org.apache.iceberg.actions.RewritePositionDeletesCommitManager.CommitService;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.io.CloseableIterable;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Queues;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.relocated.com.google.common.math.IntMath;
+import 
org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
+import 
org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.iceberg.types.Types.StructType;
+import org.apache.iceberg.util.PartitionUtil;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.StructLikeMap;
+import org.apache.iceberg.util.Tasks;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Spark implementation of {@link RewritePositionDeleteFiles}. */
+public class RewritePositionDeletesSparkAction
+    extends BaseSnapshotUpdateSparkAction<RewritePositionDeletesSparkAction>
+    implements RewritePositionDeleteFiles {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RewritePositionDeletesSparkAction.class);
+  private static final Set<String> VALID_OPTIONS =
+      ImmutableSet.of(
+          MAX_CONCURRENT_FILE_GROUP_REWRITES,
+          PARTIAL_PROGRESS_ENABLED,
+          PARTIAL_PROGRESS_MAX_COMMITS,
+          REWRITE_JOB_ORDER);
+  private static final Result EMPTY = 
ImmutableRewritePositionDeleteFiles.Result.builder().build();
+
+  private final Table table;
+  private final SparkBinPackPositionDeletesRewriter rewriter;
+
+  private int maxConcurrentFileGroupRewrites;
+  private int maxCommits;
+  private boolean partialProgressEnabled;
+  private RewriteJobOrder rewriteJobOrder;
+
+  RewritePositionDeletesSparkAction(SparkSession spark, Table table) {
+    super(spark);
+    this.table = table;
+    this.rewriter = new SparkBinPackPositionDeletesRewriter(spark(), table);
+  }
+
+  @Override
+  protected RewritePositionDeletesSparkAction self() {
+    return this;
+  }
+
+  @Override
+  public RewritePositionDeletesSparkAction filter(Expression expression) {
+    throw new UnsupportedOperationException("Regular filters not supported 
yet.");
+  }
+
+  @Override
+  public RewritePositionDeleteFiles.Result execute() {
+    if (table.currentSnapshot() == null) {
+      LOG.info("Nothing found to rewrite in empty table {}", table.name());
+      return EMPTY;
+    }
+
+    validateAndInitOptions();
+
+    Map<StructLike, List<List<PositionDeletesScanTask>>> fileGroupsByPartition 
= planFileGroups();
+    RewriteExecutionContext ctx = new 
RewriteExecutionContext(fileGroupsByPartition);
+
+    if (ctx.totalGroupCount() == 0) {
+      LOG.info("Nothing found to rewrite in {}", table.name());
+      return EMPTY;
+    }
+
+    Stream<RewritePositionDeleteGroup> groupStream = toGroupStream(ctx, 
fileGroupsByPartition);
+
+    RewritePositionDeletesCommitManager commitManager = commitManager();
+    if (partialProgressEnabled) {
+      return doExecuteWithPartialProgress(ctx, groupStream, commitManager);
+    } else {
+      return doExecute(ctx, groupStream, commitManager);
+    }
+  }
+
+  private Map<StructLike, List<List<PositionDeletesScanTask>>> 
planFileGroups() {
+    Table deletesTable =
+        MetadataTableUtils.createMetadataTableInstance(table, 
MetadataTableType.POSITION_DELETES);
+    CloseableIterable<PositionDeletesScanTask> scanTasks =
+        CloseableIterable.transform(
+            deletesTable.newBatchScan().ignoreResiduals().planFiles(),
+            t -> (PositionDeletesScanTask) t);
+
+    try {
+      StructType partitionType = Partitioning.partitionType(table);
+      StructLikeMap<List<PositionDeletesScanTask>> filesByPartition =
+          StructLikeMap.create(partitionType);
+
+      for (PositionDeletesScanTask task : scanTasks) {
+        StructLike taskPartition = task.file().partition();

Review Comment:
   optional: You may consider a helper method.
   
   ```
   private StructLike XXX(PositionDeletesScanTask task, StructType partitionType) {
     return PartitionUtil.coercePartition(partitionType, task.spec(), task.partition());
   }
   ```



##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeletesSparkAction.java:
##########
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import java.io.IOException;
+import java.math.RoundingMode;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.MetadataTableUtils;
+import org.apache.iceberg.Partitioning;
+import org.apache.iceberg.PositionDeletesScanTask;
+import org.apache.iceberg.RewriteJobOrder;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.ImmutableRewritePositionDeleteFiles;
+import org.apache.iceberg.actions.RewritePositionDeleteFiles;
+import org.apache.iceberg.actions.RewritePositionDeleteGroup;
+import org.apache.iceberg.actions.RewritePositionDeletesCommitManager;
+import 
org.apache.iceberg.actions.RewritePositionDeletesCommitManager.CommitService;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.io.CloseableIterable;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Queues;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.relocated.com.google.common.math.IntMath;
+import 
org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
+import 
org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.iceberg.types.Types.StructType;
+import org.apache.iceberg.util.PartitionUtil;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.StructLikeMap;
+import org.apache.iceberg.util.Tasks;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Spark implementation of {@link RewritePositionDeleteFiles}. */
+public class RewritePositionDeletesSparkAction
+    extends BaseSnapshotUpdateSparkAction<RewritePositionDeletesSparkAction>
+    implements RewritePositionDeleteFiles {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RewritePositionDeletesSparkAction.class);
+  private static final Set<String> VALID_OPTIONS =
+      ImmutableSet.of(
+          MAX_CONCURRENT_FILE_GROUP_REWRITES,
+          PARTIAL_PROGRESS_ENABLED,
+          PARTIAL_PROGRESS_MAX_COMMITS,
+          REWRITE_JOB_ORDER);
+  private static final Result EMPTY = 
ImmutableRewritePositionDeleteFiles.Result.builder().build();

Review Comment:
   optional: `EMPTY_RESULT`?



##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackPositionDeletesRewriter.java:
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import static org.apache.iceberg.MetadataTableType.POSITION_DELETES;
+import static org.apache.spark.sql.functions.col;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.IntStream;
+import org.apache.iceberg.DataFilesTable;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.MetadataTableUtils;
+import org.apache.iceberg.PositionDeletesScanTask;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.SizeBasedPositionDeletesRewriter;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.spark.PositionDeletesRewriteCoordinator;
+import org.apache.iceberg.spark.ScanTaskSetManager;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkTableCache;
+import org.apache.iceberg.spark.SparkTableUtil;
+import org.apache.iceberg.spark.SparkWriteOptions;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.internal.SQLConf;
+
+class SparkBinPackPositionDeletesRewriter extends 
SizeBasedPositionDeletesRewriter {
+
+  private final SparkSession spark;
+  private final SparkTableCache tableCache = SparkTableCache.get();
+  private final ScanTaskSetManager taskSetManager = ScanTaskSetManager.get();
+  private final PositionDeletesRewriteCoordinator coordinator =
+      PositionDeletesRewriteCoordinator.get();
+
+  SparkBinPackPositionDeletesRewriter(SparkSession spark, Table table) {
+    super(table);
+    // Disable Adaptive Query Execution as this may change the output 
partitioning of our write
+    this.spark = spark.cloneSession();
+    this.spark.conf().set(SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), false);
+  }
+
+  @Override
+  public String description() {
+    return "BIN-PACK";
+  }
+
+  @Override
+  public Set<DeleteFile> rewrite(List<PositionDeletesScanTask> group) {
+    String groupId = UUID.randomUUID().toString();
+    Table deletesTable = 
MetadataTableUtils.createMetadataTableInstance(table(), POSITION_DELETES);
+    try {
+      tableCache.add(groupId, deletesTable);
+      taskSetManager.stageTasks(deletesTable, groupId, group);
+
+      doRewrite(groupId, group);
+
+      return coordinator.fetchNewFiles(deletesTable, groupId);
+    } finally {
+      tableCache.remove(groupId);
+      taskSetManager.removeTasks(deletesTable, groupId);
+      coordinator.clearRewrite(deletesTable, groupId);
+    }
+  }
+
+  protected void doRewrite(String groupId, List<PositionDeletesScanTask> 
group) {
+    // ensure AQE is disabled for full control of splits

Review Comment:
   This is probably redundant now: the constructor already clones the session and disables AQE on the clone, so there is no need to disable it again here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to