aokolnychyi commented on code in PR #14264:
URL: https://github.com/apache/iceberg/pull/14264#discussion_r2742750434


##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -133,13 +199,526 @@ private static Map<Long, Integer> 
computeSnapshotOrdinals(Deque<Snapshot> snapsh
     return snapshotOrdinals;
   }
 
+  /**
+   * Builds a delete file index for existing deletes that were present before 
the start snapshot.
+   * These deletes should be applied to data files but should not generate 
DELETE changelog rows.
+   * Uses manifest pruning and caching to optimize performance.
+   */
+  private DeleteFileIndex buildExistingDeleteIndex(Long 
fromSnapshotIdExclusive) {
+    if (fromSnapshotIdExclusive == null) {
+      return DeleteFileIndex.emptyIndex();
+    }
+    Snapshot fromSnapshot = table().snapshot(fromSnapshotIdExclusive);
+    Preconditions.checkState(
+        fromSnapshot != null, "Cannot find starting snapshot: %s", 
fromSnapshotIdExclusive);
+
+    List<ManifestFile> existingDeleteManifests = 
fromSnapshot.deleteManifests(table().io());
+    if (existingDeleteManifests.isEmpty()) {
+      return DeleteFileIndex.emptyIndex();
+    }
+
+    // Prune manifests based on partition filter to avoid processing 
irrelevant manifests
+    List<ManifestFile> prunedManifests = 
pruneManifestsByPartition(existingDeleteManifests);
+    if (prunedManifests.isEmpty()) {
+      return DeleteFileIndex.emptyIndex();
+    }
+
+    // Load delete files from manifests
+    Iterable<DeleteFile> deleteFiles = loadDeleteFiles(prunedManifests);
+
+    return DeleteFileIndex.builderFor(deleteFiles)
+        .specsById(table().specs())
+        .caseSensitive(isCaseSensitive())
+        .build();
+  }
+
+  /**
+   * Wrapper method that tracks build calls and caches the result for reuse. 
This ensures we only
+   * build the index once even if called from multiple places.
+   */
+  private DeleteFileIndex buildExistingDeleteIndexTracked(Long 
fromSnapshotIdExclusive) {
+    if (cachedExistingDeleteIndex != null) {
+      return cachedExistingDeleteIndex;
+    }
+    existingDeleteIndexBuildCallCount++;
+    cachedExistingDeleteIndex = 
buildExistingDeleteIndex(fromSnapshotIdExclusive);
+    return cachedExistingDeleteIndex;
+  }
+
+  // Visible for testing
+  int getExistingDeleteIndexBuildCallCount() {
+    return existingDeleteIndexBuildCallCount;
+  }
+
+  // Visible for testing
+  boolean wasExistingDeleteIndexBuilt() {
+    return existingDeleteIndexBuildCallCount > 0;
+  }
+
+  /**
+   * Builds per-snapshot delete file indexes for newly added delete files in 
each changelog
+   * snapshot. These deletes should generate DELETE changelog rows. Uses 
caching to avoid re-parsing
+   * manifests.
+   */
+  private Map<Long, DeleteFileIndex> buildAddedDeleteIndexes(Deque<Snapshot> 
changelogSnapshots) {
+    Map<Long, DeleteFileIndex> addedDeletesBySnapshot = 
Maps.newConcurrentMap();
+    Tasks.foreach(changelogSnapshots)
+        .retry(3)
+        .stopOnFailure()
+        .throwFailureWhenFinished()
+        .executeWith(planExecutor())
+        .onFailure(
+            (snapshot, exc) ->
+                LOG.warn(
+                    "Failed to build delete index for snapshot {}", 
snapshot.snapshotId(), exc))
+        .run(
+            snapshot -> {
+              List<ManifestFile> snapshotDeleteManifests = 
snapshot.deleteManifests(table().io());
+              if (snapshotDeleteManifests.isEmpty()) {
+                addedDeletesBySnapshot.put(snapshot.snapshotId(), 
DeleteFileIndex.emptyIndex());
+                return;
+              }
+
+              // Filter to only include delete files added in this snapshot
+              List<ManifestFile> addedDeleteManifests =
+                  snapshotDeleteManifests.stream()
+                      .filter(manifest -> 
manifest.snapshotId().equals(snapshot.snapshotId()))
+                      .collect(Collectors.toUnmodifiableList());
+
+              if (addedDeleteManifests.isEmpty()) {
+                addedDeletesBySnapshot.put(snapshot.snapshotId(), 
DeleteFileIndex.emptyIndex());
+              } else {
+                // Load delete files from manifests
+                Iterable<DeleteFile> deleteFiles = 
loadDeleteFiles(addedDeleteManifests);

Review Comment:
   Will this read/index all live delete entries from new delete manifests? How 
does this handle the case when the new delete manifest has a mix of existing 
and new delete files?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to