aokolnychyi commented on code in PR #14264:
URL: https://github.com/apache/iceberg/pull/14264#discussion_r2742765660


##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -133,13 +199,526 @@ private static Map<Long, Integer> 
computeSnapshotOrdinals(Deque<Snapshot> snapsh
     return snapshotOrdinals;
   }
 
+  /**
+   * Builds a delete file index for existing deletes that were present before 
the start snapshot.
+   * These deletes should be applied to data files but should not generate 
DELETE changelog rows.
+   * Uses manifest pruning and caching to optimize performance.
+   */
+  private DeleteFileIndex buildExistingDeleteIndex(Long 
fromSnapshotIdExclusive) {
+    if (fromSnapshotIdExclusive == null) {
+      return DeleteFileIndex.emptyIndex();
+    }
+    Snapshot fromSnapshot = table().snapshot(fromSnapshotIdExclusive);
+    Preconditions.checkState(
+        fromSnapshot != null, "Cannot find starting snapshot: %s", 
fromSnapshotIdExclusive);
+
+    List<ManifestFile> existingDeleteManifests = 
fromSnapshot.deleteManifests(table().io());
+    if (existingDeleteManifests.isEmpty()) {
+      return DeleteFileIndex.emptyIndex();
+    }
+
+    // Prune manifests based on partition filter to avoid processing 
irrelevant manifests
+    List<ManifestFile> prunedManifests = 
pruneManifestsByPartition(existingDeleteManifests);
+    if (prunedManifests.isEmpty()) {
+      return DeleteFileIndex.emptyIndex();
+    }
+
+    // Load delete files from manifests
+    Iterable<DeleteFile> deleteFiles = loadDeleteFiles(prunedManifests);
+
+    return DeleteFileIndex.builderFor(deleteFiles)
+        .specsById(table().specs())
+        .caseSensitive(isCaseSensitive())
+        .build();
+  }
+
+  /**
+   * Wrapper method that tracks build calls and caches the result for reuse. 
This ensures we only
+   * build the index once even if called from multiple places.
+   */
+  private DeleteFileIndex buildExistingDeleteIndexTracked(Long 
fromSnapshotIdExclusive) {
+    if (cachedExistingDeleteIndex != null) {
+      return cachedExistingDeleteIndex;
+    }
+    existingDeleteIndexBuildCallCount++;
+    cachedExistingDeleteIndex = 
buildExistingDeleteIndex(fromSnapshotIdExclusive);
+    return cachedExistingDeleteIndex;
+  }
+
+  // Visible for testing
+  int getExistingDeleteIndexBuildCallCount() {
+    return existingDeleteIndexBuildCallCount;
+  }
+
+  // Visible for testing
+  boolean wasExistingDeleteIndexBuilt() {
+    return existingDeleteIndexBuildCallCount > 0;
+  }
+
+  /**
+   * Builds per-snapshot delete file indexes for newly added delete files in 
each changelog
+   * snapshot. These deletes should generate DELETE changelog rows. Uses 
caching to avoid re-parsing
+   * manifests.
+   */
+  private Map<Long, DeleteFileIndex> buildAddedDeleteIndexes(Deque<Snapshot> 
changelogSnapshots) {
+    Map<Long, DeleteFileIndex> addedDeletesBySnapshot = 
Maps.newConcurrentMap();
+    Tasks.foreach(changelogSnapshots)
+        .retry(3)
+        .stopOnFailure()
+        .throwFailureWhenFinished()
+        .executeWith(planExecutor())
+        .onFailure(
+            (snapshot, exc) ->
+                LOG.warn(
+                    "Failed to build delete index for snapshot {}", 
snapshot.snapshotId(), exc))
+        .run(
+            snapshot -> {
+              List<ManifestFile> snapshotDeleteManifests = 
snapshot.deleteManifests(table().io());
+              if (snapshotDeleteManifests.isEmpty()) {
+                addedDeletesBySnapshot.put(snapshot.snapshotId(), 
DeleteFileIndex.emptyIndex());
+                return;
+              }
+
+              // Filter to only include delete files added in this snapshot
+              List<ManifestFile> addedDeleteManifests =
+                  snapshotDeleteManifests.stream()
+                      .filter(manifest -> 
manifest.snapshotId().equals(snapshot.snapshotId()))
+                      .collect(Collectors.toUnmodifiableList());
+
+              if (addedDeleteManifests.isEmpty()) {
+                addedDeletesBySnapshot.put(snapshot.snapshotId(), 
DeleteFileIndex.emptyIndex());
+              } else {
+                // Load delete files from manifests
+                Iterable<DeleteFile> deleteFiles = 
loadDeleteFiles(addedDeleteManifests);
+
+                DeleteFileIndex index =
+                    DeleteFileIndex.builderFor(deleteFiles)
+                        .specsById(table().specs())
+                        .caseSensitive(isCaseSensitive())
+                        .build();
+                addedDeletesBySnapshot.put(snapshot.snapshotId(), index);
+              }
+            });
+    return addedDeletesBySnapshot;
+  }
+
+  /**
+   * Plans tasks for EXISTING data files that are affected by newly added 
delete files. These files
+   * were not added or deleted in the changelog snapshot range, but have new 
delete files applied to
+   * them.
+   */
+  private CloseableIterable<ChangelogScanTask> planDeletedRowsTasks(
+      Deque<Snapshot> changelogSnapshots,
+      DeleteFileIndex existingDeleteIndex,
+      Map<Long, DeleteFileIndex> addedDeletesBySnapshot,
+      Set<Long> changelogSnapshotIds) {
+
+    Map<Long, Integer> snapshotOrdinals = 
computeSnapshotOrdinals(changelogSnapshots);
+    List<ChangelogScanTask> tasks = Lists.newArrayList();
+
+    // Build a map of file statuses for each snapshot
+    Map<Long, Map<String, ManifestEntry.Status>> fileStatusBySnapshot =
+        buildFileStatusBySnapshot(changelogSnapshots, changelogSnapshotIds);
+
+    // Accumulate actual DeleteFile entries chronologically
+    List<DeleteFile> accumulatedDeletes = Lists.newArrayList();
+
+    // Start with deletes from before the changelog range
+    if (!existingDeleteIndex.isEmpty()) {
+      for (DeleteFile df : existingDeleteIndex.referencedDeleteFiles()) {
+        accumulatedDeletes.add(df);
+      }
+    }
+
+    for (Snapshot snapshot : changelogSnapshots) {
+      DeleteFileIndex addedDeleteIndex = 
addedDeletesBySnapshot.get(snapshot.snapshotId());
+      if (addedDeleteIndex.isEmpty()) {
+        continue;
+      }
+
+      DeleteFileIndex cumulativeDeleteIndex = 
buildDeleteIndex(accumulatedDeletes);
+
+      // Process data files for this snapshot
+      // Use a local set per snapshot to track processed files
+      Set<String> alreadyProcessedPaths = Sets.newHashSet();
+      processSnapshotForDeletedRowsTasks(
+          snapshot,
+          addedDeleteIndex,
+          cumulativeDeleteIndex,
+          fileStatusBySnapshot.get(snapshot.snapshotId()),
+          alreadyProcessedPaths,
+          snapshotOrdinals,
+          tasks);
+
+      // Accumulate this snapshot's added deletes for subsequent snapshots
+      for (DeleteFile df : addedDeleteIndex.referencedDeleteFiles()) {
+        accumulatedDeletes.add(df);
+      }
+    }
+
+    return CloseableIterable.withNoopClose(tasks);
+  }
+
+  /**
+   * Builds a map of file statuses for each snapshot, tracking which files 
were added or deleted in
+   * each snapshot.
+   */
+  private Map<Long, Map<String, ManifestEntry.Status>> 
buildFileStatusBySnapshot(
+      Deque<Snapshot> changelogSnapshots, Set<Long> changelogSnapshotIds) {
+
+    Map<Long, Map<String, ManifestEntry.Status>> fileStatusBySnapshot = 
Maps.newHashMap();
+
+    for (Snapshot snapshot : changelogSnapshots) {
+      Map<String, ManifestEntry.Status> fileStatuses = Maps.newHashMap();
+
+      List<ManifestFile> changedDataManifests =
+          FluentIterable.from(snapshot.dataManifests(table().io()))
+              .filter(manifest -> 
manifest.snapshotId().equals(snapshot.snapshotId()))
+              .toList();
+
+      ManifestGroup changedGroup =
+          new ManifestGroup(table().io(), changedDataManifests, 
ImmutableList.of())
+              .specsById(table().specs())
+              .caseSensitive(isCaseSensitive())
+              .select(scanColumns())
+              .filterData(filter())
+              .ignoreExisting()
+              .columnsToKeepStats(columnsToKeepStats());
+
+      try (CloseableIterable<ManifestEntry<DataFile>> entries = 
changedGroup.entries()) {
+        for (ManifestEntry<DataFile> entry : entries) {
+          if (changelogSnapshotIds.contains(entry.snapshotId())) {
+            fileStatuses.put(entry.file().location(), entry.status());
+          }
+        }
+      } catch (Exception e) {
+        throw new RuntimeException("Failed to collect file statuses", e);
+      }
+
+      fileStatusBySnapshot.put(snapshot.snapshotId(), fileStatuses);
+    }
+
+    return fileStatusBySnapshot;
+  }
+
+  /** Builds a delete index from the accumulated list of delete files. */
+  private DeleteFileIndex buildDeleteIndex(List<DeleteFile> 
accumulatedDeletes) {
+    if (accumulatedDeletes.isEmpty()) {
+      return DeleteFileIndex.emptyIndex();
+    }
+
+    return DeleteFileIndex.builderFor(accumulatedDeletes)
+        .specsById(table().specs())
+        .caseSensitive(isCaseSensitive())
+        .build();
+  }
+
+  /**
+   * Processes data files for a snapshot to create DeletedRowsScanTask for 
existing files affected
+   * by new delete files.
+   */
+  private void processSnapshotForDeletedRowsTasks(
+      Snapshot snapshot,
+      DeleteFileIndex addedDeleteIndex,
+      DeleteFileIndex cumulativeDeleteIndex,
+      Map<String, ManifestEntry.Status> currentSnapshotFiles,
+      Set<String> alreadyProcessedPaths,
+      Map<Long, Integer> snapshotOrdinals,
+      List<ChangelogScanTask> tasks) {
+
+    // Get all data files that exist in this snapshot
+    List<ManifestFile> allDataManifests = snapshot.dataManifests(table().io());
+    ManifestGroup allDataGroup =
+        new ManifestGroup(table().io(), allDataManifests, ImmutableList.of())
+            .specsById(table().specs())
+            .caseSensitive(isCaseSensitive())
+            .select(scanColumns())
+            .filterData(filter())
+            .ignoreDeleted()
+            .columnsToKeepStats(columnsToKeepStats());
+
+    if (shouldIgnoreResiduals()) {
+      allDataGroup = allDataGroup.ignoreResiduals();
+    }
+
+    String schemaString = SchemaParser.toJson(schema());
+
+    // Cache per specId - same for all files with same specId
+    Map<Integer, String> specStringCache = Maps.newHashMap();
+    Map<Integer, ResidualEvaluator> residualCache = Maps.newHashMap();
+    Expression residualFilter = shouldIgnoreResiduals() ? 
Expressions.alwaysTrue() : filter();
+
+    try (CloseableIterable<ManifestEntry<DataFile>> entries = 
allDataGroup.entries()) {
+      for (ManifestEntry<DataFile> entry : entries) {
+        DataFile dataFile = entry.file();
+        String filePath = dataFile.location();
+
+        // Skip if this file was ADDED or DELETED in this snapshot
+        // (those are handled by CreateDataFileChangeTasks)
+        if (currentSnapshotFiles.containsKey(filePath)) {
+          continue;
+        }
+
+        // Skip if we already created a task for this file in this snapshot
+        // Note: alreadyProcessedPaths is local to this snapshot's processing
+        if (alreadyProcessedPaths.contains(filePath)) {
+          continue;
+        }
+
+        // Check if this data file is affected by newly added delete files
+        DeleteFile[] addedDeletes = addedDeleteIndex.forEntry(entry);
+        if (addedDeletes.length == 0) {
+          continue;
+        }
+
+        // This data file was EXISTING but has new delete files applied
+        // Get existing deletes from before this snapshot (cumulative)
+        DeleteFile[] existingDeletes =
+            cumulativeDeleteIndex.isEmpty()
+                ? new DeleteFile[0]
+                : cumulativeDeleteIndex.forEntry(entry);
+
+        // Create a DeletedRowsScanTask
+        int changeOrdinal = snapshotOrdinals.get(snapshot.snapshotId());
+
+        // Use cached values (calculate once per specId)
+        int specId = dataFile.specId();
+        String specString =
+            specStringCache.computeIfAbsent(
+                specId, id -> 
PartitionSpecParser.toJson(table().specs().get(id)));
+        ResidualEvaluator residuals =
+            residualCache.computeIfAbsent(
+                specId,
+                id -> {
+                  PartitionSpec spec = table().specs().get(id);
+                  return ResidualEvaluator.of(spec, residualFilter, 
isCaseSensitive());
+                });
+
+        tasks.add(
+            new BaseDeletedRowsScanTask(
+                changeOrdinal,
+                snapshot.snapshotId(),
+                dataFile.copy(shouldKeepStats()),
+                addedDeletes,
+                existingDeletes,
+                schemaString,
+                specString,
+                residuals));
+
+        // Mark this file as processed for this snapshot
+        alreadyProcessedPaths.add(filePath);
+      }
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to plan deleted rows tasks", e);
+    }
+  }
+
+  private boolean shouldKeepStats() {
+    Set<Integer> columns = columnsToKeepStats();
+    return columns != null && !columns.isEmpty();
+  }
+
+  /**
+   * Loads delete files from manifests by parsing each manifest.
+   *
+   * @param manifests the delete manifests to load
+   * @return list of delete files
+   */
+  private Iterable<DeleteFile> loadDeleteFiles(List<ManifestFile> manifests) {
+    Queue<DeleteFile> allDeleteFiles = new ConcurrentLinkedQueue<>();
+
+    Tasks.foreach(manifests)
+        .stopOnFailure()
+        .throwFailureWhenFinished()
+        .executeWith(planExecutor())
+        .run(
+            manifest -> {
+              List<DeleteFile> deleteFiles = 
loadDeleteFilesFromManifest(manifest);
+              allDeleteFiles.addAll(deleteFiles);
+            });
+
+    return allDeleteFiles;
+  }
+
+  /**
+   * Prunes delete manifests based on partition filter to avoid processing 
irrelevant manifests.
+   * This significantly improves performance when only a subset of partitions 
are relevant to the
+   * scan.
+   *
+   * @param manifests all delete manifests to consider
+   * @return list of manifests that might contain relevant delete files
+   */
+  private List<ManifestFile> pruneManifestsByPartition(List<ManifestFile> 
manifests) {
+    Expression currentFilter = filter();
+
+    // If there's no filter, return all manifests
+    if (currentFilter == null || 
currentFilter.equals(Expressions.alwaysTrue())) {
+      return manifests;
+    }
+
+    List<ManifestFile> prunedManifests = Lists.newArrayList();
+
+    for (ManifestFile manifest : manifests) {
+      PartitionSpec spec = table().specs().get(manifest.partitionSpecId());
+      if (spec == null || spec.isUnpartitioned()) {
+        // Include unpartitioned manifests
+        prunedManifests.add(manifest);
+      } else if (manifestOverlapsFilter(manifest, spec, currentFilter)) {
+        // Check if manifest partition range overlaps with filter
+        prunedManifests.add(manifest);
+      }
+    }
+
+    return prunedManifests;
+  }
+
+  /**
+   * Checks if a manifest's partition range overlaps with the given filter.
+   *
+   * @param manifest the manifest to check
+   * @param spec the partition spec for the manifest
+   * @param filter the scan filter
+   * @return true if the manifest might contain matching partitions, false 
otherwise
+   */
+  private boolean manifestOverlapsFilter(
+      ManifestFile manifest, PartitionSpec spec, Expression filter) {
+    try {
+      // Use inclusive projection to transform row filter to partition filter
+      Expression partitionFilter = Projections.inclusive(spec, 
isCaseSensitive()).project(filter);
+
+      // Create evaluator for the partition filter
+      ManifestEvaluator evaluator =
+          ManifestEvaluator.forPartitionFilter(partitionFilter, spec, 
isCaseSensitive());
+
+      // Check if manifest could contain matching partitions
+      return evaluator.eval(manifest);
+    } catch (Exception e) {
+      // If evaluation fails, be conservative and include the manifest
+      return true;
+    }
+  }
+
+  /**
+   * Checks if a delete file's partition overlaps with the current scan 
filter. This enables
+   * partition pruning to reduce memory footprint and planning overhead by 
skipping delete files
+   * that cannot possibly match any rows in the scan.
+   *
+   * @param file the delete file to check
+   * @return true if the delete file's partition might contain matching rows, 
false otherwise
+   */
+  private boolean partitionMatchesFilter(DeleteFile file) {
+    // If there's no filter, all partitions match
+    Expression currentFilter = filter();
+    if (currentFilter == null || 
currentFilter.equals(Expressions.alwaysTrue())) {
+      return true;
+    }
+
+    // Get the partition spec for this delete file
+    PartitionSpec spec = table().specs().get(file.specId());
+    if (spec == null || spec.isUnpartitioned()) {
+      // If spec not found or table is unpartitioned, be conservative and 
include the file
+      return true;
+    }
+
+    try {
+      // Project the row filter to partition space using inclusive projection
+      // This transforms expressions on source columns to expressions on 
partition columns
+      Expression partitionFilter =
+          Projections.inclusive(spec, 
isCaseSensitive()).project(currentFilter);
+
+      // Evaluate the projected filter against the delete file's partition
+      Evaluator evaluator = new Evaluator(spec.partitionType(), 
partitionFilter, isCaseSensitive());
+      return evaluator.eval(file.partition());
+    } catch (Exception e) {
+      // If evaluation fails, be conservative and include the file
+      return true;
+    }
+  }
+
+  /**
+   * Loads delete files from a single manifest, parsing the manifest entries.
+   *
+   * @param manifest the delete manifest to load
+   * @return list of delete files from this manifest
+   */
+  private List<DeleteFile> loadDeleteFilesFromManifest(ManifestFile manifest) {
+    List<DeleteFile> deleteFiles = Lists.newArrayList();
+
+    try (ManifestReader<DeleteFile> reader =
+        ManifestFiles.readDeleteManifest(manifest, table().io(), 
table().specs())) {
+      for (ManifestEntry<DeleteFile> entry : reader.entries()) {
+        if (entry.status() != ManifestEntry.Status.DELETED) {
+          // Only include live delete files, copy with minimal stats to save 
memory
+          DeleteFile file = entry.file();
+
+          // Apply partition pruning - skip delete files that cannot match the 
scan filter
+          if (!partitionMatchesFilter(file)) {
+            continue;
+          }
+
+          Set<Integer> columns =
+              file.content() == FileContent.POSITION_DELETES
+                  ? Set.of(MetadataColumns.DELETE_FILE_PATH.fieldId())
+                  : Set.copyOf(file.equalityFieldIds());
+          deleteFiles.add(ContentFileUtil.copy(file, true, columns));
+        }
+      }
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to read delete manifest: " + 
manifest.path(), e);
+    }
+
+    return deleteFiles;
+  }
+
   private static class CreateDataFileChangeTasks implements 
CreateTasksFunction<ChangelogScanTask> {
     private static final DeleteFile[] NO_DELETES = new DeleteFile[0];
 
     private final Map<Long, Integer> snapshotOrdinals;
-
-    CreateDataFileChangeTasks(Deque<Snapshot> snapshots) {
+    private final Supplier<DeleteFileIndex> existingDeleteIndexSupplier;
+    private final Map<Long, DeleteFileIndex> addedDeletesBySnapshot;
+    private final Map<Long, List<DeleteFile>> cumulativeDeletesMap;

Review Comment:
   Could we end up assigning phantom deletes here, since we never track delete 
files that got removed?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to