wuwenchi commented on code in PR #4522:
URL: https://github.com/apache/iceberg/pull/4522#discussion_r860757510


##########
core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java:
##########
@@ -270,43 +275,69 @@ private Map<StructLikeWrapper, Collection<FileScanTask>> 
groupTasksByPartition(
     return tasksGroupedByPartition.asMap();
   }
 
-  private void replaceDataFiles(Iterable<DataFile> deletedDataFiles, 
Iterable<DataFile> addedDataFiles,
-                                long startingSnapshotId) {
+  private void replaceDataFiles(RewriteResult result, long startingSnapshotId) 
{
     try {
-      doReplace(deletedDataFiles, addedDataFiles, startingSnapshotId);
+      RewriteFiles rewriteFiles = table.newRewrite()
+          .validateFromSnapshot(startingSnapshotId)
+          .rewriteFiles(
+              Sets.newHashSet(result.dataFilesToDelete()),
+              Sets.newHashSet(result.deleteFilesToDelete()),
+              Sets.newHashSet(result.dataFilesToAdd()),
+              Sets.newHashSet(result.deleteFilesToAdd())
+          );
+
+      commit(rewriteFiles);
     } catch (CommitStateUnknownException e) {
       LOG.warn("Commit state unknown, cannot clean up files that may have been 
committed", e);
       throw e;
     } catch (Exception e) {
       LOG.warn("Failed to commit rewrite, cleaning up rewritten files", e);
-      Tasks.foreach(Iterables.transform(addedDataFiles, f -> 
f.path().toString()))
+
+      // Remove all the newly produced files if possible.
+      List<ContentFile<?>> addedFiles = Lists.newArrayList();
+      Collections.addAll(addedFiles, result.dataFilesToAdd());
+      Collections.addAll(addedFiles, result.deleteFilesToAdd());
+
+      Tasks.foreach(Iterables.transform(addedFiles, f -> f.path().toString()))
           .noRetry()
           .suppressFailureWhenFinished()
           .onFailure((location, exc) -> LOG.warn("Failed to delete: {}", 
location, exc))
           .run(fileIO::deleteFile);
+
       throw e;
     }
   }
 
-  @VisibleForTesting
-  void doReplace(Iterable<DataFile> deletedDataFiles, Iterable<DataFile> 
addedDataFiles,
-      long startingSnapshotId) {
-    RewriteFiles rewriteFiles = table.newRewrite()
-        .validateFromSnapshot(startingSnapshotId)
-        .rewriteFiles(Sets.newHashSet(deletedDataFiles), 
Sets.newHashSet(addedDataFiles));
-    commit(rewriteFiles);
+  private boolean doPartitionNeedRewrite(Collection<FileScanTask> 
partitionTasks) {
+    int files = 0;
+    for (FileScanTask scanTask : partitionTasks) {
+      files += 1; // One for data file.
+      files += scanTask.deletes().size();
+    }
+    return files > 1;

Review Comment:
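   If I only want to split a single large file, is there a problem here? A partition with one data file and no delete files gives `files == 1`, so `files > 1` is false and the partition is treated as not needing a rewrite — the large file would never be split.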



##########
core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java:
##########
@@ -270,43 +275,69 @@ private Map<StructLikeWrapper, Collection<FileScanTask>> 
groupTasksByPartition(
     return tasksGroupedByPartition.asMap();
   }
 
-  private void replaceDataFiles(Iterable<DataFile> deletedDataFiles, 
Iterable<DataFile> addedDataFiles,
-                                long startingSnapshotId) {
+  private void replaceDataFiles(RewriteResult result, long startingSnapshotId) 
{
     try {
-      doReplace(deletedDataFiles, addedDataFiles, startingSnapshotId);
+      RewriteFiles rewriteFiles = table.newRewrite()
+          .validateFromSnapshot(startingSnapshotId)
+          .rewriteFiles(
+              Sets.newHashSet(result.dataFilesToDelete()),
+              Sets.newHashSet(result.deleteFilesToDelete()),
+              Sets.newHashSet(result.dataFilesToAdd()),
+              Sets.newHashSet(result.deleteFilesToAdd())
+          );
+
+      commit(rewriteFiles);
     } catch (CommitStateUnknownException e) {
       LOG.warn("Commit state unknown, cannot clean up files that may have been 
committed", e);
       throw e;
     } catch (Exception e) {
       LOG.warn("Failed to commit rewrite, cleaning up rewritten files", e);
-      Tasks.foreach(Iterables.transform(addedDataFiles, f -> 
f.path().toString()))
+
+      // Remove all the newly produced files if possible.
+      List<ContentFile<?>> addedFiles = Lists.newArrayList();
+      Collections.addAll(addedFiles, result.dataFilesToAdd());
+      Collections.addAll(addedFiles, result.deleteFilesToAdd());
+
+      Tasks.foreach(Iterables.transform(addedFiles, f -> f.path().toString()))
           .noRetry()
           .suppressFailureWhenFinished()
           .onFailure((location, exc) -> LOG.warn("Failed to delete: {}", 
location, exc))
           .run(fileIO::deleteFile);
+
       throw e;
     }
   }
 
-  @VisibleForTesting
-  void doReplace(Iterable<DataFile> deletedDataFiles, Iterable<DataFile> 
addedDataFiles,
-      long startingSnapshotId) {
-    RewriteFiles rewriteFiles = table.newRewrite()
-        .validateFromSnapshot(startingSnapshotId)
-        .rewriteFiles(Sets.newHashSet(deletedDataFiles), 
Sets.newHashSet(addedDataFiles));
-    commit(rewriteFiles);
+  private boolean doPartitionNeedRewrite(Collection<FileScanTask> 
partitionTasks) {
+    int files = 0;
+    for (FileScanTask scanTask : partitionTasks) {
+      files += 1; // One for data file.
+      files += scanTask.deletes().size();
+    }
+    return files > 1;

Review Comment:
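   If I only want to split a single large file, is there a problem here? A partition with one data file and no delete files gives `files == 1`, so `files > 1` is false and the partition is treated as not needing a rewrite — the large file would never be split.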



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to