aokolnychyi commented on a change in pull request #2466:
URL: https://github.com/apache/iceberg/pull/2466#discussion_r612702721



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
##########
@@ -368,6 +374,23 @@ private void commitWithSnapshotIsolation(OverwriteFiles overwriteFiles,
     }
   }
 
+  private class CompactionWrite extends BaseBatchWrite {
+    private final String jobID;
+
+    private CompactionWrite(String jobID) {
+      this.jobID = jobID;
+    }
+
+    @Override
+    public void commit(WriterCommitMessage[] messages) {
+      List<DataFile> newDataFiles = Lists.newArrayListWithExpectedSize(messages.length);
+      for (DataFile file : files(messages)) {
+        newDataFiles.add(file);
+      }
+      DataCompactionJobCoordinator.commitJob(table, jobID, newDataFiles);

Review comment:
       I think the action can keep doing what it does in the WIP PR now (i.e. plan jobs, log partitions, number of files, etc.). It would then add task sets into `TaskSetManager` and trigger compaction jobs with read/write options; the writer would call `finishJob` to propagate (but not commit) the results, and the action would finally call `commitJob`/`commitJobs` (see the sketch below).
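   
   A minimal sketch of that flow from the action's side, as one possible shape (only `TaskSetManager`, `finishJob`, and `commitJob`/`commitJobs` come from this thread; `planJobs`, `stageTasks`, `triggerCompaction`, `CompactionJob`, and `Result` are hypothetical placeholders, not the actual API):

```java
import java.util.List;

// Illustrative sketch only, not the actual action: plan jobs, stage their
// task sets, run the compaction Spark jobs, then commit everything at the end.
abstract class CompactionActionSketch {
  // plan compaction jobs and log partitions, number of files, etc.
  abstract List<CompactionJob> planJobs();

  // add the job's task set into TaskSetManager under its job ID
  abstract void stageTasks(CompactionJob job);

  // trigger the compaction Spark job with read/write options; the writer
  // side is expected to call finishJob to propagate (but not commit) results
  abstract void triggerCompaction(CompactionJob job);

  // commit all finished jobs via the coordinator and collect result stats
  abstract Result commitJobs(List<CompactionJob> jobs);

  Result execute() {
    List<CompactionJob> jobs = planJobs();
    for (CompactionJob job : jobs) {
      stageTasks(job);
      triggerCompaction(job);
    }
    return commitJobs(jobs);
  }

  interface CompactionJob {}

  interface Result {}
}
```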
   
   We will only have to return correct stats from `commitJob` (basically, whatever is needed to build the action result). We can also make the job coordinator part of the action.
   
   The logic in the action won't be trivial, so I am wondering whether we can simplify it by moving parts into helper classes. In that case, the coordinator could take care of committing and locking, roughly as sketched below.
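   
   As a rough illustration of that split (assumed names throughout; only `finishJob` and `commitJob` come from this thread, and `synchronized` stands in for whatever locking we pick), the coordinator could look something like this, built on the existing `Table.newRewrite()`/`RewriteFiles` API:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.iceberg.DataFile;
import org.apache.iceberg.RewriteFiles;
import org.apache.iceberg.Table;

// Illustrative sketch only: the writer propagates results via finishJob,
// the action commits via commitJob, and the coordinator owns the locking.
class CompactionJobCoordinatorSketch {
  private final Table table;
  private final Map<String, List<DataFile>> newFilesByJob = new ConcurrentHashMap<>();

  CompactionJobCoordinatorSketch(Table table) {
    this.table = table;
  }

  // called from the writer: record the produced files without committing them
  void finishJob(String jobID, List<DataFile> newDataFiles) {
    newFilesByJob.put(jobID, newDataFiles);
  }

  // called from the action: swap the rewritten files for the new ones under
  // the coordinator's lock, and return a count the action can use for stats
  synchronized int commitJob(String jobID, List<DataFile> rewrittenFiles) {
    List<DataFile> newDataFiles = newFilesByJob.remove(jobID);
    RewriteFiles rewrite = table.newRewrite();
    rewrite.rewriteFiles(new HashSet<>(rewrittenFiles), new HashSet<>(newDataFiles));
    rewrite.commit();
    return newDataFiles.size();
  }
}
```

   Keeping the pending results and the commit path in one class would also keep the locking story in a single place instead of spreading it across the action and the writer.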



