gemini-code-assist[bot] commented on code in PR #38823:
URL: https://github.com/apache/beam/pull/38823#discussion_r3359276736


##########
sdks/java/io/iceberg/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -0,0 +1,1014 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.FluentIterable;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
+import org.apache.iceberg.ManifestGroup.CreateTasksFunction;
+import org.apache.iceberg.ManifestGroup.TaskContext;
+import org.apache.iceberg.expressions.Evaluator;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.ManifestEvaluator;
+import org.apache.iceberg.expressions.Projections;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.util.ContentFileUtil;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.PartitionMap;
+import org.apache.iceberg.util.PartitionSet;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.SortedMerge;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Copied over from <a 
href="https://github.com/apache/iceberg/pull/14264/";>Iceberg PR #14264</a>.
+ */
+@SuppressWarnings("nullness")
+public class BaseIncrementalChangelogScan
+  extends BaseIncrementalScan<
+  IncrementalChangelogScan, ChangelogScanTask, 
ScanTaskGroup<ChangelogScanTask>>
+  implements IncrementalChangelogScan {
+  private static final DeleteFileIndex EMPTY = createEmptyInstance();
+
+  private static DeleteFileIndex createEmptyInstance() {
+    try {
+      var constructor =
+        DeleteFileIndex.class.getDeclaredConstructor(
+          DeleteFileIndex.EqualityDeletes.class,
+          PartitionMap.class,
+          PartitionMap.class,
+          Map.class,
+          Map.class);
+      constructor.setAccessible(true);
+      return constructor.newInstance(null, null, null, null, null);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to initialize EMPTY DeleteFileIndex", 
e);
+    }
+  }
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(BaseIncrementalChangelogScan.class);
+
+  public BaseIncrementalChangelogScan(Table table) {
+    this(table, table.schema(), TableScanContext.empty());
+  }
+
+  private BaseIncrementalChangelogScan(Table table, Schema schema, 
TableScanContext context) {
+    super(table, schema, context);
+  }
+
+  @Override
+  protected IncrementalChangelogScan newRefinedScan(
+    Table newTable, Schema newSchema, TableScanContext newContext) {
+    return new BaseIncrementalChangelogScan(newTable, newSchema, newContext);
+  }
+
+  // Private fields to track build call count and cache (accessed via 
package-private methods for
+  // testing)
+  private int existingDeleteIndexBuildCallCount = 0;
+  // Cache for the built index (null if not built yet)
+  private DeleteFileIndex cachedExistingDeleteIndex = null;

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
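   The fields `existingDeleteIndexBuildCallCount` and 
`cachedExistingDeleteIndex` are read and written from multiple threads during 
parallel manifest planning (via `existingDeleteIndexSupplier` in 
`CreateDataFileChangeTasks`). The supplier reads `cachedExistingDeleteIndex` 
without holding any lock, so both fields must be declared `volatile`. This 
guarantees that a value written by one planning thread is visible to the others. 
Note that `volatile` only provides visibility. It does not make 
`existingDeleteIndexBuildCallCount++` atomic, and it does not stop two threads 
from both seeing `null` and building the index twice. That is why 
`buildExistingDeleteIndexTracked` also needs to be `synchronized` (see the next 
comment).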
   
   ```suggestion
     private volatile int existingDeleteIndexBuildCallCount = 0;
     // Cache for the built index (null if not built yet)
     private volatile DeleteFileIndex cachedExistingDeleteIndex = null;
   ```



##########
sdks/java/io/iceberg/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -0,0 +1,1014 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.FluentIterable;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
+import org.apache.iceberg.ManifestGroup.CreateTasksFunction;
+import org.apache.iceberg.ManifestGroup.TaskContext;
+import org.apache.iceberg.expressions.Evaluator;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.ManifestEvaluator;
+import org.apache.iceberg.expressions.Projections;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.util.ContentFileUtil;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.PartitionMap;
+import org.apache.iceberg.util.PartitionSet;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.SortedMerge;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Copied over from <a 
href="https://github.com/apache/iceberg/pull/14264/";>Iceberg PR #14264</a>.
+ */
+@SuppressWarnings("nullness")
+public class BaseIncrementalChangelogScan
+  extends BaseIncrementalScan<
+  IncrementalChangelogScan, ChangelogScanTask, 
ScanTaskGroup<ChangelogScanTask>>
+  implements IncrementalChangelogScan {
+  private static final DeleteFileIndex EMPTY = createEmptyInstance();
+
+  private static DeleteFileIndex createEmptyInstance() {
+    try {
+      var constructor =
+        DeleteFileIndex.class.getDeclaredConstructor(
+          DeleteFileIndex.EqualityDeletes.class,
+          PartitionMap.class,
+          PartitionMap.class,
+          Map.class,
+          Map.class);
+      constructor.setAccessible(true);
+      return constructor.newInstance(null, null, null, null, null);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to initialize EMPTY DeleteFileIndex", 
e);
+    }
+  }
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(BaseIncrementalChangelogScan.class);
+
+  public BaseIncrementalChangelogScan(Table table) {
+    this(table, table.schema(), TableScanContext.empty());
+  }
+
+  private BaseIncrementalChangelogScan(Table table, Schema schema, 
TableScanContext context) {
+    super(table, schema, context);
+  }
+
+  @Override
+  protected IncrementalChangelogScan newRefinedScan(
+    Table newTable, Schema newSchema, TableScanContext newContext) {
+    return new BaseIncrementalChangelogScan(newTable, newSchema, newContext);
+  }
+
+  // Private fields to track build call count and cache (accessed via 
package-private methods for
+  // testing)
+  private int existingDeleteIndexBuildCallCount = 0;
+  // Cache for the built index (null if not built yet)
+  private DeleteFileIndex cachedExistingDeleteIndex = null;
+
+  @Override
+  protected CloseableIterable<ChangelogScanTask> doPlanFiles(
+    Long fromSnapshotIdExclusive, long toSnapshotIdInclusive) {
+
+    Deque<Snapshot> changelogSnapshots =
+      orderedChangelogSnapshots(fromSnapshotIdExclusive, 
toSnapshotIdInclusive);
+
+    if (changelogSnapshots.isEmpty()) {
+      return CloseableIterable.empty();
+    }
+
+    Set<Long> changelogSnapshotIds = toSnapshotIds(changelogSnapshots);
+
+    Set<ManifestFile> newDataManifests =
+      FluentIterable.from(changelogSnapshots)
+        .transformAndConcat(snapshot -> snapshot.dataManifests(table().io()))
+        .filter(manifest -> 
changelogSnapshotIds.contains(manifest.snapshotId()))
+        .toSet();
+
+    // Build per-snapshot delete file indexes for added deletes
+    Map<Long, DeleteFileIndex> addedDeletesBySnapshot = 
buildAddedDeleteIndexes(changelogSnapshots);
+
+    // Check if existing delete index is needed for equality deletes
+    boolean hasEqualityDeletes =
+      addedDeletesBySnapshot.values().stream()
+        .anyMatch(index -> !index.isEmpty() && index.hasEqualityDeletes());
+
+    // Build existing index early if needed for equality deletes, otherwise 
use lazy initialization
+    DeleteFileIndex existingDeleteIndex =
+      hasEqualityDeletes ? 
buildExistingDeleteIndexTracked(fromSnapshotIdExclusive) : EMPTY;
+
+    ManifestGroup manifestGroup =
+      new ManifestGroup(table().io(), newDataManifests, ImmutableList.of())
+        .specsById(table().specs())
+        .caseSensitive(isCaseSensitive())
+        .select(scanColumns())
+        .filterData(filter())
+        .filterManifestEntries(entry -> 
changelogSnapshotIds.contains(entry.snapshotId()))
+        .ignoreExisting()
+        .columnsToKeepStats(columnsToKeepStats());
+
+    if (shouldIgnoreResiduals()) {
+      manifestGroup = manifestGroup.ignoreResiduals();
+    }
+
+    if (newDataManifests.size() > 1 && shouldPlanWithExecutor()) {
+      manifestGroup = manifestGroup.planWith(planExecutor());
+    }
+
+    // Create a supplier that reuses already-built index or builds lazily when 
first DELETED entry
+    // is encountered
+    Supplier<DeleteFileIndex> existingDeleteIndexSupplier =
+      () -> {
+        if (cachedExistingDeleteIndex != null) {
+          return cachedExistingDeleteIndex;
+        }
+        return buildExistingDeleteIndexTracked(fromSnapshotIdExclusive);
+      };
+
+    // Plan data file tasks (ADDED and DELETED)
+    Map<Long, List<DeleteFile>> cumulativeDeletesMap =
+      buildCumulativeDeletesBySnapshot(changelogSnapshots, 
addedDeletesBySnapshot);
+
+    CloseableIterable<ChangelogScanTask> dataFileTasks =
+      manifestGroup.plan(
+        new CreateDataFileChangeTasks(
+          changelogSnapshots,
+          existingDeleteIndexSupplier,
+          addedDeletesBySnapshot,
+          cumulativeDeletesMap,
+          table().specs(),
+          isCaseSensitive()));
+
+    // Find EXISTING data files affected by newly added delete files and 
create tasks for them
+    CloseableIterable<ChangelogScanTask> deletedRowsTasks =
+      planDeletedRowsTasks(
+        changelogSnapshots, existingDeleteIndex, addedDeletesBySnapshot, 
changelogSnapshotIds);
+
+    // Merge tasks from both iterables in order by changeOrdinal
+    Comparator<ChangelogScanTask> byOrdinal =
+      Comparator.comparing(ChangelogScanTask::changeOrdinal)
+        .thenComparing(ChangelogScanTask::commitSnapshotId);
+
+    return new SortedMerge<>(byOrdinal, ImmutableList.of(dataFileTasks, 
deletedRowsTasks));
+  }
+
+  @Override
+  public CloseableIterable<ScanTaskGroup<ChangelogScanTask>> planTasks() {
+    return TableScanUtil.planTaskGroups(
+      planFiles(), targetSplitSize(), splitLookback(), splitOpenFileCost());
+  }
+
+  // builds a collection of changelog snapshots (oldest to newest)
+  // the order of the snapshots is important as it is used to determine change 
ordinals
+  private Deque<Snapshot> orderedChangelogSnapshots(Long fromIdExcl, long 
toIdIncl) {
+    Deque<Snapshot> changelogSnapshots = new ArrayDeque<>();
+
+    for (Snapshot snapshot : SnapshotUtil.ancestorsBetween(table(), toIdIncl, 
fromIdExcl)) {
+      if (!snapshot.operation().equals(DataOperations.REPLACE)) {
+        changelogSnapshots.addFirst(snapshot);
+      }
+    }
+
+    return changelogSnapshots;
+  }
+
+  private Set<Long> toSnapshotIds(Collection<Snapshot> snapshots) {
+    return 
snapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
+  }
+
+  private static Map<Long, Integer> computeSnapshotOrdinals(Deque<Snapshot> 
snapshots) {
+    Map<Long, Integer> snapshotOrdinals = Maps.newHashMap();
+
+    int ordinal = 0;
+
+    for (Snapshot snapshot : snapshots) {
+      snapshotOrdinals.put(snapshot.snapshotId(), ordinal++);
+    }
+
+    return snapshotOrdinals;
+  }
+
+  /**
+   * Builds a delete file index for existing deletes that were present before 
the start snapshot.
+   * These deletes should be applied to data files but should not generate 
DELETE changelog rows.
+   * Uses manifest pruning and caching to optimize performance.
+   */
+  private DeleteFileIndex buildExistingDeleteIndex(Long 
fromSnapshotIdExclusive) {
+    if (fromSnapshotIdExclusive == null) {
+      return EMPTY;
+    }
+    Snapshot fromSnapshot = table().snapshot(fromSnapshotIdExclusive);
+    Preconditions.checkState(
+      fromSnapshot != null, "Cannot find starting snapshot: %s", 
fromSnapshotIdExclusive);
+
+    List<ManifestFile> existingDeleteManifests = 
fromSnapshot.deleteManifests(table().io());
+    if (existingDeleteManifests.isEmpty()) {
+      return EMPTY;
+    }
+
+    // Prune manifests based on partition filter to avoid processing 
irrelevant manifests
+    List<ManifestFile> prunedManifests = 
pruneManifestsByPartition(existingDeleteManifests);
+    if (prunedManifests.isEmpty()) {
+      return EMPTY;
+    }
+
+    // Load delete files from manifests
+    Iterable<DeleteFile> deleteFiles = loadDeleteFiles(prunedManifests, null);
+
+    return DeleteFileIndex.builderFor(deleteFiles)
+      .specsById(table().specs())
+      .caseSensitive(isCaseSensitive())
+      .build();
+  }
+
+  /**
+   * Wrapper method that tracks build calls and caches the result for reuse. 
This ensures we only
+   * build the index once even if called from multiple places.
+   */
+  private DeleteFileIndex buildExistingDeleteIndexTracked(Long 
fromSnapshotIdExclusive) {
+    if (cachedExistingDeleteIndex != null) {
+      return cachedExistingDeleteIndex;
+    }
+    existingDeleteIndexBuildCallCount++;
+    cachedExistingDeleteIndex = 
buildExistingDeleteIndex(fromSnapshotIdExclusive);
+    return cachedExistingDeleteIndex;
+  }

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
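   The `buildExistingDeleteIndexTracked` method can be called concurrently by 
multiple threads during parallel manifest planning. Without synchronization, 
several threads can all see a `null` cache and run `buildExistingDeleteIndex` 
at the same time. That repeats the work of loading the existing delete 
manifests and leaves `existingDeleteIndexBuildCallCount` wrong, because its 
`++` is not atomic. Making the method `synchronized` completes the 
double-checked locking pattern started by `existingDeleteIndexSupplier`. The 
supplier's unlocked fast-path read is safe only once `cachedExistingDeleteIndex` 
is also `volatile` (previous comment). The re-check inside the synchronized 
method ensures that only one thread builds the index and later callers reuse 
the cached result.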
   
   ```suggestion
     private synchronized DeleteFileIndex buildExistingDeleteIndexTracked(
       Long fromSnapshotIdExclusive) {
       // Re-check under the lock: another planning thread may have built and cached
       // the index while this thread was waiting to enter the method.
       if (cachedExistingDeleteIndex != null) {
         return cachedExistingDeleteIndex;
       }
       existingDeleteIndexBuildCallCount++;
       cachedExistingDeleteIndex = buildExistingDeleteIndex(fromSnapshotIdExclusive);
       return cachedExistingDeleteIndex;
     }
   ```



##########
sdks/java/io/iceberg/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -0,0 +1,1014 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.FluentIterable;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
+import org.apache.iceberg.ManifestGroup.CreateTasksFunction;
+import org.apache.iceberg.ManifestGroup.TaskContext;
+import org.apache.iceberg.expressions.Evaluator;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.ManifestEvaluator;
+import org.apache.iceberg.expressions.Projections;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.util.ContentFileUtil;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.PartitionMap;
+import org.apache.iceberg.util.PartitionSet;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.SortedMerge;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Copied over from <a 
href="https://github.com/apache/iceberg/pull/14264/";>Iceberg PR #14264</a>.
+ */
+@SuppressWarnings("nullness")
+public class BaseIncrementalChangelogScan
+  extends BaseIncrementalScan<
+  IncrementalChangelogScan, ChangelogScanTask, 
ScanTaskGroup<ChangelogScanTask>>
+  implements IncrementalChangelogScan {
+  private static final DeleteFileIndex EMPTY = createEmptyInstance();
+
+  private static DeleteFileIndex createEmptyInstance() {
+    try {
+      var constructor =
+        DeleteFileIndex.class.getDeclaredConstructor(
+          DeleteFileIndex.EqualityDeletes.class,
+          PartitionMap.class,
+          PartitionMap.class,
+          Map.class,
+          Map.class);
+      constructor.setAccessible(true);
+      return constructor.newInstance(null, null, null, null, null);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to initialize EMPTY DeleteFileIndex", 
e);
+    }
+  }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
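   Using reflection to call a non-public `DeleteFileIndex` constructor is 
fragile because it hard-codes that constructor's exact parameter list. If an 
Iceberg upgrade changes that list, class initialization fails: the 
`RuntimeException` thrown while initializing the static `EMPTY` field surfaces 
as an `ExceptionInInitializerError`. In addition, `setAccessible(true)` is 
rejected if Iceberg is ever loaded as a named module that does not open 
`org.apache.iceberg`. This class is declared in the `org.apache.iceberg` 
package and already builds indexes with `DeleteFileIndex.builderFor(...)` in 
`buildExistingDeleteIndex`. An empty instance can therefore be created the 
same way, without any reflection: 
`DeleteFileIndex.builderFor(ImmutableList.of()).build()`.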
   
   ```suggestion
     private static final DeleteFileIndex EMPTY = 
DeleteFileIndex.builderFor(ImmutableList.of()).build();
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to