rdblue commented on code in PR #15006:
URL: https://github.com/apache/iceberg/pull/15006#discussion_r2719100024
##########
core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java:
##########
@@ -1073,6 +1088,125 @@ private List<ManifestFile> newDeleteFilesAsManifests() {
return cachedNewDeleteManifests;
}
+ // Merge duplicates, internally takes care of updating newDeleteFilesBySpec to remove
+ // duplicates and add the newly merged DV
+ private void mergeDVsAndWrite() {
+ Map<String, DeleteFileSet> dataFilesWithDuplicateDVs =
+ dvsByReferencedFile.entrySet().stream()
+ .filter(entry -> entry.getValue().size() > 1)
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+ List<MergedDVContent> mergedDVs = Collections.synchronizedList(Lists.newArrayList());
+ Tasks.foreach(dataFilesWithDuplicateDVs.entrySet())
+ .executeWith(ThreadPools.getDeleteWorkerPool())
+ .stopOnFailure()
+ .throwFailureWhenFinished()
+ .run(
+ entry -> {
+ String referencedLocation = entry.getKey();
+ DeleteFileSet duplicateDVs = entry.getValue();
+ mergedDVs.add(mergePositions(referencedLocation, duplicateDVs));
+ });
+
+ // Update newDeleteFilesBySpec to remove all the duplicates
+ mergedDVs.forEach(
+ mergedDV -> newDeleteFilesBySpec.get(mergedDV.specId).removeAll(mergedDV.duplicateDVs));
Review Comment:
I recommend avoiding methods like this one that modify the instance state of the
snapshot producer. That should only happen when we need to cache results, and
cached results should be held in separate instance state. For example, this is
how `newDataFilesBySpec` and `cachedNewDataManifests` work: the new data files
are kept organized by spec, but that state is never rewritten. Instead,
`cachedNewDataManifests` is reused in case of retry.

Modifying the instance state like this hides how the caller configured the
class once a merge has happened, and it relies on side effects that are not
obvious to people working with the code later.
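The separation described above can be sketched in plain Java. This is a hypothetical, simplified model: the class `DVProducerSketch`, its `addDV`/`mergedDVs`/`dvCount` methods, and the representation of a DV as a set of deleted row positions are assumptions, not Iceberg APIs. The configured `dvsByReferencedFile` map is never rewritten; the merge result lives in a separate `cachedMergedDVs` field that a retry can reuse.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical, simplified producer: a DV is modeled as the set of deleted row
// positions in one referenced data file. Not an Iceberg API.
class DVProducerSketch {
  // Configured state: reflects exactly what the caller added; never rewritten by a merge.
  private final Map<String, List<Set<Long>>> dvsByReferencedFile = new LinkedHashMap<>();

  // Derived state: merged DVs, cached separately so a retry can reuse them.
  private Map<String, Set<Long>> cachedMergedDVs = null;

  void addDV(String referencedFile, Set<Long> positions) {
    dvsByReferencedFile
        .computeIfAbsent(referencedFile, file -> new ArrayList<>())
        .add(new TreeSet<>(positions));
    cachedMergedDVs = null; // new input invalidates the cached merge result
  }

  // Computes the merge once, then returns the cached result on later calls.
  Map<String, Set<Long>> mergedDVs() {
    if (cachedMergedDVs == null) {
      Map<String, Set<Long>> merged = new LinkedHashMap<>();
      for (Map.Entry<String, List<Set<Long>>> entry : dvsByReferencedFile.entrySet()) {
        Set<Long> union = new TreeSet<>();
        entry.getValue().forEach(union::addAll);
        merged.put(entry.getKey(), Collections.unmodifiableSet(union));
      }
      cachedMergedDVs = Collections.unmodifiableMap(merged);
    }
    return cachedMergedDVs;
  }

  // How the producer was configured stays visible after a merge.
  int dvCount(String referencedFile) {
    return dvsByReferencedFile.getOrDefault(referencedFile, List.of()).size();
  }
}
```

In this sketch, adding a DV clears the cache, and the number of DVs added for each file can still be inspected after merging because the merge never touches the configured map.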

I think this should be refactored so that as many of these new methods as
possible are static methods in a `DVUtil` class rather than inline in this
already huge class. Merging DVs is independently useful and shouldn't live
here. This method may also be a good candidate for a utility if you refactor it
to pass a thread pool to `Tasks`, but it may make more sense to leave it as an
instance method if it needs to update a merged-DV cache.