amogh-jahagirdar commented on code in PR #15006:
URL: https://github.com/apache/iceberg/pull/15006#discussion_r2729978590
##########
core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java:
##########
@@ -265,15 +268,14 @@ private void addInternal(DeleteFile file) {
"Cannot find partition spec %s for delete file: %s",
file.specId(),
file.location());
-
- DeleteFileSet deleteFiles =
- newDeleteFilesBySpec.computeIfAbsent(spec.specId(), ignored -> DeleteFileSet.create());
- if (deleteFiles.add(file)) {
- addedFilesSummary.addedFile(spec, file);
- hasNewDeleteFiles = true;
- if (ContentFileUtil.isDV(file)) {
- newDVRefs.add(file.referencedDataFile());
- }
+ hasNewDeleteFiles = true;
Review Comment:
Since we're no longer tracking by `DeleteFileSet` at the time of adding, we treat
every addition as a new delete (unless we want to do a lookback in the list on
every `addDeleteFile`, but I'm very against that since it's effectively an
O(deletes-added^2) operation for a commit).
If we look at how `hasNewDeleteFiles` is actually used, I don't think this is
really consequential: when `hasNewDeleteFiles` is true and there's cached state,
we use the flag as an indication that the cache is stale and should be cleared
out / the files cleaned up. Even if there are duplicates, there's at least one
file which is new.
We end up merging/deduping the DVs (and the V2 position and equality deletes)
anyway just before producing new manifests. See my comment below.
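To spell out the reasoning, here's a minimal, hypothetical sketch of the staleness pattern (plain strings standing in for files and manifests; this is not the actual `MergingSnapshotProducer` code). The flag only needs to signal that *something* new was added since the cached manifests were written, so a duplicate addition flipping it again is harmless:
```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified sketch of the staleness-flag pattern described above;
// this is not the real MergingSnapshotProducer.
class DeleteManifestCache {
  private final List<String> pendingDeleteFiles = new ArrayList<>();
  private final List<String> cachedManifests = new ArrayList<>();
  private boolean hasNewDeleteFiles = false;

  void addDeleteFile(String location) {
    // No dedup lookback here: every addition just marks the cache stale,
    // which is the behavior the comment above argues is fine.
    pendingDeleteFiles.add(location);
    hasNewDeleteFiles = true;
  }

  List<String> manifests() {
    if (hasNewDeleteFiles && !cachedManifests.isEmpty()) {
      // Stale cache: discard previously produced manifests so they get rewritten.
      cachedManifests.clear();
    }

    if (cachedManifests.isEmpty()) {
      // Dedup/merge happens here, once, just before writing the new manifests.
      long distinctFiles = pendingDeleteFiles.stream().distinct().count();
      cachedManifests.add("manifest-covering-" + distinctFiles + "-delete-files");
      hasNewDeleteFiles = false;
    }

    return cachedManifests;
  }
}
```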
##########
core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java:
##########
@@ -847,7 +859,7 @@ private void validateAddedDVs(
DeleteFile file = entry.file();
if (newSnapshotIds.contains(entry.snapshotId()) && ContentFileUtil.isDV(file)) {
ValidationException.check(
- !newDVRefs.contains(file.referencedDataFile()),
+ !dvsByReferencedFile.containsKey(file.referencedDataFile()),
Review Comment:
Yeah, I had an old PR out for this:
https://github.com/apache/iceberg/pull/11693/files#diff-410ff1b47d9a44a2fd5dbd103cad9463d82c8f4f51aa1be63b8b403123ab6e0e
(probably a bad PR title, since by definition for this operation, if the
positions are disjoint, it's not conflicting).
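To illustrate that parenthetical: two DVs that reference the same data file but delete disjoint positions can simply be unioned rather than rejected as a conflict. A toy example with `java.util.BitSet` standing in for the DV bitmaps (not the actual Puffin/roaring representation):
```java
import java.util.BitSet;

class DisjointDvExample {
  public static void main(String[] args) {
    // Positions deleted by two DVs that both reference the same data file.
    BitSet dv1 = new BitSet();
    dv1.set(3);
    dv1.set(17);

    BitSet dv2 = new BitSet();
    dv2.set(42);
    dv2.set(99);

    if (!dv1.intersects(dv2)) {
      // Disjoint positions: not a real conflict, so union the two into one DV.
      BitSet merged = (BitSet) dv1.clone();
      merged.or(dv2);
      System.out.println("merged deleted positions: " + merged); // {3, 17, 42, 99}
    }
  }
}
```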
##########
core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java:
##########
@@ -1060,9 +1062,36 @@ private List<ManifestFile> newDeleteFilesAsManifests() {
}
if (cachedNewDeleteManifests.isEmpty()) {
+ Map<String, List<DeleteFile>> duplicateDVs = Maps.newHashMap();
+ List<DeleteFile> validDVs = Lists.newArrayList();
+ for (Map.Entry<String, List<DeleteFile>> entry : dvsByReferencedFile.entrySet()) {
+ if (entry.getValue().size() > 1) {
+ duplicateDVs.put(entry.getKey(), entry.getValue());
+ } else {
+ validDVs.addAll(entry.getValue());
+ }
+ }
+
+ List<DeleteFile> mergedDVs =
+ duplicateDVs.isEmpty()
+ ? ImmutableList.of()
+ : DVUtil.mergeDVsAndWrite(
+ ops(),
+ duplicateDVs,
+ tableName,
+ ops().current().specsById(),
+ ThreadPools.getDeleteWorkerPool());
+ // Prevent committing duplicate V2 deletes by deduping them
+ Map<Integer, List<DeleteFile>> newDeleteFilesBySpec =
+ Streams.stream(
+ Iterables.concat(
+ mergedDVs, validDVs, DeleteFileSet.of(positionAndEqualityDeletes)))
Review Comment:
@rdblue let me know how you feel about the
`DeleteFileSet.of(positionAndEqualityDeletes)`.
I know we were kind of against de-duping, but I think the fact that the two
fields are now disjoint avoids that partition spec case you mentioned. I'm a bit
worried that not deduping before producing the manifests is a regression
compared to the previous behavior. And there's a good argument that if we can do
it correctly and relatively cheaply, it's better to do it and avoid any bad
metadata (similar to why we do it for data files).
The summary stats are produced anyway from this "final" `deleteFilesBySpec`,
which should be all correct, so I think we're covered in general.
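As a rough sketch of the "dedup before producing manifests" idea, using plain JDK collections rather than Iceberg's `DeleteFileSet` (the `FakeDeleteFile` record and its location-based equality are illustrative stand-ins, not the real classes):
```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Illustrative stand-in for a delete file: record equality (spec id + location)
// acts as the dedup key, roughly mirroring how a set-based wrapper can drop
// duplicate additions of the same file.
record FakeDeleteFile(int specId, String location) {}

class DedupBeforeManifests {
  static Map<Integer, List<FakeDeleteFile>> dedupAndGroupBySpec(List<FakeDeleteFile> added) {
    // 1) Dedup: duplicate additions of the same file collapse to one entry.
    Set<FakeDeleteFile> deduped = new LinkedHashSet<>(added);

    // 2) Group by partition spec id, preserving insertion order per spec.
    return deduped.stream()
        .collect(Collectors.groupingBy(
            FakeDeleteFile::specId, LinkedHashMap::new, Collectors.toList()));
  }

  public static void main(String[] args) {
    List<FakeDeleteFile> added = List.of(
        new FakeDeleteFile(0, "s3://bucket/deletes/a.parquet"),
        new FakeDeleteFile(0, "s3://bucket/deletes/a.parquet"), // duplicate add
        new FakeDeleteFile(1, "s3://bucket/deletes/b.parquet"));

    System.out.println(dedupAndGroupBySpec(added));
    // {0=[FakeDeleteFile[specId=0, location=s3://bucket/deletes/a.parquet]],
    //  1=[FakeDeleteFile[specId=1, location=s3://bucket/deletes/b.parquet]]}
  }
}
```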
##########
core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java:
##########
@@ -86,8 +89,8 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
// update data
private final Map<Integer, DataFileSet> newDataFilesBySpec = Maps.newHashMap();
private Long newDataFilesDataSequenceNumber;
- private final Map<Integer, DeleteFileSet> newDeleteFilesBySpec = Maps.newHashMap();
- private final Set<String> newDVRefs = Sets.newHashSet();
+ private final List<DeleteFile> positionAndEqualityDeletes = Lists.newArrayList();
+ private final Map<String, List<DeleteFile>> dvsByReferencedFile = Maps.newLinkedHashMap();
Review Comment:
@rdblue These are two disjoint fields: one is a list of V2 deletes, and the
other is a multimap for DVs keyed by the referenced data file.
The map is a `LinkedHashMap` because we have a bunch of tests with expectations
on the exact order of entries in a manifest. The previous change didn't require
anything special because we worked with the deleteFilesBySpec and inherently
preserved the order.
I personally think our tests should probably get away from expecting a certain
order in manifests and just assert the contents (or at least have validate
methods that express whether or not to be strict about ordering). As we get into
V4, maybe we'll make implementation choices about ordering entries in a certain
way, but in the current state of things it was kind of a hindrance to making
changes here.
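For illustration, a tiny standalone example of why the `LinkedHashMap` choice matters for order-sensitive assertions (plain JDK types, not the actual producer code):
```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class MultimapOrderExample {
  public static void main(String[] args) {
    // DV "multimap": referenced data file -> delete files that reference it.
    Map<String, List<String>> ordered = new LinkedHashMap<>();
    Map<String, List<String>> unordered = new HashMap<>();

    for (String dataFile : List.of("data-3.parquet", "data-1.parquet", "data-2.parquet")) {
      ordered.computeIfAbsent(dataFile, ignored -> new ArrayList<>()).add("dv-for-" + dataFile);
      unordered.computeIfAbsent(dataFile, ignored -> new ArrayList<>()).add("dv-for-" + dataFile);
    }

    // LinkedHashMap iterates in insertion order, so entries land in the manifest
    // in the same order the deletes were added; HashMap makes no such guarantee.
    System.out.println(ordered.keySet());   // [data-3.parquet, data-1.parquet, data-2.parquet]
    System.out.println(unordered.keySet()); // order not guaranteed
  }
}
```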
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]