kbendick commented on a change in pull request #2501:
URL: https://github.com/apache/iceberg/pull/2501#discussion_r627081408



##########
File path: api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
##########
@@ -19,92 +19,120 @@
 
 package org.apache.iceberg.actions;
 
-import org.apache.iceberg.DataFile;
+import java.util.Map;
+import org.apache.iceberg.StructLike;
 import org.apache.iceberg.expressions.Expression;
 
 /**
- * An action that rewrites data files.
+ * An action for rewriting datafiles according to a Rewrite Strategy. 
Generally used for
+ * optimizing the sizing and layout of datafiles within a table.
  */
-public interface RewriteDataFiles extends SnapshotUpdate<RewriteDataFiles, 
RewriteDataFiles.Result> {
+public interface RewriteDataFiles extends Action<RewriteDataFiles, 
RewriteDataFiles.Result> {
+
   /**
-   * Pass a row filter to filter {@link DataFile}s to be rewritten.
-   * <p>
-   * Note that all files that may contain data matching the filter may be 
rewritten.
-   * <p>
-   * If not set, all files will be rewritten.
-   *
-   * @param expr a row filter to filter out data files
-   * @return this for method chaining
+   * Enable committing groups of files (see max-file-group-size) prior to the 
entire compaction completing.
+   * This will produce additional commits but allow for progress even if some 
groups fail to commit. This setting
+   * will not change the correctness of the rewrite operation. The default is 
false, which produces a single commit
+   * when the entire job has completed.
    */
-  RewriteDataFiles filter(Expression expr);
+  String PARTIAL_PROGRESS_ENABLED = "partial-progress.enabled";
+  boolean PARTIAL_PROGRESS_ENABLED_DEFAULT = false;
 
   /**
-   * Enables or disables case sensitive expression binding.
-   * <p>
-   * If not set, defaults to false.
-   *
-   * @param caseSensitive caseSensitive
-   * @return this for method chaining
+   * The maximum amount of Iceberg commits that compaction is allowed to 
produce if partial progress is enabled.
    */
-  RewriteDataFiles caseSensitive(boolean caseSensitive);
+  String PARTIAL_PROGRESS_MAX_COMMITS = "partial-progress.max-commits";
+  int PARTIAL_PROGRESS_MAX_COMMITS_DEFAULT = 10;
 
   /**
-   * Pass a PartitionSpec id to specify which PartitionSpec should be used in 
DataFile rewrite
-   * <p>
-   * If not set, defaults to the table's default spec ID.
-   *
-   * @param specId PartitionSpec id to rewrite
-   * @return this for method chaining
+   * The entire compaction operation is broken down into pieces based on 
partitioning and within partitions based
+   * on size into groups. These sub-units of compaction are referred to as 
file groups. The largest amount of data that
+   * should be compacted in a single group is controlled by 
MAX_FILE_GROUP_SIZE_BYTES. When grouping files, the
+   * underlying compaction strategy will use this value as to limit the files 
which will be included in a single file
+   * group. A group will be processed by a single framework "action". For 
example, in Spark this means that each group
+   * would be rewritten in its own Spark action. A group will never contain 
files for multiple output partitions.
    */
-  RewriteDataFiles outputSpecId(int specId);
+  String MAX_FILE_GROUP_SIZE_BYTES = "max-file-group-size-bytes";
+  long MAX_FILE_GROUP_SIZE_BYTES_DEFAULT = 1024L * 1024L * 1024L * 100L; // 
100 Gigabytes
 
   /**
-   * Specify the target data file size in bytes.
-   * <p>
-   * If not set, defaults to the table's target file size.
-   *
-   * @param targetSizeInBytes size in bytes of rewrite data file
-   * @return this for method chaining
+   * The max number of file groups to be simultaneously rewritten by the 
compaction strategy. The structure and
+   * contents of the group is determined by the compaction strategy. Each file 
group will be rewritten
+   * independently and asynchronously.
+   **/
+  String MAX_CONCURRENT_FILE_GROUP_ACTIONS = 
"max-concurrent-file-group-actions";
+  int MAX_CONCURRENT_FILE_GROUP_ACTIONS_DEFAULT = 1;
+
+  /**
+   * The output file size that this compaction strategy will attempt to 
generate when rewriting files. By default this
+   * will use the write.target-size value in the table properties of the table 
being updated.
+   */
+  String TARGET_FILE_SIZE_BYTES = "target-file-size-bytes";
+
+  /**
+   * The partition spec to use when writing the output data from this 
operation. By default uses the
+   * current table partition spec.
    */
-  RewriteDataFiles targetSizeInBytes(long targetSizeInBytes);
+  String PARTITION_SPEC_ID = "partition-spec-id";
+
+  enum RewriteStrategyName {
+    BINPACK,
+    SORT
+  }
 
   /**
-   * Specify the number of "bins" considered when trying to pack the next file 
split into a task. Increasing this
-   * usually makes tasks a bit more even by considering more ways to pack file 
regions into a single task with extra
-   * planning cost.
-   * <p>
-   * This configuration can reorder the incoming file regions, to preserve 
order for lower/upper bounds in file
-   * metadata, user can use a lookback of 1.
+   * The name of the compaction strategy to be used when compacting data 
files. Currently we only support BINPACK and
+   * SORT as options.
    *
-   * @param splitLookback number of "bins" considered when trying to pack the 
next file split into a task.
+   * @param strategyName name of the strategy
    * @return this for method chaining
    */
-  RewriteDataFiles splitLookback(int splitLookback);
+  RewriteDataFiles strategy(RewriteStrategyName strategyName);
 
   /**
-   * Specify the cost of opening a file that will be taken into account during 
packing files into
-   * bins. If the size of the file is smaller than the cost of opening, then 
this value will be used
-   * instead of the actual file size.
-   * <p>
-   * If not set, defaults to the table's open file cost.
+   * A user provided filter for determining which files will be considered by 
the compaction strategy. This will be used
+   * in addition to whatever rules the compaction strategy generates. For 
example this would be used for providing a
+   * restriction to only run compaction on a specific partition.
    *
-   * @param splitOpenFileCost minimum file size to count to pack into one 
"bin".
-   * @return this for method chaining
+   * @param expression only entries that pass this filter will be compacted
+   * @return this for chaining
    */
-  RewriteDataFiles splitOpenFileCost(long splitOpenFileCost);
+  RewriteDataFiles filter(Expression expression);
 
   /**
-   * The action result that contains a summary of the execution.
+   * A pairing of file group information to the result of the rewriting that 
file group. If the results are null then
+   * that particular file group failed. We should only have failed groups if 
partial progress is enabled otherwise we
+   * will report a total failure for the job.
    */
   interface Result {
+    Map<FileGroupInfo, FileGroupRewriteResult> resultMap();
+  }

Review comment:
       Possibly I missed this in the discussion, but should we return something 
other than null for the failure? I can imagine that we'd want to possibly 
return metadata about the failure, possibly trimmed stack traces, or something 
else specific to an individual compaction strategy.
   
   I prefer error responses over null for errors as null is a lot easier to 
misinterpret or simply miss entirely and then NPE.

##########
File path: api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
##########
@@ -19,92 +19,120 @@
 
 package org.apache.iceberg.actions;
 
-import org.apache.iceberg.DataFile;
+import java.util.Map;
+import org.apache.iceberg.StructLike;
 import org.apache.iceberg.expressions.Expression;
 
 /**
- * An action that rewrites data files.
+ * An action for rewriting datafiles according to a Rewrite Strategy. 
Generally used for
+ * optimizing the sizing and layout of datafiles within a table.
  */
-public interface RewriteDataFiles extends SnapshotUpdate<RewriteDataFiles, 
RewriteDataFiles.Result> {
+public interface RewriteDataFiles extends Action<RewriteDataFiles, 
RewriteDataFiles.Result> {
+
   /**
-   * Pass a row filter to filter {@link DataFile}s to be rewritten.
-   * <p>
-   * Note that all files that may contain data matching the filter may be 
rewritten.
-   * <p>
-   * If not set, all files will be rewritten.
-   *
-   * @param expr a row filter to filter out data files
-   * @return this for method chaining
+   * Enable committing groups of files (see max-file-group-size) prior to the 
entire compaction completing.
+   * This will produce additional commits but allow for progress even if some 
groups fail to commit. This setting
+   * will not change the correctness of the rewrite operation. The default is 
false, which produces a single commit
+   * when the entire job has completed.
    */
-  RewriteDataFiles filter(Expression expr);
+  String PARTIAL_PROGRESS_ENABLED = "partial-progress.enabled";
+  boolean PARTIAL_PROGRESS_ENABLED_DEFAULT = false;
 
   /**
-   * Enables or disables case sensitive expression binding.
-   * <p>
-   * If not set, defaults to false.
-   *
-   * @param caseSensitive caseSensitive
-   * @return this for method chaining
+   * The maximum amount of Iceberg commits that compaction is allowed to 
produce if partial progress is enabled.
    */
-  RewriteDataFiles caseSensitive(boolean caseSensitive);
+  String PARTIAL_PROGRESS_MAX_COMMITS = "partial-progress.max-commits";
+  int PARTIAL_PROGRESS_MAX_COMMITS_DEFAULT = 10;
 
   /**
-   * Pass a PartitionSpec id to specify which PartitionSpec should be used in 
DataFile rewrite
-   * <p>
-   * If not set, defaults to the table's default spec ID.
-   *
-   * @param specId PartitionSpec id to rewrite
-   * @return this for method chaining
+   * The entire compaction operation is broken down into pieces based on 
partitioning and within partitions based
+   * on size into groups. These sub-units of compaction are referred to as 
file groups. The largest amount of data that
+   * should be compacted in a single group is controlled by 
MAX_FILE_GROUP_SIZE_BYTES. When grouping files, the
+   * underlying compaction strategy will use this value as to limit the files 
which will be included in a single file
+   * group. A group will be processed by a single framework "action". For 
example, in Spark this means that each group
+   * would be rewritten in its own Spark action. A group will never contain 
files for multiple output partitions.
    */
-  RewriteDataFiles outputSpecId(int specId);
+  String MAX_FILE_GROUP_SIZE_BYTES = "max-file-group-size-bytes";
+  long MAX_FILE_GROUP_SIZE_BYTES_DEFAULT = 1024L * 1024L * 1024L * 100L; // 
100 Gigabytes
 
   /**
-   * Specify the target data file size in bytes.
-   * <p>
-   * If not set, defaults to the table's target file size.
-   *
-   * @param targetSizeInBytes size in bytes of rewrite data file
-   * @return this for method chaining
+   * The max number of file groups to be simultaneously rewritten by the 
compaction strategy. The structure and
+   * contents of the group is determined by the compaction strategy. Each file 
group will be rewritten
+   * independently and asynchronously.
+   **/
+  String MAX_CONCURRENT_FILE_GROUP_ACTIONS = 
"max-concurrent-file-group-actions";
+  int MAX_CONCURRENT_FILE_GROUP_ACTIONS_DEFAULT = 1;
+
+  /**
+   * The output file size that this compaction strategy will attempt to 
generate when rewriting files. By default this
+   * will use the write.target-size value in the table properties of the table 
being updated.
+   */
+  String TARGET_FILE_SIZE_BYTES = "target-file-size-bytes";
+
+  /**
+   * The partition spec to use when writing the output data from this 
operation. By default uses the
+   * current table partition spec.
    */
-  RewriteDataFiles targetSizeInBytes(long targetSizeInBytes);
+  String PARTITION_SPEC_ID = "partition-spec-id";
+
+  enum RewriteStrategyName {
+    BINPACK,
+    SORT
+  }
 
   /**
-   * Specify the number of "bins" considered when trying to pack the next file 
split into a task. Increasing this
-   * usually makes tasks a bit more even by considering more ways to pack file 
regions into a single task with extra
-   * planning cost.
-   * <p>
-   * This configuration can reorder the incoming file regions, to preserve 
order for lower/upper bounds in file
-   * metadata, user can use a lookback of 1.
+   * The name of the compaction strategy to be used when compacting data 
files. Currently we only support BINPACK and
+   * SORT as options.
    *
-   * @param splitLookback number of "bins" considered when trying to pack the 
next file split into a task.
+   * @param strategyName name of the strategy

Review comment:
       Maybe "name of the compaction strategy" so that the param definition 
isn't _exactly_ the param name?
   
   Though I recognize this is partially to appease the linter so might be too 
nit-picky on my part. 🙂 

##########
File path: api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
##########
@@ -19,92 +19,120 @@
 
 package org.apache.iceberg.actions;
 
-import org.apache.iceberg.DataFile;
+import java.util.Map;
+import org.apache.iceberg.StructLike;
 import org.apache.iceberg.expressions.Expression;
 
 /**
- * An action that rewrites data files.
+ * An action for rewriting datafiles according to a Rewrite Strategy. 
Generally used for
+ * optimizing the sizing and layout of datafiles within a table.
  */
-public interface RewriteDataFiles extends SnapshotUpdate<RewriteDataFiles, 
RewriteDataFiles.Result> {
+public interface RewriteDataFiles extends Action<RewriteDataFiles, 
RewriteDataFiles.Result> {
+
   /**
-   * Pass a row filter to filter {@link DataFile}s to be rewritten.
-   * <p>
-   * Note that all files that may contain data matching the filter may be 
rewritten.
-   * <p>
-   * If not set, all files will be rewritten.
-   *
-   * @param expr a row filter to filter out data files
-   * @return this for method chaining
+   * Enable committing groups of files (see max-file-group-size) prior to the 
entire compaction completing.
+   * This will produce additional commits but allow for progress even if some 
groups fail to commit. This setting
+   * will not change the correctness of the rewrite operation. The default is 
false, which produces a single commit
+   * when the entire job has completed.
    */
-  RewriteDataFiles filter(Expression expr);
+  String PARTIAL_PROGRESS_ENABLED = "partial-progress.enabled";
+  boolean PARTIAL_PROGRESS_ENABLED_DEFAULT = false;
 
   /**
-   * Enables or disables case sensitive expression binding.
-   * <p>
-   * If not set, defaults to false.
-   *
-   * @param caseSensitive caseSensitive
-   * @return this for method chaining
+   * The maximum amount of Iceberg commits that compaction is allowed to 
produce if partial progress is enabled.
    */
-  RewriteDataFiles caseSensitive(boolean caseSensitive);
+  String PARTIAL_PROGRESS_MAX_COMMITS = "partial-progress.max-commits";
+  int PARTIAL_PROGRESS_MAX_COMMITS_DEFAULT = 10;
 
   /**
-   * Pass a PartitionSpec id to specify which PartitionSpec should be used in 
DataFile rewrite
-   * <p>
-   * If not set, defaults to the table's default spec ID.
-   *
-   * @param specId PartitionSpec id to rewrite
-   * @return this for method chaining
+   * The entire compaction operation is broken down into pieces based on 
partitioning and within partitions based
+   * on size into groups. These sub-units of compaction are referred to as 
file groups. The largest amount of data that
+   * should be compacted in a single group is controlled by 
MAX_FILE_GROUP_SIZE_BYTES. When grouping files, the
+   * underlying compaction strategy will use this value as to limit the files 
which will be included in a single file
+   * group. A group will be processed by a single framework "action". For 
example, in Spark this means that each group
+   * would be rewritten in its own Spark action. A group will never contain 
files for multiple output partitions.
    */
-  RewriteDataFiles outputSpecId(int specId);
+  String MAX_FILE_GROUP_SIZE_BYTES = "max-file-group-size-bytes";
+  long MAX_FILE_GROUP_SIZE_BYTES_DEFAULT = 1024L * 1024L * 1024L * 100L; // 
100 Gigabytes
 
   /**
-   * Specify the target data file size in bytes.
-   * <p>
-   * If not set, defaults to the table's target file size.
-   *
-   * @param targetSizeInBytes size in bytes of rewrite data file
-   * @return this for method chaining
+   * The max number of file groups to be simultaneously rewritten by the 
compaction strategy. The structure and
+   * contents of the group is determined by the compaction strategy. Each file 
group will be rewritten
+   * independently and asynchronously.
+   **/
+  String MAX_CONCURRENT_FILE_GROUP_ACTIONS = 
"max-concurrent-file-group-actions";
+  int MAX_CONCURRENT_FILE_GROUP_ACTIONS_DEFAULT = 1;
+
+  /**
+   * The output file size that this compaction strategy will attempt to 
generate when rewriting files. By default this
+   * will use the write.target-size value in the table properties of the table 
being updated.
+   */
+  String TARGET_FILE_SIZE_BYTES = "target-file-size-bytes";
+
+  /**
+   * The partition spec to use when writing the output data from this 
operation. By default uses the
+   * current table partition spec.
    */
-  RewriteDataFiles targetSizeInBytes(long targetSizeInBytes);
+  String PARTITION_SPEC_ID = "partition-spec-id";

Review comment:
       +1 on keeping `partition` in the term. 

##########
File path: api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
##########
@@ -19,92 +19,120 @@
 
 package org.apache.iceberg.actions;
 
-import org.apache.iceberg.DataFile;
+import java.util.Map;
+import org.apache.iceberg.StructLike;
 import org.apache.iceberg.expressions.Expression;
 
 /**
- * An action that rewrites data files.
+ * An action for rewriting datafiles according to a Rewrite Strategy. 
Generally used for
+ * optimizing the sizing and layout of datafiles within a table.
  */
-public interface RewriteDataFiles extends SnapshotUpdate<RewriteDataFiles, 
RewriteDataFiles.Result> {
+public interface RewriteDataFiles extends Action<RewriteDataFiles, 
RewriteDataFiles.Result> {
+
   /**
-   * Pass a row filter to filter {@link DataFile}s to be rewritten.
-   * <p>
-   * Note that all files that may contain data matching the filter may be 
rewritten.
-   * <p>
-   * If not set, all files will be rewritten.
-   *
-   * @param expr a row filter to filter out data files
-   * @return this for method chaining
+   * Enable committing groups of files (see max-file-group-size) prior to the 
entire compaction completing.
+   * This will produce additional commits but allow for progress even if some 
groups fail to commit. This setting
+   * will not change the correctness of the rewrite operation. The default is 
false, which produces a single commit
+   * when the entire job has completed.
    */
-  RewriteDataFiles filter(Expression expr);
+  String PARTIAL_PROGRESS_ENABLED = "partial-progress.enabled";
+  boolean PARTIAL_PROGRESS_ENABLED_DEFAULT = false;
 
   /**
-   * Enables or disables case sensitive expression binding.
-   * <p>
-   * If not set, defaults to false.
-   *
-   * @param caseSensitive caseSensitive
-   * @return this for method chaining
+   * The maximum amount of Iceberg commits that compaction is allowed to 
produce if partial progress is enabled.
    */
-  RewriteDataFiles caseSensitive(boolean caseSensitive);
+  String PARTIAL_PROGRESS_MAX_COMMITS = "partial-progress.max-commits";
+  int PARTIAL_PROGRESS_MAX_COMMITS_DEFAULT = 10;
 
   /**
-   * Pass a PartitionSpec id to specify which PartitionSpec should be used in 
DataFile rewrite
-   * <p>
-   * If not set, defaults to the table's default spec ID.
-   *
-   * @param specId PartitionSpec id to rewrite
-   * @return this for method chaining
+   * The entire compaction operation is broken down into pieces based on 
partitioning and within partitions based
+   * on size into groups. These sub-units of compaction are referred to as 
file groups. The largest amount of data that
+   * should be compacted in a single group is controlled by 
MAX_FILE_GROUP_SIZE_BYTES. When grouping files, the
+   * underlying compaction strategy will use this value as to limit the files 
which will be included in a single file
+   * group. A group will be processed by a single framework "action". For 
example, in Spark this means that each group
+   * would be rewritten in its own Spark action. A group will never contain 
files for multiple output partitions.
    */
-  RewriteDataFiles outputSpecId(int specId);
+  String MAX_FILE_GROUP_SIZE_BYTES = "max-file-group-size-bytes";
+  long MAX_FILE_GROUP_SIZE_BYTES_DEFAULT = 1024L * 1024L * 1024L * 100L; // 
100 Gigabytes
 
   /**
-   * Specify the target data file size in bytes.
-   * <p>
-   * If not set, defaults to the table's target file size.
-   *
-   * @param targetSizeInBytes size in bytes of rewrite data file
-   * @return this for method chaining
+   * The max number of file groups to be simultaneously rewritten by the 
compaction strategy. The structure and
+   * contents of the group is determined by the compaction strategy. Each file 
group will be rewritten
+   * independently and asynchronously.
+   **/
+  String MAX_CONCURRENT_FILE_GROUP_ACTIONS = 
"max-concurrent-file-group-actions";
+  int MAX_CONCURRENT_FILE_GROUP_ACTIONS_DEFAULT = 1;
+
+  /**
+   * The output file size that this compaction strategy will attempt to 
generate when rewriting files. By default this
+   * will use the write.target-size value in the table properties of the table 
being updated.
+   */
+  String TARGET_FILE_SIZE_BYTES = "target-file-size-bytes";
+
+  /**
+   * The partition spec to use when writing the output data from this 
operation. By default uses the
+   * current table partition spec.
    */
-  RewriteDataFiles targetSizeInBytes(long targetSizeInBytes);
+  String PARTITION_SPEC_ID = "partition-spec-id";
+
+  enum RewriteStrategyName {
+    BINPACK,
+    SORT
+  }
 
   /**
-   * Specify the number of "bins" considered when trying to pack the next file 
split into a task. Increasing this
-   * usually makes tasks a bit more even by considering more ways to pack file 
regions into a single task with extra
-   * planning cost.
-   * <p>
-   * This configuration can reorder the incoming file regions, to preserve 
order for lower/upper bounds in file
-   * metadata, user can use a lookback of 1.
+   * The name of the compaction strategy to be used when compacting data 
files. Currently we only support BINPACK and
+   * SORT as options.
    *
-   * @param splitLookback number of "bins" considered when trying to pack the 
next file split into a task.
+   * @param strategyName name of the strategy
    * @return this for method chaining
    */
-  RewriteDataFiles splitLookback(int splitLookback);
+  RewriteDataFiles strategy(RewriteStrategyName strategyName);
 
   /**
-   * Specify the cost of opening a file that will be taken into account during 
packing files into
-   * bins. If the size of the file is smaller than the cost of opening, then 
this value will be used
-   * instead of the actual file size.
-   * <p>
-   * If not set, defaults to the table's open file cost.
+   * A user provided filter for determining which files will be considered by 
the compaction strategy. This will be used
+   * in addition to whatever rules the compaction strategy generates. For 
example this would be used for providing a
+   * restriction to only run compaction on a specific partition.
    *
-   * @param splitOpenFileCost minimum file size to count to pack into one 
"bin".
-   * @return this for method chaining
+   * @param expression only entries that pass this filter will be compacted
+   * @return this for chaining
    */
-  RewriteDataFiles splitOpenFileCost(long splitOpenFileCost);
+  RewriteDataFiles filter(Expression expression);
 
   /**
-   * The action result that contains a summary of the execution.
+   * A pairing of file group information to the result of the rewriting that 
file group. If the results are null then

Review comment:
       Nit: Pairing makes it sound like there's only one entry in the map to 
me. Maybe just "a map"?

##########
File path: api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
##########
@@ -19,92 +19,120 @@
 
 package org.apache.iceberg.actions;
 
-import org.apache.iceberg.DataFile;
+import java.util.Map;
+import org.apache.iceberg.StructLike;
 import org.apache.iceberg.expressions.Expression;
 
 /**
- * An action that rewrites data files.
+ * An action for rewriting datafiles according to a Rewrite Strategy. 
Generally used for
+ * optimizing the sizing and layout of datafiles within a table.
  */
-public interface RewriteDataFiles extends SnapshotUpdate<RewriteDataFiles, 
RewriteDataFiles.Result> {
+public interface RewriteDataFiles extends Action<RewriteDataFiles, 
RewriteDataFiles.Result> {
+
   /**
-   * Pass a row filter to filter {@link DataFile}s to be rewritten.
-   * <p>
-   * Note that all files that may contain data matching the filter may be 
rewritten.
-   * <p>
-   * If not set, all files will be rewritten.
-   *
-   * @param expr a row filter to filter out data files
-   * @return this for method chaining
+   * Enable committing groups of files (see max-file-group-size) prior to the 
entire compaction completing.
+   * This will produce additional commits but allow for progress even if some 
groups fail to commit. This setting
+   * will not change the correctness of the rewrite operation. The default is 
false, which produces a single commit
+   * when the entire job has completed.
    */
-  RewriteDataFiles filter(Expression expr);
+  String PARTIAL_PROGRESS_ENABLED = "partial-progress.enabled";
+  boolean PARTIAL_PROGRESS_ENABLED_DEFAULT = false;
 
   /**
-   * Enables or disables case sensitive expression binding.
-   * <p>
-   * If not set, defaults to false.
-   *
-   * @param caseSensitive caseSensitive
-   * @return this for method chaining
+   * The maximum amount of Iceberg commits that compaction is allowed to 
produce if partial progress is enabled.
    */
-  RewriteDataFiles caseSensitive(boolean caseSensitive);
+  String PARTIAL_PROGRESS_MAX_COMMITS = "partial-progress.max-commits";
+  int PARTIAL_PROGRESS_MAX_COMMITS_DEFAULT = 10;
 
   /**
-   * Pass a PartitionSpec id to specify which PartitionSpec should be used in 
DataFile rewrite
-   * <p>
-   * If not set, defaults to the table's default spec ID.
-   *
-   * @param specId PartitionSpec id to rewrite
-   * @return this for method chaining
+   * The entire compaction operation is broken down into pieces based on 
partitioning and within partitions based
+   * on size into groups. These sub-units of compaction are referred to as 
file groups. The largest amount of data that
+   * should be compacted in a single group is controlled by 
MAX_FILE_GROUP_SIZE_BYTES. When grouping files, the
+   * underlying compaction strategy will use this value as to limit the files 
which will be included in a single file
+   * group. A group will be processed by a single framework "action". For 
example, in Spark this means that each group
+   * would be rewritten in its own Spark action. A group will never contain 
files for multiple output partitions.
    */
-  RewriteDataFiles outputSpecId(int specId);
+  String MAX_FILE_GROUP_SIZE_BYTES = "max-file-group-size-bytes";
+  long MAX_FILE_GROUP_SIZE_BYTES_DEFAULT = 1024L * 1024L * 1024L * 100L; // 
100 Gigabytes
 
   /**
-   * Specify the target data file size in bytes.
-   * <p>
-   * If not set, defaults to the table's target file size.
-   *
-   * @param targetSizeInBytes size in bytes of rewrite data file
-   * @return this for method chaining
+   * The max number of file groups to be simultaneously rewritten by the 
compaction strategy. The structure and
+   * contents of the group is determined by the compaction strategy. Each file 
group will be rewritten
+   * independently and asynchronously.
+   **/
+  String MAX_CONCURRENT_FILE_GROUP_ACTIONS = 
"max-concurrent-file-group-actions";
+  int MAX_CONCURRENT_FILE_GROUP_ACTIONS_DEFAULT = 1;
+
+  /**
+   * The output file size that this compaction strategy will attempt to 
generate when rewriting files. By default this
+   * will use the write.target-size value in the table properties of the table 
being updated.

Review comment:
       +1. Aligns with many other similar configs and is more explicit.

##########
File path: api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
##########
@@ -19,92 +19,120 @@
 
 package org.apache.iceberg.actions;
 
-import org.apache.iceberg.DataFile;
+import java.util.Map;
+import org.apache.iceberg.StructLike;
 import org.apache.iceberg.expressions.Expression;
 
 /**
- * An action that rewrites data files.
+ * An action for rewriting datafiles according to a Rewrite Strategy. 
Generally used for
+ * optimizing the sizing and layout of datafiles within a table.
  */
-public interface RewriteDataFiles extends SnapshotUpdate<RewriteDataFiles, 
RewriteDataFiles.Result> {
+public interface RewriteDataFiles extends Action<RewriteDataFiles, 
RewriteDataFiles.Result> {
+
   /**
-   * Pass a row filter to filter {@link DataFile}s to be rewritten.
-   * <p>
-   * Note that all files that may contain data matching the filter may be 
rewritten.
-   * <p>
-   * If not set, all files will be rewritten.
-   *
-   * @param expr a row filter to filter out data files
-   * @return this for method chaining
+   * Enable committing groups of files (see max-file-group-size) prior to the 
entire compaction completing.
+   * This will produce additional commits but allow for progress even if some 
groups fail to commit. This setting
+   * will not change the correctness of the rewrite operation. The default is 
false, which produces a single commit
+   * when the entire job has completed.
    */
-  RewriteDataFiles filter(Expression expr);
+  String PARTIAL_PROGRESS_ENABLED = "partial-progress.enabled";
+  boolean PARTIAL_PROGRESS_ENABLED_DEFAULT = false;
 
   /**
-   * Enables or disables case sensitive expression binding.
-   * <p>
-   * If not set, defaults to false.
-   *
-   * @param caseSensitive caseSensitive
-   * @return this for method chaining
+   * The maximum amount of Iceberg commits that compaction is allowed to 
produce if partial progress is enabled.
    */
-  RewriteDataFiles caseSensitive(boolean caseSensitive);
+  String PARTIAL_PROGRESS_MAX_COMMITS = "partial-progress.max-commits";
+  int PARTIAL_PROGRESS_MAX_COMMITS_DEFAULT = 10;
 
   /**
-   * Pass a PartitionSpec id to specify which PartitionSpec should be used in 
DataFile rewrite
-   * <p>
-   * If not set, defaults to the table's default spec ID.
-   *
-   * @param specId PartitionSpec id to rewrite
-   * @return this for method chaining
+   * The entire compaction operation is broken down into pieces based on 
partitioning and within partitions based
+   * on size into groups. These sub-units of compaction are referred to as 
file groups. The largest amount of data that
+   * should be compacted in a single group is controlled by 
MAX_FILE_GROUP_SIZE_BYTES. When grouping files, the
+   * underlying compaction strategy will use this value as to limit the files 
which will be included in a single file
+   * group. A group will be processed by a single framework "action". For 
example, in Spark this means that each group
+   * would be rewritten in its own Spark action. A group will never contain 
files for multiple output partitions.
    */
-  RewriteDataFiles outputSpecId(int specId);
+  String MAX_FILE_GROUP_SIZE_BYTES = "max-file-group-size-bytes";
+  long MAX_FILE_GROUP_SIZE_BYTES_DEFAULT = 1024L * 1024L * 1024L * 100L; // 
100 Gigabytes
 
   /**
-   * Specify the target data file size in bytes.
-   * <p>
-   * If not set, defaults to the table's target file size.
-   *
-   * @param targetSizeInBytes size in bytes of rewrite data file
-   * @return this for method chaining
+   * The max number of file groups to be simultaneously rewritten by the 
compaction strategy. The structure and
+   * contents of the group is determined by the compaction strategy. Each file 
group will be rewritten
+   * independently and asynchronously.
+   **/
+  String MAX_CONCURRENT_FILE_GROUP_ACTIONS = 
"max-concurrent-file-group-actions";
+  int MAX_CONCURRENT_FILE_GROUP_ACTIONS_DEFAULT = 1;
+
+  /**
+   * The output file size that this compaction strategy will attempt to 
generate when rewriting files. By default this
+   * will use the write.target-size value in the table properties of the table 
being updated.
+   */
+  String TARGET_FILE_SIZE_BYTES = "target-file-size-bytes";
+
+  /**
+   * The partition spec to use when writing the output data from this 
operation. By default uses the
+   * current table partition spec.
    */
-  RewriteDataFiles targetSizeInBytes(long targetSizeInBytes);
+  String PARTITION_SPEC_ID = "partition-spec-id";
+
+  enum RewriteStrategyName {
+    BINPACK,
+    SORT
+  }
 
   /**
-   * Specify the number of "bins" considered when trying to pack the next file 
split into a task. Increasing this
-   * usually makes tasks a bit more even by considering more ways to pack file 
regions into a single task with extra
-   * planning cost.
-   * <p>
-   * This configuration can reorder the incoming file regions, to preserve 
order for lower/upper bounds in file
-   * metadata, user can use a lookback of 1.
+   * The name of the compaction strategy to be used when compacting data 
files. Currently we only support BINPACK and
+   * SORT as options.
    *
-   * @param splitLookback number of "bins" considered when trying to pack the 
next file split into a task.
+   * @param strategyName name of the strategy
    * @return this for method chaining
    */
-  RewriteDataFiles splitLookback(int splitLookback);
+  RewriteDataFiles strategy(RewriteStrategyName strategyName);
 
   /**
-   * Specify the cost of opening a file that will be taken into account during 
packing files into
-   * bins. If the size of the file is smaller than the cost of opening, then 
this value will be used
-   * instead of the actual file size.
-   * <p>
-   * If not set, defaults to the table's open file cost.
+   * A user provided filter for determining which files will be considered by 
the compaction strategy. This will be used
+   * in addition to whatever rules the compaction strategy generates. For 
example this would be used for providing a
+   * restriction to only run compaction on a specific partition.
    *
-   * @param splitOpenFileCost minimum file size to count to pack into one 
"bin".
-   * @return this for method chaining
+   * @param expression only entries that pass this filter will be compacted
+   * @return this for chaining
    */
-  RewriteDataFiles splitOpenFileCost(long splitOpenFileCost);
+  RewriteDataFiles filter(Expression expression);
 
   /**
-   * The action result that contains a summary of the execution.
+   * A pairing of file group information to the result of the rewriting that 
file group. If the results are null then
+   * that particular file group failed. We should only have failed groups if 
partial progress is enabled otherwise we
+   * will report a total failure for the job.
    */
   interface Result {
+    Map<FileGroupInfo, FileGroupRewriteResult> resultMap();
+  }
+
+  interface FileGroupRewriteResult {
+    int addedDataFilesCount();
+
+    int rewrittenDataFilesCount();
+  }
+
+  /**
+   * A description of a file group, when it was processed, and within which 
partition. For use
+   * tracking rewrite operations and for returning results.
+   */
+  interface FileGroupInfo {
+
+    /**
+     * returns which file group this is out of the total set of file groups 
for this compaction
+     */
+    int globalIndex();

Review comment:
       Correct me if I'm wrong, but I think that since this is `FileGroupInfo` 
and not the Result, the indexes are likely included so the engine assign and 
track work etc as opposed to purely for tracking on output. 

##########
File path: api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
##########
@@ -19,92 +19,120 @@
 
 package org.apache.iceberg.actions;
 
-import org.apache.iceberg.DataFile;
+import java.util.Map;
+import org.apache.iceberg.StructLike;
 import org.apache.iceberg.expressions.Expression;
 
 /**
- * An action that rewrites data files.
+ * An action for rewriting datafiles according to a Rewrite Strategy. 
Generally used for
+ * optimizing the sizing and layout of datafiles within a table.
  */
-public interface RewriteDataFiles extends SnapshotUpdate<RewriteDataFiles, 
RewriteDataFiles.Result> {
+public interface RewriteDataFiles extends Action<RewriteDataFiles, 
RewriteDataFiles.Result> {
+
   /**
-   * Pass a row filter to filter {@link DataFile}s to be rewritten.
-   * <p>
-   * Note that all files that may contain data matching the filter may be 
rewritten.
-   * <p>
-   * If not set, all files will be rewritten.
-   *
-   * @param expr a row filter to filter out data files
-   * @return this for method chaining
+   * Enable committing groups of files (see max-file-group-size) prior to the 
entire compaction completing.
+   * This will produce additional commits but allow for progress even if some 
groups fail to commit. This setting
+   * will not change the correctness of the rewrite operation. The default is 
false, which produces a single commit
+   * when the entire job has completed.
    */
-  RewriteDataFiles filter(Expression expr);
+  String PARTIAL_PROGRESS_ENABLED = "partial-progress.enabled";
+  boolean PARTIAL_PROGRESS_ENABLED_DEFAULT = false;
 
   /**
-   * Enables or disables case sensitive expression binding.
-   * <p>
-   * If not set, defaults to false.
-   *
-   * @param caseSensitive caseSensitive
-   * @return this for method chaining
+   * The maximum amount of Iceberg commits that compaction is allowed to 
produce if partial progress is enabled.
    */
-  RewriteDataFiles caseSensitive(boolean caseSensitive);
+  String PARTIAL_PROGRESS_MAX_COMMITS = "partial-progress.max-commits";
+  int PARTIAL_PROGRESS_MAX_COMMITS_DEFAULT = 10;
 
   /**
-   * Pass a PartitionSpec id to specify which PartitionSpec should be used in 
DataFile rewrite
-   * <p>
-   * If not set, defaults to the table's default spec ID.
-   *
-   * @param specId PartitionSpec id to rewrite
-   * @return this for method chaining
+   * The entire compaction operation is broken down into pieces based on 
partitioning and within partitions based
+   * on size into groups. These sub-units of compaction are referred to as 
file groups. The largest amount of data that
+   * should be compacted in a single group is controlled by 
MAX_FILE_GROUP_SIZE_BYTES. When grouping files, the
+   * underlying compaction strategy will use this value as to limit the files 
which will be included in a single file
+   * group. A group will be processed by a single framework "action". For 
example, in Spark this means that each group
+   * would be rewritten in its own Spark action. A group will never contain 
files for multiple output partitions.
    */
-  RewriteDataFiles outputSpecId(int specId);
+  String MAX_FILE_GROUP_SIZE_BYTES = "max-file-group-size-bytes";
+  long MAX_FILE_GROUP_SIZE_BYTES_DEFAULT = 1024L * 1024L * 1024L * 100L; // 
100 Gigabytes
 
   /**
-   * Specify the target data file size in bytes.
-   * <p>
-   * If not set, defaults to the table's target file size.
-   *
-   * @param targetSizeInBytes size in bytes of rewrite data file
-   * @return this for method chaining
+   * The max number of file groups to be simultaneously rewritten by the 
compaction strategy. The structure and
+   * contents of the group is determined by the compaction strategy. Each file 
group will be rewritten
+   * independently and asynchronously.
+   **/
+  String MAX_CONCURRENT_FILE_GROUP_ACTIONS = 
"max-concurrent-file-group-actions";
+  int MAX_CONCURRENT_FILE_GROUP_ACTIONS_DEFAULT = 1;
+
+  /**
+   * The output file size that this compaction strategy will attempt to 
generate when rewriting files. By default this
+   * will use the write.target-size value in the table properties of the table 
being updated.
+   */
+  String TARGET_FILE_SIZE_BYTES = "target-file-size-bytes";
+
+  /**
+   * The partition spec to use when writing the output data from this 
operation. By default uses the
+   * current table partition spec.
    */
-  RewriteDataFiles targetSizeInBytes(long targetSizeInBytes);
+  String PARTITION_SPEC_ID = "partition-spec-id";
+
+  enum RewriteStrategyName {
+    BINPACK,
+    SORT
+  }
 
   /**
-   * Specify the number of "bins" considered when trying to pack the next file 
split into a task. Increasing this
-   * usually makes tasks a bit more even by considering more ways to pack file 
regions into a single task with extra
-   * planning cost.
-   * <p>
-   * This configuration can reorder the incoming file regions, to preserve 
order for lower/upper bounds in file
-   * metadata, user can use a lookback of 1.
+   * The name of the compaction strategy to be used when compacting data 
files. Currently we only support BINPACK and
+   * SORT as options.

Review comment:
       Is the `currently only support options BINPACK and SORT options` comment 
necessary?. To me, what's supported is a natural extension of the enum. And 
then if we declare an enum that's not implemented, we can specify that. Feels 
like less to maintain / less likely to have the comments drift from the 
reality, but I'd like to hear what others have to say (and it might be a 
non-issue).

##########
File path: api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
##########
@@ -19,92 +19,120 @@
 
 package org.apache.iceberg.actions;
 
-import org.apache.iceberg.DataFile;
+import java.util.Map;
+import org.apache.iceberg.StructLike;
 import org.apache.iceberg.expressions.Expression;
 
 /**
- * An action that rewrites data files.
+ * An action for rewriting datafiles according to a Rewrite Strategy. 
Generally used for
+ * optimizing the sizing and layout of datafiles within a table.
  */
-public interface RewriteDataFiles extends SnapshotUpdate<RewriteDataFiles, 
RewriteDataFiles.Result> {
+public interface RewriteDataFiles extends Action<RewriteDataFiles, 
RewriteDataFiles.Result> {
+
   /**
-   * Pass a row filter to filter {@link DataFile}s to be rewritten.
-   * <p>
-   * Note that all files that may contain data matching the filter may be 
rewritten.
-   * <p>
-   * If not set, all files will be rewritten.
-   *
-   * @param expr a row filter to filter out data files
-   * @return this for method chaining
+   * Enable committing groups of files (see max-file-group-size) prior to the 
entire compaction completing.
+   * This will produce additional commits but allow for progress even if some 
groups fail to commit. This setting
+   * will not change the correctness of the rewrite operation. The default is 
false, which produces a single commit
+   * when the entire job has completed.
    */
-  RewriteDataFiles filter(Expression expr);
+  String PARTIAL_PROGRESS_ENABLED = "partial-progress.enabled";
+  boolean PARTIAL_PROGRESS_ENABLED_DEFAULT = false;
 
   /**
-   * Enables or disables case sensitive expression binding.
-   * <p>
-   * If not set, defaults to false.
-   *
-   * @param caseSensitive caseSensitive
-   * @return this for method chaining
+   * The maximum amount of Iceberg commits that compaction is allowed to 
produce if partial progress is enabled.
    */
-  RewriteDataFiles caseSensitive(boolean caseSensitive);
+  String PARTIAL_PROGRESS_MAX_COMMITS = "partial-progress.max-commits";
+  int PARTIAL_PROGRESS_MAX_COMMITS_DEFAULT = 10;
 
   /**
-   * Pass a PartitionSpec id to specify which PartitionSpec should be used in 
DataFile rewrite
-   * <p>
-   * If not set, defaults to the table's default spec ID.
-   *
-   * @param specId PartitionSpec id to rewrite
-   * @return this for method chaining
+   * The entire compaction operation is broken down into pieces based on 
partitioning and within partitions based
+   * on size into groups. These sub-units of compaction are referred to as 
file groups. The largest amount of data that
+   * should be compacted in a single group is controlled by 
MAX_FILE_GROUP_SIZE_BYTES. When grouping files, the
+   * underlying compaction strategy will use this value as to limit the files 
which will be included in a single file
+   * group. A group will be processed by a single framework "action". For 
example, in Spark this means that each group
+   * would be rewritten in its own Spark action. A group will never contain 
files for multiple output partitions.
    */
-  RewriteDataFiles outputSpecId(int specId);
+  String MAX_FILE_GROUP_SIZE_BYTES = "max-file-group-size-bytes";
+  long MAX_FILE_GROUP_SIZE_BYTES_DEFAULT = 1024L * 1024L * 1024L * 100L; // 
100 Gigabytes
 
   /**
-   * Specify the target data file size in bytes.
-   * <p>
-   * If not set, defaults to the table's target file size.
-   *
-   * @param targetSizeInBytes size in bytes of rewrite data file
-   * @return this for method chaining
+   * The max number of file groups to be simultaneously rewritten by the 
compaction strategy. The structure and
+   * contents of the group is determined by the compaction strategy. Each file 
group will be rewritten
+   * independently and asynchronously.
+   **/
+  String MAX_CONCURRENT_FILE_GROUP_ACTIONS = 
"max-concurrent-file-group-actions";
+  int MAX_CONCURRENT_FILE_GROUP_ACTIONS_DEFAULT = 1;
+
+  /**
+   * The output file size that this compaction strategy will attempt to 
generate when rewriting files. By default this
+   * will use the write.target-size value in the table properties of the table 
being updated.
+   */
+  String TARGET_FILE_SIZE_BYTES = "target-file-size-bytes";
+
+  /**
+   * The partition spec to use when writing the output data from this 
operation. By default uses the
+   * current table partition spec.
    */
-  RewriteDataFiles targetSizeInBytes(long targetSizeInBytes);
+  String PARTITION_SPEC_ID = "partition-spec-id";
+
+  enum RewriteStrategyName {
+    BINPACK,
+    SORT
+  }
 
   /**
-   * Specify the number of "bins" considered when trying to pack the next file 
split into a task. Increasing this
-   * usually makes tasks a bit more even by considering more ways to pack file 
regions into a single task with extra
-   * planning cost.
-   * <p>
-   * This configuration can reorder the incoming file regions, to preserve 
order for lower/upper bounds in file
-   * metadata, user can use a lookback of 1.
+   * The name of the compaction strategy to be used when compacting data 
files. Currently we only support BINPACK and
+   * SORT as options.
    *
-   * @param splitLookback number of "bins" considered when trying to pack the 
next file split into a task.
+   * @param strategyName name of the strategy
    * @return this for method chaining
    */
-  RewriteDataFiles splitLookback(int splitLookback);
+  RewriteDataFiles strategy(RewriteStrategyName strategyName);
 
   /**
-   * Specify the cost of opening a file that will be taken into account during 
packing files into
-   * bins. If the size of the file is smaller than the cost of opening, then 
this value will be used
-   * instead of the actual file size.
-   * <p>
-   * If not set, defaults to the table's open file cost.
+   * A user provided filter for determining which files will be considered by 
the compaction strategy. This will be used
+   * in addition to whatever rules the compaction strategy generates. For 
example this would be used for providing a
+   * restriction to only run compaction on a specific partition.
    *
-   * @param splitOpenFileCost minimum file size to count to pack into one 
"bin".
-   * @return this for method chaining
+   * @param expression only entries that pass this filter will be compacted

Review comment:
       Nit: Possibly start the param definition with the noun or focus on the 
expression rather than the result? Like `Filter expression used to select 
entries for compaction` or something? Reads a little funny to me but I might 
not have had enough coffee today. 🙂 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to