aokolnychyi commented on a change in pull request #2466:
URL: https://github.com/apache/iceberg/pull/2466#discussion_r612702721
##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
##########
@@ -368,6 +374,23 @@ private void commitWithSnapshotIsolation(OverwriteFiles
overwriteFiles,
}
}
+ private class CompactionWrite extends BaseBatchWrite {
+ private final String jobID;
+
+ private CompactionWrite(String jobID) {
+ this.jobID = jobID;
+ }
+
+ @Override
+ public void commit(WriterCommitMessage[] messages) {
+ List<DataFile> newDataFiles =
Lists.newArrayListWithExpectedSize(messages.length);
+ for (DataFile file : files(messages)) {
+ newDataFiles.add(file);
+ }
+ DataCompactionJobCoordinator.commitJob(table, jobID, newDataFiles);
Review comment:
I think the action can do whatever it does in the WIP PR now (i.e. plan
jobs, log partitions, num files, etc), then add task sets into
`TaskSetManager`, trigger compaction jobs with read/write options, then the
writer will call `finishJob` to propagate (but not commit) the results and then
the action will call `commitJob`/`commitJobs`.
We will only have to return correct stats from `commitJob` (basically,
whatever is needed for the result). We can make the job coordinator part of the
action too.
The logic in the action won't be trivial so I am thinking whether we can
simplify it by moving parts into helper objects. In this case, the coordinator
can take of committing and locking.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]