jackye1995 commented on a change in pull request #3207:
URL: https://github.com/apache/iceberg/pull/3207#discussion_r718925782



##########
File path: api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
##########
@@ -76,6 +76,22 @@
    */
   String TARGET_FILE_SIZE_BYTES = "target-file-size-bytes";
 
+  /**
+   * Determines if the data rewrite action should also remove non-global 
deletes associated with the data files.
+   * By enabling this option, any data filter specified through {@link 
#filter(Expression)} will be converted to
+   * an inclusive partition filter based on all the historical partition specs 
of the table.
+   */
+  String REMOVE_PARTITION_DELETES = "remove-partition-deletes";

Review comment:
       This just means non-global deletes, for lack of a better name.

##########
File path: api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
##########
@@ -76,6 +76,22 @@
    */
   String TARGET_FILE_SIZE_BYTES = "target-file-size-bytes";
 
+  /**
+   * Determines if the data rewrite action should also remove non-global 
deletes associated with the data files.
+   * By enabling this option, any data filter specified through {@link 
#filter(Expression)} will be converted to
+   * an inclusive partition filter based on all the historical partition specs 
of the table.

Review comment:
       My understanding is that for non-global deletes, as long as the filter 
is a partition filter, if we compact all the data files produced by the plan, 
the delete files can be safely removed. This is inefficient because technically 
we should do the following:
   1. get all data files satisfying the filter
   2. get the delete files of the data files
   3. for the delete files, find the connected component (if we view this as a 
dependency graph of files), which might produce a much smaller subset of data 
files to compact
   4. replan tasks based on the set of data and delete files
   
   But that strays too far from the RewriteDataFiles action, and might be 
overkill in the end.

##########
File path: core/src/main/java/org/apache/iceberg/actions/BinPackStrategy.java
##########
@@ -120,14 +125,19 @@ public RewriteStrategy options(Map<String, String> 
options) {
         MIN_INPUT_FILES,
         MIN_INPUT_FILES_DEFAULT);
 
+    removeDeletes = PropertyUtil.propertyAsBoolean(
+        options, RewriteDataFiles.REMOVE_GLOBAL_DELETES, 
RewriteDataFiles.REMOVE_GLOBAL_DELETES_DEFAULT) ||
+        PropertyUtil.propertyAsBoolean(options, 
RewriteDataFiles.REMOVE_PARTITION_DELETES,
+            RewriteDataFiles.REMOVE_PARTITION_DELETES_DEFAULT);
+
     validateOptions();
     return this;
   }
 
   @Override
   public Iterable<FileScanTask> selectFilesToRewrite(Iterable<FileScanTask> 
dataFiles) {
     return FluentIterable.from(dataFiles)
-        .filter(scanTask -> scanTask.length() < minFileSize || 
scanTask.length() > maxFileSize);
+        .filter(scanTask -> removeDeletes || scanTask.length() < minFileSize 
|| scanTask.length() > maxFileSize);

Review comment:
       this should probably be `removeDeletes && !scanTask.deletes().isEmpty()`
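
   A minimal sketch of the suggested predicate, assuming a hypothetical `Task` class that stands in for `FileScanTask` (only a length and a list of attached delete files); `SelectSketch` and its parameters are illustrative names, not part of the PR:

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for FileScanTask: a file length plus the delete files attached to it.
final class Task {
  final long length;
  final List<String> deletes;

  Task(long length, List<String> deletes) {
    this.length = length;
    this.deletes = deletes;
  }
}

final class SelectSketch {
  // Select a task if it carries deletes (and delete removal is enabled),
  // or if its size falls outside the [minFileSize, maxFileSize] range.
  static List<Task> select(List<Task> tasks, boolean removeDeletes, long minFileSize, long maxFileSize) {
    return tasks.stream()
        .filter(t -> (removeDeletes && !t.deletes.isEmpty())
            || t.length < minFileSize
            || t.length > maxFileSize)
        .collect(Collectors.toList());
  }
}
```

   With this shape, a well-sized file without deletes is left alone even when `removeDeletes` is true, which the bare `removeDeletes || ...` condition would not do.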

##########
File path: 
spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
##########
@@ -326,13 +333,35 @@ private void validateAndInitOptions() {
         PARTIAL_PROGRESS_ENABLED,
         PARTIAL_PROGRESS_ENABLED_DEFAULT);
 
+    removePartitionDeletes = PropertyUtil.propertyAsBoolean(options(),
+        REMOVE_PARTITION_DELETES,
+        REMOVE_PARTITION_DELETES_DEFAULT);
+
+    removeGlobalDeletes = PropertyUtil.propertyAsBoolean(options(),
+        REMOVE_GLOBAL_DELETES,
+        REMOVE_GLOBAL_DELETES_DEFAULT);
+
     Preconditions.checkArgument(maxConcurrentFileGroupRewrites >= 1,
         "Cannot set %s to %s, the value must be positive.",
         MAX_CONCURRENT_FILE_GROUP_REWRITES, maxConcurrentFileGroupRewrites);
 
     Preconditions.checkArgument(!partialProgressEnabled || 
partialProgressEnabled && maxCommits > 0,
         "Cannot set %s to %s, the value must be positive when %s is true",
         PARTIAL_PROGRESS_MAX_COMMITS, maxCommits, PARTIAL_PROGRESS_ENABLED);
+
+    Preconditions.checkArgument(removePartitionDeletes || !removeGlobalDeletes,

Review comment:
       technically we can allow this case, but I don't know why people would 
try to do that

##########
File path: 
spark/src/test/java/org/apache/iceberg/spark/actions/TestNewRewriteDataFilesAction.java
##########
@@ -151,6 +169,52 @@ public void testBinPackPartitionedTable() {
     assertEquals("Rows must match", expectedRecords, actualRecords);
   }
 
+  @Test

Review comment:
       Currently I have added just one sanity test; I will add more once we 
agree on the interface changes.

##########
File path: api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
##########
@@ -76,6 +76,22 @@
    */
   String TARGET_FILE_SIZE_BYTES = "target-file-size-bytes";
 
+  /**
+   * Determines if the data rewrite action should also remove non-global 
deletes associated with the data files.
+   * By enabling this option, any data filter specified through {@link 
#filter(Expression)} will be converted to
+   * an inclusive partition filter based on all the historical partition specs 
of the table.

Review comment:
       My understanding is that for non-global deletes, as long as the filter 
is a partition filter, if we compact all the data files produced by the plan, 
those delete files can be safely removed. This is the strategy this PR 
follows.
   
   However, this is inefficient because technically we can do the following:
   1. get all data files satisfying the filter
   2. get the delete files of the data files
   3. for the delete files, find the connected component (if we view this as a 
dependency graph of files), which might produce a much smaller subset of data 
files to compact
   4. replan tasks based on the set of data and delete files
   
   But that strays too far from the RewriteDataFiles action, and might be 
overkill in the end.
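
   For illustration, the four steps above could be sketched roughly as below. This is one possible reading, not what the PR implements: files are plain string IDs, `deletesByDataFile` is a hypothetical map from each data file to the delete files that apply to it, and `DeleteClosureSketch` is an invented name.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

final class DeleteClosureSketch {
  // Returns the data files that must be compacted together so that every
  // delete file touching a matching data file can be dropped afterwards.
  static Set<String> dataFilesToCompact(Set<String> matching, Map<String, Set<String>> deletesByDataFile) {
    // Invert the mapping: delete file -> data files it applies to.
    Map<String, Set<String>> dataByDelete = new HashMap<>();
    for (Map.Entry<String, Set<String>> e : deletesByDataFile.entrySet()) {
      for (String delete : e.getValue()) {
        dataByDelete.computeIfAbsent(delete, k -> new HashSet<>()).add(e.getKey());
      }
    }

    // Steps 1 and 2: seed with matching data files that actually have deletes.
    Set<String> result = new TreeSet<>();
    Deque<String> queue = new ArrayDeque<>();
    for (String file : matching) {
      if (!deletesByDataFile.getOrDefault(file, Collections.emptySet()).isEmpty() && result.add(file)) {
        queue.add(file);
      }
    }

    // Step 3: breadth-first walk over the data-file/delete-file graph.
    while (!queue.isEmpty()) {
      String file = queue.poll();
      for (String delete : deletesByDataFile.getOrDefault(file, Collections.emptySet())) {
        for (String other : dataByDelete.get(delete)) {
          if (result.add(other)) {
            queue.add(other);
          }
        }
      }
    }
    return result; // Step 4 would replan tasks from this set.
  }
}
```

   Data files without any deletes drop out of the result, which is where the smaller subset would come from; the cost is a custom planning pass outside the existing interface.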

##########
File path: api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
##########
@@ -76,6 +76,22 @@
    */
   String TARGET_FILE_SIZE_BYTES = "target-file-size-bytes";
 
+  /**
+   * Determines if the data rewrite action should also remove non-global 
deletes associated with the data files.
+   * By enabling this option, any data filter specified through {@link 
#filter(Expression)} will be converted to
+   * an inclusive partition filter based on all the historical partition specs 
of the table.
+   */
+  String REMOVE_PARTITION_DELETES = "remove-partition-deletes";
+  boolean REMOVE_PARTITION_DELETES_DEFAULT = false;
+
+  /**
+   * Determines if the data rewrite action should also remove global deletes.
+   * When enabling this option, specify a data filter would result in {@link 
IllegalArgumentException}
+   * because a full table scan planning must be performed to safely remove 
global deletes.

Review comment:
       Similarly, we block removal of global delete files. Only when we know we 
have all data files in the scan tasks can we safely say the global deletes are 
merged. This is not the most efficient approach, because some statistics might 
tell us that a data filter does not affect the ability to remove a global 
delete, but I think this is the best we can do while leveraging the same 
interface and avoiding the need for customized planning.

##########
File path: api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
##########
@@ -76,6 +76,22 @@
    */
   String TARGET_FILE_SIZE_BYTES = "target-file-size-bytes";
 
+  /**
+   * Determines if the data rewrite action should also remove non-global 
deletes associated with the data files.
+   * By enabling this option, any data filter specified through {@link 
#filter(Expression)} will be converted to
+   * an inclusive partition filter based on all the historical partition specs 
of the table.
+   */
+  String REMOVE_PARTITION_DELETES = "remove-partition-deletes";
+  boolean REMOVE_PARTITION_DELETES_DEFAULT = false;
+
+  /**
+   * Determines if the data rewrite action should also remove global deletes.
+   * When enabling this option, specify a data filter would result in {@link 
IllegalArgumentException}
+   * because a full table scan planning must be performed to safely remove 
global deletes.

Review comment:
       Similarly, we block removal of global delete files unless there is no 
data filter. Only when we know we have all data files in the scan tasks can we 
safely say the global deletes are merged. This is not the most efficient 
approach, because some statistics might tell us that a data filter does not 
affect the ability to remove a global delete, but I think this is the best we 
can do while leveraging the same interface and avoiding the need for 
customized planning.
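
   As a rough sketch of the rule described in the javadoc, assuming a hypothetical `hasDataFilter` flag that is true whenever `filter(Expression)` was given anything other than an always-true expression (the class name and error message are invented for illustration):

```java
final class GlobalDeleteCheckSketch {
  // Rejects the combination that the javadoc says results in IllegalArgumentException:
  // removing global deletes while a data filter narrows the scan.
  static void validate(boolean removeGlobalDeletes, boolean hasDataFilter) {
    if (removeGlobalDeletes && hasDataFilter) {
      throw new IllegalArgumentException(
          "Cannot remove global deletes with a data filter: a full table scan is required");
    }
  }
}
```

   Calling `validate(true, true)` throws, while every other combination passes, which matches the "block unless there is no data filter" behavior.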



