ahmedabu98 commented on code in PR #38836:
URL: https://github.com/apache/beam/pull/38836#discussion_r3476840191


##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/cdc/ChangelogScanner.java:
##########
@@ -0,0 +1,1002 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg.cdc;
+
+import static java.lang.String.format;
+import static 
org.apache.beam.sdk.io.iceberg.cdc.SerializableChangelogTask.Type.ADDED_ROWS;
+import static 
org.apache.beam.sdk.io.iceberg.cdc.SerializableChangelogTask.getDataFile;
+import static 
org.apache.beam.sdk.io.iceberg.cdc.SerializableChangelogTask.getLength;
+import static 
org.apache.beam.sdk.io.iceberg.cdc.SerializableChangelogTask.getPartition;
+import static 
org.apache.beam.sdk.io.iceberg.cdc.SerializableChangelogTask.getSpec;
+import static 
org.apache.beam.sdk.io.iceberg.cdc.SerializableChangelogTask.getType;
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.io.iceberg.IcebergScanConfig;
+import org.apache.beam.sdk.io.iceberg.IcebergUtils;
+import org.apache.beam.sdk.io.iceberg.TableCache;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TupleTag;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.apache.iceberg.AddedRowsScanTask;
+import org.apache.iceberg.BaseIncrementalChangelogScan;
+import org.apache.iceberg.ChangelogScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.DeletedDataFileScanTask;
+import org.apache.iceberg.DeletedRowsScanTask;
+import org.apache.iceberg.IncrementalChangelogScan;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.MetricsModes;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.StructLikeMap;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DoFn that takes incoming Iceberg snapshots and scans them for changelogs 
using Iceberg's {@link
+ * IncrementalChangelogScan}. Changelog tasks are organized into batches and 
routed to different
+ * downstream PCollections based on complexity.
+ *
+ * <p>The Iceberg scan generates batches of changelog scan tasks, each of size 
{@link
+ * TableProperties#SPLIT_SIZE}. This can be configured with the table's <a
+ * 
href="https://iceberg.apache.org/docs/latest/configuration/#read-properties";>read.split.target-size
+ * property</a>.
+ *
+ * <p>This DoFn analyzes the nature of changes within the snapshot, partition, 
and file level, then
+ * routes the changes accordingly:
+ *
+ * <ol>
+ *   <li><b>Unidirectional (Fast Path):</b> If an isolated level contains only 
inserts OR only
+ *       deletes, its tasks are emitted to {@link #UNIDIRECTIONAL_TASKS}. 
These records <b>bypass
+ *       the CoGBK shuffle</b> and are output immediately.
+ *   <li><b>Small Bidirectional (Medium Path):</b> If an isolated level 
contains a mix of inserts
+ *       and deletes, and is small enough, its tasks are emitted to {@link
+ *       #SMALL_BIDIRECTIONAL_TASKS}. These records are resolved in memory to 
identify potential
+ *       updates. Task groups are considered small enough if the estimated 
overlap region is within
+ *       {@link TableProperties#SPLIT_SIZE}.
+ *   <li><b>Bidirectional (Slow Path):</b> If an isolated level contains a mix 
of inserts and
+ *       deletes, and is too large, its tasks are emitted to {@link 
#LARGE_BIDIRECTIONAL_TASKS}.
+ *       These records are grouped by Primary Key and processed by {@link 
ResolveChanges} to
+ *       identify potential updates.
+ * </ol>
+ *
+ * <h2>Optimizing by Shuffling Less Data</h2>
+ *
+ * <p>We take a three-layered approach to identify data that can bypass the 
expensive downstream
+ * CoGroupByKey shuffle:
+ *
+ * <h3>Snapshots</h3>
+ *
+ * We start by analyzing the nature of changes at the snapshot level. If a 
snapshot's operation is
+ * not of type {@link DataOperations#OVERWRITE}, then it's a uni-directional 
change.
+ *
+ * <h3>Pinned Partitions</h3>
+ *
+ * <p>If the table's partition fields are derived entirely from Primary Key 
fields, we know that a
+ * record will not migrate between partitions. This narrows down the isolated 
level and allows us to
+ * only check for bi-directional changes <b>within a partition</b>. Doing this 
will allow partitions
+ * with uni-directional changes to bypass the expensive CoGBK shuffle. It also 
gives partitions with
+ * small bi-directional changes a chance to be processed in-memory instead of 
needing to pass
+ * through the CoGBK.
+ *
+ * <h3>Optimization for Individual Files</h3>
+ *
+ * When we have narrowed down our group of tasks with bi-directional changes, 
we start analyzing the
+ * metadata of their underlying files. We compare the upper and lower bounds 
of Partition Keys
+ * relevant to each file, and consider any overlaps as potentially containing 
an update. If a given
+ * task's Primary Key bounds has no overlap with any opposing task's Primary 
Key bounds, then we
+ * know it's not possible to create an (insert, delete) pair with it. Such a 
task can safely bypass
+ * the shuffle.
+ *
+ * <p>Note: "opposing" refers to a change that happens in the opposite 
direction (e.g. insert is
+ * "positive", delete is "negative")
+ *
+ * <p>For example, say we have a group of tasks:
+ *
+ * <ol>
+ *   <li>Task A (adds rows): bounds [3, 8]
+ *   <li>Task B (adds rows): bounds [2, 4]
+ *   <li>Task C (deletes rows): bounds [1, 5]
+ *   <li>Task D (adds rows): bounds [6, 12]
+ * </ol>
+ *
+ * <p>Tasks A and B add rows, and overlap with Task C which deletes row. We 
need to resolve the rows
+ * in these 3 tasks because they might all contain (insert, delete) pairs that 
lead to an update.
+ *
+ * <p>Task D however, does not overlap with any delete rows. It will never 
produce an (insert,
+ * delete) pair, so we can directly emit it without resolving its output rows.
+ */
+class ChangelogScanner
+  extends DoFn<Long, KV<ChangelogDescriptor, List<SerializableChangelogTask>>> 
{
+  private static final Logger LOG = 
LoggerFactory.getLogger(ChangelogScanner.class);
+  private static final Counter totalChangelogScanTasks =
+    Metrics.counter(ChangelogScanner.class, "totalChangelogScanTasks");
+  private static final Counter numAddedRowsScanTasks =
+    Metrics.counter(ChangelogScanner.class, "numAddedRowsScanTasks");
+  private static final Counter numDeletedRowsScanTasks =
+    Metrics.counter(ChangelogScanner.class, "numDeletedRowsScanTasks");
+  private static final Counter numDeletedDataFileScanTasks =
+    Metrics.counter(ChangelogScanner.class, "numDeletedDataFileScanTasks");
+  private static final Counter numUniDirectionalTasks =
+    Metrics.counter(ChangelogScanner.class, "numUniDirectionalTasks");
+  private static final Counter numLargeBiDirectionalTasks =
+    Metrics.counter(ChangelogScanner.class, "numLargeBiDirectionalTasks");
+  private static final Counter numSmallBiDirectionalTasks =
+    Metrics.counter(ChangelogScanner.class, "numSmallBiDirectionalTasks");
+  static final TupleTag<KV<ChangelogDescriptor, 
List<SerializableChangelogTask>>>
+    UNIDIRECTIONAL_TASKS = new TupleTag<>();
+  static final TupleTag<KV<ChangelogDescriptor, 
List<SerializableChangelogTask>>>
+    SMALL_BIDIRECTIONAL_TASKS = new TupleTag<>();
+  static final TupleTag<KV<ChangelogDescriptor, 
List<SerializableChangelogTask>>>
+    LARGE_BIDIRECTIONAL_TASKS = new TupleTag<>();
+
+  private final IcebergScanConfig scanConfig;
+  private @MonotonicNonNull Table table;
+  private @MonotonicNonNull Snapshot snapshot;
+  private transient @MonotonicNonNull TaskBatcher uniBatcher;
+  private boolean canDoPartitionOptimization = false;
+  // for metrics
+  private int numAddedRowsTasks = 0;
+  private int numDeletedRowsTasks = 0;
+  private int numDeletedFileTasks = 0;
+  private int numUniDirTasks = 0;
+  private int numSmallBiDirTasks = 0;
+  private int numLargeBiDirTasks = 0;
+  private int numUniDirSplits = 0;
+  private int numSmallBiDirSplits = 0;
+  private int numLargeBiDirSplits = 0;
+
+  ChangelogScanner(IcebergScanConfig scanConfig) {
+    this.scanConfig = scanConfig;
+  }
+
+  static KvCoder<ChangelogDescriptor, List<SerializableChangelogTask>> coder(
+    org.apache.beam.sdk.schemas.Schema rowIdBeamSchema) {
+    return KvCoder.of(
+      ChangelogDescriptor.coder(rowIdBeamSchema),
+      ListCoder.of(SerializableChangelogTask.coder()));
+  }
+
+  @Setup
+  public void setup() {
+    TableCache.setup(scanConfig);
+  }
+
+  @ProcessElement
+  public void process(@Element Long snapshotId, MultiOutputReceiver out) 
throws IOException {
+    resetLocalMetrics();
+    // not using getRefreshed because upstream Watch should have already 
refreshed the
+    // table to a state where this snapshot exists
+    this.table = 
SerializableTable.copyOf(TableCache.get(scanConfig.getTableIdentifier()));
+    this.snapshot = table.snapshot(snapshotId);
+
+    // refresh on miss
+    if (this.snapshot == null) {
+      this.table =
+        
SerializableTable.copyOf(TableCache.getRefreshed(scanConfig.getTableIdentifier()));
+      this.snapshot =
+        checkStateNotNull(
+          table.snapshot(snapshotId), "Could not retrieve table snapshot: %s", 
snapshotId);
+    }

Review Comment:
   it's okay because snapshot is reset on each process call



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to