This is an automated email from the ASF dual-hosted git repository.

stevenwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new a50ec923f3 Core: Interface changes for separating rewrite planner and 
runner (#12306)
a50ec923f3 is described below

commit a50ec923f3d928f67e2a4a361c0d1162341aa084
Author: pvary <[email protected]>
AuthorDate: Thu Feb 27 02:22:56 2025 +0100

    Core: Interface changes for separating rewrite planner and runner (#12306)
---
 .../apache/iceberg/actions/FileRewritePlan.java    | 71 ++++++++++++++++
 .../apache/iceberg/actions/FileRewritePlanner.java | 88 ++++++++++++++++++++
 .../apache/iceberg/actions/FileRewriteRunner.java  | 76 +++++++++++++++++
 .../apache/iceberg/actions/RewriteFileGroup.java   | 70 +++++++++++-----
 .../apache/iceberg/actions/RewriteGroupBase.java   | 96 ++++++++++++++++++++++
 .../actions/RewritePositionDeletesGroup.java       | 65 +++++++++------
 6 files changed, 421 insertions(+), 45 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/actions/FileRewritePlan.java 
b/core/src/main/java/org/apache/iceberg/actions/FileRewritePlan.java
new file mode 100644
index 0000000000..3ad112de77
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/actions/FileRewritePlan.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import java.util.Map;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.ContentScanTask;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.io.CloseableIterable;
+
+/**
+ * Container class holding the output of a {@link FileRewritePlanner#plan()} call. Contains a list
+ * of groups (See: {@link RewriteFileGroup} and {@link RewritePositionDeletesGroup}) which are used
+ * by engines to rewrite a particular set of files.
+ *
+ * @param <I> the Java type of the plan info like {@link RewriteDataFiles.FileGroupInfo} or {@link
+ *     RewritePositionDeleteFiles.FileGroupInfo}
+ * @param <T> the Java type of the input scan tasks (input)
+ * @param <F> the Java type of the content files (input and output)
+ * @param <G> the Java type of the rewrite file group like {@link RewriteFileGroup} or {@link
+ *     RewritePositionDeletesGroup}
+ */
+public class FileRewritePlan<
+    I,
+    T extends ContentScanTask<F>,
+    F extends ContentFile<F>,
+    G extends RewriteGroupBase<I, T, F>> {
+  private final CloseableIterable<G> groups;
+  private final int totalGroupCount;
+  private final Map<StructLike, Integer> groupsInPartition;
+
+  FileRewritePlan(
+      CloseableIterable<G> groups,
+      int totalGroupCount,
+      Map<StructLike, Integer> groupsInPartition) {
+    this.groups = groups;
+    this.totalGroupCount = totalGroupCount;
+    this.groupsInPartition = groupsInPartition;
+  }
+
+  /** The stream of the generated {@link RewriteGroupBase}s. */
+  public CloseableIterable<G> groups() {
+    return groups;
+  }
+
+  /** The number of the generated groups in the given partition; 0 if the partition has none. */
+  public int groupsInPartition(StructLike partition) {
+    return groupsInPartition.getOrDefault(partition, 0);
+  }
+
+  /** The total number of the groups generated by this plan. */
+  public int totalGroupCount() {
+    return totalGroupCount;
+  }
+}
diff --git 
a/core/src/main/java/org/apache/iceberg/actions/FileRewritePlanner.java 
b/core/src/main/java/org/apache/iceberg/actions/FileRewritePlanner.java
new file mode 100644
index 0000000000..2e47f4dd91
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/actions/FileRewritePlanner.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.ContentScanTask;
+
+/**
+ * An interface for planning a rewrite operation.
+ *
+ * <p>A Rewrite Operation is performed by several classes working in conjunction.
+ *
+ * <ul>
+ *   <li>{@link FileRewritePlanner} - (this interface) which scans the files in the table and
+ *       determines which files should be rewritten and how they should be grouped. The grouping is
+ *       based on partitioning and the planning could create multiple groups within a partition.
+ *       This produces a {@link FileRewritePlan}.
+ *   <li>{@link FileRewritePlan} - The immutable output of the planner contains a set of groups
+ *       like {@link RewriteFileGroup} or {@link RewritePositionDeletesGroup}, each containing a set
+ *       of files that are meant to be compacted together by the {@link FileRewriteRunner}.
+ *   <li>{@link FileRewriteRunner} - The runner is the engine-specific implementation which takes
+ *       the groups of a {@link FileRewritePlan} and rewrites each of them. This is the actual
+ *       implementation of the rewrite which generates new files.
+ * </ul>
+ *
+ * <p>The lifecycle for the planner looks like the following:
+ *
+ * <ul>
+ *   <li>{@link #init(Map)} initializes the planner with the configuration parameters
+ *   <li>{@link #plan()} is called to generate the {@link FileRewritePlan} for the given parameters.
+ * </ul>
+ *
+ * @param <I> the Java type of the plan info like {@link RewriteDataFiles.FileGroupInfo} or {@link
+ *     RewritePositionDeleteFiles.FileGroupInfo}
+ * @param <T> the Java type of the input scan tasks (input)
+ * @param <F> the Java type of the content files (input and output)
+ * @param <G> the Java type of the rewrite file group like {@link RewriteFileGroup} or {@link
+ *     RewritePositionDeletesGroup}
+ */
+public interface FileRewritePlanner<
+    I,
+    T extends ContentScanTask<F>,
+    F extends ContentFile<F>,
+    G extends RewriteGroupBase<I, T, F>> {
+
+  /** Returns a description for this planner. */
+  default String description() {
+    return getClass().getName();
+  }
+
+  /**
+   * Returns a set of supported options for this planner. Only options specified in this list will
+   * be accepted at runtime. Any other options will be rejected.
+   */
+  Set<String> validOptions();
+
+  /**
+   * Initializes this planner using the provided options.
+   *
+   * @param options options to initialize this planner
+   */
+  void init(Map<String, String> options);
+
+  /**
+   * Generates the rewrite plan.
+   *
+   * @return the generated plan whose groups can be rewritten by a {@link FileRewriteRunner}
+   */
+  FileRewritePlan<I, T, F, G> plan();
+}
diff --git 
a/core/src/main/java/org/apache/iceberg/actions/FileRewriteRunner.java 
b/core/src/main/java/org/apache/iceberg/actions/FileRewriteRunner.java
new file mode 100644
index 0000000000..4c7384861c
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/actions/FileRewriteRunner.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.ContentScanTask;
+
+/**
+ * An interface for rewriting content file groups ({@link RewriteGroupBase}). The lifecycle for the
+ * runner looks like the following:
+ *
+ * <ul>
+ *   <li>{@link #init(Map)} initializes the runner with the configuration parameters
+ *   <li>{@link #rewrite(RewriteGroupBase)} is called for every group in the plan to do the actual
+ *       rewrite of the files, and returns the generated new files.
+ * </ul>
+ *
+ * @param <I> the Java type of the plan info like {@link RewriteDataFiles.FileGroupInfo} or {@link
+ *     RewritePositionDeleteFiles.FileGroupInfo}
+ * @param <T> the Java type of the input scan tasks (input)
+ * @param <F> the Java type of the content files (input and output)
+ * @param <G> the Java type of the rewrite file group like {@link RewriteFileGroup} or {@link
+ *     RewritePositionDeletesGroup}
+ */
+public interface FileRewriteRunner<
+    I,
+    T extends ContentScanTask<F>,
+    F extends ContentFile<F>,
+    G extends RewriteGroupBase<I, T, F>> {
+
+  /** Returns a description for this runner. */
+  default String description() {
+    return getClass().getName();
+  }
+
+  /**
+   * Returns a set of supported options for this runner. Only options specified in this list will be
+   * accepted at runtime. Any other options will be rejected.
+   */
+  Set<String> validOptions();
+
+  /**
+   * Initializes this runner using the provided options.
+   *
+   * @param options options to initialize this runner
+   */
+  void init(Map<String, String> options);
+
+  /**
+   * Rewrites the files represented by the scan tasks of the given group.
+   *
+   * <p>The implementation is supposed to be engine-specific (e.g. Spark, Flink, Trino).
+   *
+   * @param group the group of scan tasks for the files to be rewritten together
+   * @return a set of newly written files
+   */
+  Set<F> rewrite(G group);
+}
diff --git 
a/core/src/main/java/org/apache/iceberg/actions/RewriteFileGroup.java 
b/core/src/main/java/org/apache/iceberg/actions/RewriteFileGroup.java
index dfc9842780..c31ff327b6 100644
--- a/core/src/main/java/org/apache/iceberg/actions/RewriteFileGroup.java
+++ b/core/src/main/java/org/apache/iceberg/actions/RewriteFileGroup.java
@@ -34,23 +34,32 @@ import org.apache.iceberg.util.DataFileSet;
  * Container class representing a set of files to be rewritten by a 
RewriteAction and the new files
  * which have been written by the action.
  */
-public class RewriteFileGroup {
-  private final FileGroupInfo info;
-  private final List<FileScanTask> fileScanTasks;
-
+public class RewriteFileGroup extends RewriteGroupBase<FileGroupInfo, 
FileScanTask, DataFile> {
+  private final int outputSpecId;
   private DataFileSet addedFiles = DataFileSet.create();
 
+  @Deprecated
   public RewriteFileGroup(FileGroupInfo info, List<FileScanTask> 
fileScanTasks) {
-    this.info = info;
-    this.fileScanTasks = fileScanTasks;
+    this(info, fileScanTasks, 0, 0L, 0L, 0);
   }
 
-  public FileGroupInfo info() {
-    return info;
+  public RewriteFileGroup(
+      FileGroupInfo info,
+      List<FileScanTask> fileScanTasks,
+      int outputSpecId,
+      long writeMaxFileSize,
+      long splitSize,
+      int expectedOutputFiles) {
+    super(info, fileScanTasks, writeMaxFileSize, splitSize, 
expectedOutputFiles);
+    this.outputSpecId = outputSpecId;
   }
 
+  /**
+   * @deprecated use {@link #fileScanTasks()}
+   */
+  @Deprecated
   public List<FileScanTask> fileScans() {
-    return fileScanTasks;
+    return fileScanTasks();
   }
 
   public void setOutputFiles(Set<DataFile> files) {
@@ -58,7 +67,7 @@ public class RewriteFileGroup {
   }
 
   public Set<DataFile> rewrittenFiles() {
-    return fileScans().stream()
+    return fileScanTasks().stream()
         .map(FileScanTask::file)
         .collect(Collectors.toCollection(DataFileSet::create));
   }
@@ -70,43 +79,60 @@ public class RewriteFileGroup {
   public RewriteDataFiles.FileGroupRewriteResult asResult() {
     Preconditions.checkState(addedFiles != null, "Cannot get result, Group was 
never rewritten");
     return ImmutableRewriteDataFiles.FileGroupRewriteResult.builder()
-        .info(info)
+        .info(info())
         .addedDataFilesCount(addedFiles.size())
-        .rewrittenDataFilesCount(fileScanTasks.size())
-        .rewrittenBytesCount(sizeInBytes())
+        .rewrittenDataFilesCount(fileScanTasks().size())
+        .rewrittenBytesCount(inputFilesSizeInBytes())
         .build();
   }
 
   @Override
   public String toString() {
     return MoreObjects.toStringHelper(this)
-        .add("info", info)
-        .add("numRewrittenFiles", fileScanTasks.size())
+        .add("info", info())
+        .add("numRewrittenFiles", fileScanTasks().size())
         .add(
             "numAddedFiles",
             addedFiles == null ? "Rewrite Incomplete" : 
Integer.toString(addedFiles.size()))
-        .add("numRewrittenBytes", sizeInBytes())
+        .add("numRewrittenBytes", inputFilesSizeInBytes())
+        .add("maxOutputFileSize", maxOutputFileSize())
+        .add("inputSplitSize", inputSplitSize())
+        .add("expectedOutputFiles", expectedOutputFiles())
+        .add("outputSpecId", outputSpecId)
         .toString();
   }
 
+  /**
+   * @deprecated use {@link #inputFilesSizeInBytes()}
+   */
+  @Deprecated
   public long sizeInBytes() {
-    return fileScanTasks.stream().mapToLong(FileScanTask::length).sum();
+    return inputFilesSizeInBytes();
   }
 
+  /**
+   * @deprecated use {@link #inputFileNum()}
+   */
+  @Deprecated
   public int numFiles() {
-    return fileScanTasks.size();
+    return inputFileNum();
+  }
+
+  public int outputSpecId() {
+    return outputSpecId;
   }
 
   public static Comparator<RewriteFileGroup> comparator(RewriteJobOrder 
rewriteJobOrder) {
     switch (rewriteJobOrder) {
       case BYTES_ASC:
-        return Comparator.comparing(RewriteFileGroup::sizeInBytes);
+        return Comparator.comparing(RewriteFileGroup::inputFilesSizeInBytes);
       case BYTES_DESC:
-        return Comparator.comparing(RewriteFileGroup::sizeInBytes, 
Comparator.reverseOrder());
+        return Comparator.comparing(
+            RewriteFileGroup::inputFilesSizeInBytes, 
Comparator.reverseOrder());
       case FILES_ASC:
-        return Comparator.comparing(RewriteFileGroup::numFiles);
+        return Comparator.comparing(RewriteFileGroup::inputFileNum);
       case FILES_DESC:
-        return Comparator.comparing(RewriteFileGroup::numFiles, 
Comparator.reverseOrder());
+        return Comparator.comparing(RewriteFileGroup::inputFileNum, 
Comparator.reverseOrder());
       default:
         return (unused, unused2) -> 0;
     }
diff --git 
a/core/src/main/java/org/apache/iceberg/actions/RewriteGroupBase.java 
b/core/src/main/java/org/apache/iceberg/actions/RewriteGroupBase.java
new file mode 100644
index 0000000000..28d182bd28
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/actions/RewriteGroupBase.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import java.util.List;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.ContentScanTask;
+
+/**
+ * Container class representing a set of files to be rewritten by a {@link FileRewriteRunner}.
+ *
+ * @param <I> the Java type of the plan info member like {@link RewriteDataFiles.FileGroupInfo} or
+ *     {@link RewritePositionDeleteFiles.FileGroupInfo}
+ * @param <T> the Java type of the input scan tasks (input)
+ * @param <F> the Java type of the content files (input and output)
+ */
+public abstract class RewriteGroupBase<I, T extends ContentScanTask<F>, F extends ContentFile<F>> {
+  private final I info;
+  private final List<T> fileScanTasks;
+  private final long maxOutputFileSize;
+  private final long inputSplitSize;
+  private final int expectedOutputFiles;
+
+  RewriteGroupBase(
+      I info,
+      List<T> fileScanTasks,
+      long maxOutputFileSize,
+      long inputSplitSize,
+      int expectedOutputFiles) {
+    this.info = info;
+    this.fileScanTasks = fileScanTasks;
+    this.maxOutputFileSize = maxOutputFileSize;
+    this.inputSplitSize = inputSplitSize;
+    this.expectedOutputFiles = expectedOutputFiles;
+  }
+
+  /** Identifiers and partition information about the group. */
+  public I info() {
+    return info;
+  }
+
+  /** Scan tasks for input files. */
+  public List<T> fileScanTasks() {
+    return fileScanTasks;
+  }
+
+  /** Accumulated size (sum of the scan task lengths) for the input files. */
+  public long inputFilesSizeInBytes() {
+    return fileScanTasks.stream().mapToLong(T::length).sum();
+  }
+
+  /** Number of the input files (one per scan task). */
+  public int inputFileNum() {
+    return fileScanTasks.size();
+  }
+
+  /**
+   * The target file size which should be used by the {@link FileRewriteRunner}. The {@link
+   * FileRewritePlanner} could choose different values than defined by the table properties.
+   *
+   * @return the target size the runner should use
+   */
+  public long maxOutputFileSize() {
+    return maxOutputFileSize;
+  }
+
+  /**
+   * The amount of bytes of data the {@link FileRewriteRunner} should read from a single group in a
+   * single read task. The {@link FileRewritePlanner} chooses a value to allow parallelization for
+   * the runners, but prevent fragmentation of the output caused by too many readers.
+   */
+  public long inputSplitSize() {
+    return inputSplitSize;
+  }
+
+  /** The total number of files that should be produced by the rewrite of this entire file group. */
+  public int expectedOutputFiles() {
+    return expectedOutputFiles;
+  }
+}
diff --git 
a/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesGroup.java
 
b/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesGroup.java
index d1c688417a..168c63f66f 100644
--- 
a/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesGroup.java
+++ 
b/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesGroup.java
@@ -35,27 +35,35 @@ import org.apache.iceberg.util.DeleteFileSet;
  * Container class representing a set of position delete files to be rewritten 
by a {@link
  * RewritePositionDeleteFiles} and the new files which have been written by 
the action.
  */
-public class RewritePositionDeletesGroup {
-  private final FileGroupInfo info;
-  private final List<PositionDeletesScanTask> tasks;
+public class RewritePositionDeletesGroup
+    extends RewriteGroupBase<FileGroupInfo, PositionDeletesScanTask, 
DeleteFile> {
   private final long maxRewrittenDataSequenceNumber;
 
   private DeleteFileSet addedDeleteFiles = DeleteFileSet.create();
 
+  @Deprecated
   public RewritePositionDeletesGroup(FileGroupInfo info, 
List<PositionDeletesScanTask> tasks) {
+    this(info, tasks, 0L, 0L, 0);
+  }
+
+  public RewritePositionDeletesGroup(
+      FileGroupInfo info,
+      List<PositionDeletesScanTask> tasks,
+      long writeMaxFileSize,
+      long splitSize,
+      int expectedOutputFiles) {
+    super(info, tasks, writeMaxFileSize, splitSize, expectedOutputFiles);
     Preconditions.checkArgument(!tasks.isEmpty(), "Tasks must not be empty");
-    this.info = info;
-    this.tasks = tasks;
     this.maxRewrittenDataSequenceNumber =
         tasks.stream().mapToLong(t -> 
t.file().dataSequenceNumber()).max().getAsLong();
   }
 
-  public FileGroupInfo info() {
-    return info;
-  }
-
+  /**
+   * @deprecated use {@link #fileScanTasks()}
+   */
+  @Deprecated
   public List<PositionDeletesScanTask> tasks() {
-    return tasks;
+    return fileScanTasks();
   }
 
   public void setOutputFiles(Set<DeleteFile> files) {
@@ -67,7 +75,7 @@ public class RewritePositionDeletesGroup {
   }
 
   public Set<DeleteFile> rewrittenDeleteFiles() {
-    return tasks().stream()
+    return fileScanTasks().stream()
         .map(PositionDeletesScanTask::file)
         .collect(Collectors.toCollection(DeleteFileSet::create));
   }
@@ -81,10 +89,10 @@ public class RewritePositionDeletesGroup {
         addedDeleteFiles != null, "Cannot get result, Group was never 
rewritten");
 
     return ImmutableRewritePositionDeleteFiles.FileGroupRewriteResult.builder()
-        .info(info)
+        .info(info())
         .addedDeleteFilesCount(addedDeleteFiles.size())
-        .rewrittenDeleteFilesCount(tasks.size())
-        .rewrittenBytesCount(rewrittenBytes())
+        .rewrittenDeleteFilesCount(inputFileNum())
+        .rewrittenBytesCount(inputFilesSizeInBytes())
         .addedBytesCount(addedBytes())
         .build();
   }
@@ -92,42 +100,53 @@ public class RewritePositionDeletesGroup {
   @Override
   public String toString() {
     return MoreObjects.toStringHelper(this)
-        .add("info", info)
-        .add("numRewrittenPositionDeleteFiles", tasks.size())
+        .add("info", info())
+        .add("numRewrittenPositionDeleteFiles", fileScanTasks().size())
         .add(
             "numAddedPositionDeleteFiles",
             addedDeleteFiles == null
                 ? "Rewrite Incomplete"
                 : Integer.toString(addedDeleteFiles.size()))
         .add("numAddedBytes", addedBytes())
-        .add("numRewrittenBytes", rewrittenBytes())
+        .add("numRewrittenBytes", inputFilesSizeInBytes())
+        .add("maxOutputFileSize", maxOutputFileSize())
+        .add("inputSplitSize", inputSplitSize())
+        .add("expectedOutputFiles", expectedOutputFiles())
         .toString();
   }
 
+  /**
+   * @deprecated use {@link #inputFilesSizeInBytes()}
+   */
+  @Deprecated
   public long rewrittenBytes() {
-    return tasks.stream().mapToLong(PositionDeletesScanTask::length).sum();
+    return inputFilesSizeInBytes();
   }
 
   public long addedBytes() {
     return 
addedDeleteFiles.stream().mapToLong(DeleteFile::fileSizeInBytes).sum();
   }
 
+  /**
+   * @deprecated use {@link #inputFileNum()}
+   */
+  @Deprecated
   public int numRewrittenDeleteFiles() {
-    return tasks.size();
+    return inputFileNum();
   }
 
   public static Comparator<RewritePositionDeletesGroup> 
comparator(RewriteJobOrder order) {
     switch (order) {
       case BYTES_ASC:
-        return 
Comparator.comparing(RewritePositionDeletesGroup::rewrittenBytes);
+        return 
Comparator.comparing(RewritePositionDeletesGroup::inputFilesSizeInBytes);
       case BYTES_DESC:
         return Comparator.comparing(
-            RewritePositionDeletesGroup::rewrittenBytes, 
Comparator.reverseOrder());
+            RewritePositionDeletesGroup::inputFilesSizeInBytes, 
Comparator.reverseOrder());
       case FILES_ASC:
-        return 
Comparator.comparing(RewritePositionDeletesGroup::numRewrittenDeleteFiles);
+        return Comparator.comparing(RewritePositionDeletesGroup::inputFileNum);
       case FILES_DESC:
         return Comparator.comparing(
-            RewritePositionDeletesGroup::numRewrittenDeleteFiles, 
Comparator.reverseOrder());
+            RewritePositionDeletesGroup::inputFileNum, 
Comparator.reverseOrder());
       default:
         return (unused, unused2) -> 0;
     }

Reply via email to