pvary commented on code in PR #14264:
URL: https://github.com/apache/iceberg/pull/14264#discussion_r2445450485
##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -133,13 +158,473 @@ private static Map<Long, Integer> computeSnapshotOrdinals(Deque<Snapshot> snapsh
return snapshotOrdinals;
}
+ /**
+ * Builds a delete file index for existing deletes that were present before the start snapshot.
+ * These deletes should be applied to data files but should not generate DELETE changelog rows.
+ * Uses manifest pruning and caching to optimize performance.
+ */
+ private DeleteFileIndex buildExistingDeleteIndex(
+ Long fromSnapshotIdExclusive, Map<Long, DeleteFileIndex>
addedDeletesBySnapshot) {
+ if (fromSnapshotIdExclusive == null) {
+ return DeleteFileIndex.builderFor(ImmutableList.of()).build();
+ }
+
+ // Check if we need existingDeleteIndex for equality deletes
+ boolean needsExistingDeleteIndex = false;
+
+ for (DeleteFileIndex addedDeleteIndex : addedDeletesBySnapshot.values()) {
+ if (!addedDeleteIndex.isEmpty()) {
+ // Check if this snapshot has equality deletes
+ for (DeleteFile df : addedDeleteIndex.referencedDeleteFiles()) {
+ if (df.content() == FileContent.EQUALITY_DELETES) {
+ needsExistingDeleteIndex = true;
+ break;
+ }
+ }
+ if (needsExistingDeleteIndex) {
+ break;
+ }
+ }
+ }
+
+ if (!needsExistingDeleteIndex) {
+ return DeleteFileIndex.builderFor(ImmutableList.of()).build();
+ }
+
+ Snapshot fromSnapshot = table().snapshot(fromSnapshotIdExclusive);
+ Preconditions.checkState(
+ fromSnapshot != null, "Cannot find starting snapshot: %s",
fromSnapshotIdExclusive);
+
+ List<ManifestFile> existingDeleteManifests =
fromSnapshot.deleteManifests(table().io());
+ if (existingDeleteManifests.isEmpty()) {
+ return DeleteFileIndex.builderFor(ImmutableList.of()).build();
+ }
+
+ // Prune manifests based on partition filter to avoid processing irrelevant manifests
+ List<ManifestFile> prunedManifests =
pruneManifestsByPartition(existingDeleteManifests);
+ if (prunedManifests.isEmpty()) {
+ return DeleteFileIndex.builderFor(ImmutableList.of()).build();
+ }
+
+ // Load delete files from manifests
+ List<DeleteFile> deleteFiles = loadDeleteFiles(prunedManifests);
+
+ return DeleteFileIndex.builderFor(deleteFiles)
+ .specsById(table().specs())
+ .caseSensitive(isCaseSensitive())
+ .build();
+ }
+
+ /**
+ * Builds per-snapshot delete file indexes for newly added delete files in each changelog
+ * snapshot. These deletes should generate DELETE changelog rows. Uses caching to avoid re-parsing
+ * manifests.
+ */
+ private Map<Long, DeleteFileIndex> buildAddedDeleteIndexes(Deque<Snapshot>
changelogSnapshots) {
+ Map<Long, DeleteFileIndex> addedDeletesBySnapshot = Maps.newHashMap();
+
+ for (Snapshot snapshot : changelogSnapshots) {
+ List<ManifestFile> snapshotDeleteManifests =
snapshot.deleteManifests(table().io());
+ if (snapshotDeleteManifests.isEmpty()) {
+ addedDeletesBySnapshot.put(
+ snapshot.snapshotId(),
DeleteFileIndex.builderFor(ImmutableList.of()).build());
+ continue;
+ }
+
+ // Filter to only include delete files added in this snapshot
+ List<ManifestFile> addedDeleteManifests =
+ FluentIterable.from(snapshotDeleteManifests)
+ .filter(manifest ->
manifest.snapshotId().equals(snapshot.snapshotId()))
+ .toList();
+
+ if (addedDeleteManifests.isEmpty()) {
+ addedDeletesBySnapshot.put(
+ snapshot.snapshotId(),
DeleteFileIndex.builderFor(ImmutableList.of()).build());
+ } else {
+ // Load delete files from manifests
+ List<DeleteFile> deleteFiles = loadDeleteFiles(addedDeleteManifests);
+
+ DeleteFileIndex index =
+ DeleteFileIndex.builderFor(deleteFiles)
+ .specsById(table().specs())
+ .caseSensitive(isCaseSensitive())
+ .build();
+ addedDeletesBySnapshot.put(snapshot.snapshotId(), index);
+ }
+ }
+
+ return addedDeletesBySnapshot;
+ }
+
+ /**
+ * Plans tasks for EXISTING data files that are affected by newly added delete files. These files
+ * were not added or deleted in the changelog snapshot range, but have new delete files applied to
+ * them.
+ */
+ private CloseableIterable<ChangelogScanTask> planDeletedRowsTasks(
+ Deque<Snapshot> changelogSnapshots,
+ DeleteFileIndex existingDeleteIndex,
+ Map<Long, DeleteFileIndex> addedDeletesBySnapshot,
+ Set<Long> changelogSnapshotIds) {
+
+ Map<Long, Integer> snapshotOrdinals =
computeSnapshotOrdinals(changelogSnapshots);
+ List<ChangelogScanTask> tasks = Lists.newArrayList();
+
+ // Build a map of file statuses for each snapshot
+ Map<Long, Map<String, ManifestEntry.Status>> fileStatusBySnapshot =
+ buildFileStatusBySnapshot(changelogSnapshots, changelogSnapshotIds);
+
+ // Process snapshots in order, tracking which files have been handled
+ Set<String> alreadyProcessedPaths = Sets.newHashSet();
+
+ // Accumulate actual DeleteFile entries chronologically
+ List<DeleteFile> accumulatedDeletes = Lists.newArrayList();
+
+ // Start with deletes from before the changelog range
+ // Apply partition pruning to only accumulate relevant delete files
+ if (!existingDeleteIndex.isEmpty()) {
+ for (DeleteFile df : existingDeleteIndex.referencedDeleteFiles()) {
+ if (partitionMatchesFilter(df)) {
+ accumulatedDeletes.add(df);
+ }
+ }
+ }
+
+ for (Snapshot snapshot : changelogSnapshots) {
+ DeleteFileIndex addedDeleteIndex =
addedDeletesBySnapshot.get(snapshot.snapshotId());
+ if (addedDeleteIndex.isEmpty()) {
+ continue;
+ }
+
+ // Build cumulative delete index for this snapshot from accumulated deletes
+ DeleteFileIndex cumulativeDeleteIndex =
buildCumulativeDeleteIndex(accumulatedDeletes);
+
+ // Process data files for this snapshot
+ processSnapshotForDeletedRowsTasks(
+ snapshot,
+ addedDeleteIndex,
+ cumulativeDeleteIndex,
+ fileStatusBySnapshot.get(snapshot.snapshotId()),
+ alreadyProcessedPaths,
+ snapshotOrdinals,
+ tasks);
+
+ // Accumulate this snapshot's added deletes for subsequent snapshots
+ // Apply partition pruning to only accumulate relevant delete files
+ for (DeleteFile df : addedDeleteIndex.referencedDeleteFiles()) {
+ if (partitionMatchesFilter(df)) {
+ accumulatedDeletes.add(df);
+ }
+ }
+ }
+
+ return CloseableIterable.withNoopClose(tasks);
+ }
+
+ /**
+ * Builds a map of file statuses for each snapshot, tracking which files were added or deleted in
+ * each snapshot.
+ */
+ private Map<Long, Map<String, ManifestEntry.Status>>
buildFileStatusBySnapshot(
+ Deque<Snapshot> changelogSnapshots, Set<Long> changelogSnapshotIds) {
+
+ Map<Long, Map<String, ManifestEntry.Status>> fileStatusBySnapshot =
Maps.newHashMap();
+
+ for (Snapshot snapshot : changelogSnapshots) {
+ Map<String, ManifestEntry.Status> fileStatuses = Maps.newHashMap();
+
+ List<ManifestFile> changedDataManifests =
+ FluentIterable.from(snapshot.dataManifests(table().io()))
+ .filter(manifest ->
manifest.snapshotId().equals(snapshot.snapshotId()))
+ .toList();
+
+ ManifestGroup changedGroup =
+ new ManifestGroup(table().io(), changedDataManifests,
ImmutableList.of())
+ .specsById(table().specs())
+ .caseSensitive(isCaseSensitive())
+ .select(scanColumns())
+ .filterData(filter())
+ .ignoreExisting()
+ .columnsToKeepStats(columnsToKeepStats());
+
+ try (CloseableIterable<ManifestEntry<DataFile>> entries =
changedGroup.entries()) {
+ for (ManifestEntry<DataFile> entry : entries) {
+ if (changelogSnapshotIds.contains(entry.snapshotId())) {
+ fileStatuses.put(entry.file().location(), entry.status());
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to collect file statuses", e);
+ }
+
+ fileStatusBySnapshot.put(snapshot.snapshotId(), fileStatuses);
+ }
+
+ return fileStatusBySnapshot;
+ }
+
+ /** Builds a cumulative delete index from the accumulated list of delete files. */
+ private DeleteFileIndex buildCumulativeDeleteIndex(List<DeleteFile>
accumulatedDeletes) {
+ if (accumulatedDeletes.isEmpty()) {
+ return DeleteFileIndex.builderFor(ImmutableList.of()).build();
+ }
+
+ return DeleteFileIndex.builderFor(accumulatedDeletes)
+ .specsById(table().specs())
+ .caseSensitive(isCaseSensitive())
+ .build();
+ }
+
+ /**
+ * Processes data files for a snapshot to create DeletedRowsScanTask for existing files affected
+ * by new delete files.
+ */
+ private void processSnapshotForDeletedRowsTasks(
+ Snapshot snapshot,
+ DeleteFileIndex addedDeleteIndex,
+ DeleteFileIndex cumulativeDeleteIndex,
+ Map<String, ManifestEntry.Status> currentSnapshotFiles,
+ Set<String> alreadyProcessedPaths,
+ Map<Long, Integer> snapshotOrdinals,
+ List<ChangelogScanTask> tasks) {
+
+ // Get all data files that exist in this snapshot
+ List<ManifestFile> allDataManifests = snapshot.dataManifests(table().io());
+ ManifestGroup allDataGroup =
+ new ManifestGroup(table().io(), allDataManifests, ImmutableList.of())
+ .specsById(table().specs())
+ .caseSensitive(isCaseSensitive())
+ .select(scanColumns())
+ .filterData(filter())
+ .ignoreDeleted()
+ .columnsToKeepStats(columnsToKeepStats());
+
+ if (shouldIgnoreResiduals()) {
+ allDataGroup = allDataGroup.ignoreResiduals();
+ }
+
+ try (CloseableIterable<ManifestEntry<DataFile>> entries =
allDataGroup.entries()) {
+ for (ManifestEntry<DataFile> entry : entries) {
+ DataFile dataFile = entry.file();
+ String filePath = dataFile.location();
+
+ // Skip if this file was ADDED or DELETED in this snapshot
+ // (those are handled by CreateDataFileChangeTasks)
+ if (currentSnapshotFiles.containsKey(filePath)) {
+ continue;
+ }
+
+ // Skip if we already created a task for this file in this snapshot
+ String key = snapshot.snapshotId() + ":" + filePath;
Review Comment:
Why do we need the snapshotId in the key?
Can't this be local to this method and contain only filenames?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]