gemini-code-assist[bot] commented on code in PR #37191:
URL: https://github.com/apache/beam/pull/37191#discussion_r2649575108


##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/cdc/ReadFromChangelogs.java:
##########
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg.cdc;
+
+import static org.apache.beam.sdk.io.iceberg.IcebergUtils.icebergSchemaToBeamSchema;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.io.iceberg.IcebergScanConfig;
+import org.apache.beam.sdk.io.iceberg.IcebergUtils;
+import org.apache.beam.sdk.io.iceberg.ReadUtils;
+import org.apache.beam.sdk.io.iceberg.SerializableDeleteFile;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.DeleteFilter;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.StructProjection;
+import org.joda.time.Instant;
+
[email protected]
+public class ReadFromChangelogs<OutT>
+    extends DoFn<KV<ChangelogDescriptor, List<SerializableChangelogTask>>, OutT> {
+  public static final TupleTag<Row> UNIDIRECTIONAL_ROWS = new TupleTag<>();
+  public static final TupleTag<KV<Row, Row>> KEYED_INSERTS = new TupleTag<>();
+  public static final TupleTag<KV<Row, Row>> KEYED_DELETES = new TupleTag<>();
+
+  private final Counter numAddedRowsScanTasksCompleted =
+      Metrics.counter(ReadFromChangelogs.class, "numAddedRowsScanTasksCompleted");
+  private final Counter numDeletedRowsScanTasksCompleted =
+      Metrics.counter(ReadFromChangelogs.class, "numDeletedRowsScanTasksCompleted");
+  private final Counter numDeletedDataFileScanTasksCompleted =
+      Metrics.counter(ReadFromChangelogs.class, "numDeletedDataFileScanTasksCompleted");
+
+  private final IcebergScanConfig scanConfig;
+  private final boolean keyedOutput;
+  private transient StructProjection recordIdProjection;
+  private transient org.apache.iceberg.Schema recordIdSchema;
+  private final Schema beamRowSchema;
+  private final Schema rowAndSnapshotIDBeamSchema;
+  private static final String SNAPSHOT_FIELD = "__beam__changelog__snapshot__id__";
+
+  private ReadFromChangelogs(IcebergScanConfig scanConfig, boolean keyedOutput) {
+    this.scanConfig = scanConfig;
+    this.keyedOutput = keyedOutput;
+
+    this.beamRowSchema = icebergSchemaToBeamSchema(scanConfig.getProjectedSchema());
+    org.apache.iceberg.Schema recordSchema = scanConfig.getProjectedSchema();
+    this.recordIdSchema = recordSchema.select(recordSchema.identifierFieldNames());
+    this.recordIdProjection = StructProjection.create(recordSchema, recordIdSchema);
+
+    this.rowAndSnapshotIDBeamSchema = rowAndSnapshotIDBeamSchema(scanConfig);
+  }
+
+  static ReadFromChangelogs<Row> of(IcebergScanConfig scanConfig) {
+    return new ReadFromChangelogs<>(scanConfig, false);
+  }
+
+  static ReadFromChangelogs<KV<Row, Row>> withKeyedOutput(IcebergScanConfig scanConfig) {
+    return new ReadFromChangelogs<>(scanConfig, true);
+  }
+
+  /**
+   * Determines the keyed output coder, which depends on the requested projected schema and the
+   * schema's identifier fields.
+   */
+  static KvCoder<Row, Row> keyedOutputCoder(IcebergScanConfig scanConfig) {
+    org.apache.iceberg.Schema recordSchema = scanConfig.getProjectedSchema();
+    Schema rowAndSnapshotIDBeamSchema = rowAndSnapshotIDBeamSchema(scanConfig);
+    return KvCoder.of(
+        SchemaCoder.of(rowAndSnapshotIDBeamSchema),
+        SchemaCoder.of(icebergSchemaToBeamSchema(recordSchema)));
+  }
+
+  private static Schema rowAndSnapshotIDBeamSchema(IcebergScanConfig scanConfig) {
+    org.apache.iceberg.Schema recordSchema = scanConfig.getProjectedSchema();
+    org.apache.iceberg.Schema recordIdSchema =
+        recordSchema.select(recordSchema.identifierFieldNames());
+    Schema rowIdBeamSchema = icebergSchemaToBeamSchema(recordIdSchema);
+    List<Schema.Field> fields =
+        ImmutableList.<Schema.Field>builder()
+            .add(Schema.Field.of(SNAPSHOT_FIELD, Schema.FieldType.INT64))
+            .addAll(rowIdBeamSchema.getFields())
+            .build();
+    return new Schema(fields);
+  }
+
+  @Setup
+  public void setup() {
+    // StructProjection is not serializable, so we need to recompute it when the DoFn gets
+    // deserialized
+    org.apache.iceberg.Schema recordSchema = scanConfig.getProjectedSchema();
+    this.recordIdSchema = recordSchema.select(recordSchema.identifierFieldNames());
+    this.recordIdProjection = StructProjection.create(recordSchema, recordIdSchema);
+  }
+
+  @ProcessElement
+  public void process(
+      @Element KV<ChangelogDescriptor, List<SerializableChangelogTask>> element,
+      RestrictionTracker<OffsetRange, Long> tracker,
+      MultiOutputReceiver out)
+      throws IOException {
+    // TODO: use TableCache
+    Table table = scanConfig.getTable();
+    table.refresh();
+
+    List<SerializableChangelogTask> tasks = element.getValue();
+
+    for (long l = tracker.currentRestriction().getFrom();
+        l < tracker.currentRestriction().getTo();
+        l++) {
+      if (!tracker.tryClaim(l)) {
+        return;
+      }
+
+      SerializableChangelogTask task = tasks.get((int) l);
+      switch (task.getType()) {
+        case ADDED_ROWS:
+          processAddedRowsTask(task, table, out);
+          break;
+        case DELETED_ROWS:
+          processDeletedRowsTask(task, table, out);
+          break;
+        case DELETED_FILE:
+          processDeletedFileTask(task, table, out);
+          break;
+      }
+    }
+  }
+
+  private void processAddedRowsTask(
+      SerializableChangelogTask task, Table table, MultiOutputReceiver outputReceiver)
+      throws IOException {
+    try (CloseableIterable<Record> fullIterable = ReadUtils.createReader(task, table, scanConfig)) {
+      DeleteFilter<Record> deleteFilter =
+          ReadUtils.genericDeleteFilter(
+              table, scanConfig, task.getDataFile().getPath(), task.getAddedDeletes());
+      CloseableIterable<Record> filtered = deleteFilter.filter(fullIterable);
+
+      for (Record rec : filtered) {
+        outputRecord(
+            rec,
+            outputReceiver,
+            task.getCommitSnapshotId(),
+            task.getTimestampMillis(),
+            KEYED_INSERTS);
+      }
+    }
+    numAddedRowsScanTasksCompleted.inc();
+  }
+
+  private void processDeletedRowsTask(
+      SerializableChangelogTask task, Table table, MultiOutputReceiver outputReceiver)
+      throws IOException {
+    DeleteFilter<Record> existingDeletesFilter =
+        ReadUtils.genericDeleteFilter(
+            table, scanConfig, task.getDataFile().getPath(), task.getExistingDeletes());
+    DeleteReader<Record> newDeletesReader =
+        ReadUtils.genericDeleteReader(
+            table, scanConfig, task.getDataFile().getPath(), task.getAddedDeletes());
+
+    try (CloseableIterable<Record> allRecords = ReadUtils.createReader(task, table, scanConfig)) {
+      CloseableIterable<Record> liveRecords = existingDeletesFilter.filter(allRecords);
+      CloseableIterable<Record> newlyDeletedRecords = newDeletesReader.read(liveRecords);
+
+      for (Record rec : newlyDeletedRecords) {
+        // TODO: output with DELETE kind
+        outputRecord(
+            rec,
+            outputReceiver,
+            task.getCommitSnapshotId(),
+            task.getTimestampMillis(),
+            KEYED_DELETES);
+      }
+    }
+    numDeletedRowsScanTasksCompleted.inc();
+  }
+
+  private void processDeletedFileTask(
+      SerializableChangelogTask task, Table table, MultiOutputReceiver outputReceiver)
+      throws IOException {
+    try (CloseableIterable<Record> fullIterable = ReadUtils.createReader(task, table, scanConfig)) {
+      DeleteFilter<Record> deleteFilter =
+          ReadUtils.genericDeleteFilter(
+              table, scanConfig, task.getDataFile().getPath(), task.getExistingDeletes());
+      CloseableIterable<Record> filtered = deleteFilter.filter(fullIterable);
+      for (Record rec : filtered) {
+        // TODO: output with DELETE kind
+        outputRecord(
+            rec,
+            outputReceiver,
+            task.getCommitSnapshotId(),
+            task.getTimestampMillis(),
+            KEYED_DELETES);
+      }
+    }
+    numDeletedDataFileScanTasksCompleted.inc();
+  }
+
+  private void outputRecord(
+      Record rec,
+      MultiOutputReceiver outputReceiver,
+      long snapshotId,
+      long timestampMillis,
+      TupleTag<KV<Row, Row>> keyedTag) {
+    Row row = IcebergUtils.icebergRecordToBeamRow(beamRowSchema, rec);
+    Instant timestamp = Instant.ofEpochMilli(timestampMillis);
+    if (keyedOutput) { // slow path
+      StructProjection recId = recordIdProjection.wrap(rec);
+      // Create a Row ID consisting of:
+      // 1. the task's commit snapshot ID
+      // 2. the record ID column values
+      // This is needed to sufficiently distinguish a record change
+      Row id = structToBeamRow(snapshotId, recId, recordIdSchema, rowAndSnapshotIDBeamSchema);
+      outputReceiver.get(keyedTag).outputWithTimestamp(KV.of(id, row), timestamp);
+    } else { // fast path
+      System.out.printf("[UNIDIRECTIONAL] -- Output(%s, %s)\n%s%n", snapshotId, timestamp, row);

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   This `System.out.printf` statement appears to be for debugging. Please 
remove it from the production code.
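   
   A minimal sketch of the fast path with the debug statement removed, assuming
   the intent is to emit on the `UNIDIRECTIONAL_ROWS` tag this class already
   declares (the truncated diff does not show the actual emit call):
   
   ```java
   } else { // fast path
     // Emit the row directly with its commit timestamp; no per-record printing
     // on the hot path. UNIDIRECTIONAL_ROWS is assumed to be the intended tag.
     outputReceiver.get(UNIDIRECTIONAL_ROWS).outputWithTimestamp(row, timestamp);
   }
   ```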



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/cdc/ChangelogScanner.java:
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg.cdc;
+
+import static org.apache.beam.sdk.io.iceberg.cdc.SerializableChangelogTask.Type.ADDED_ROWS;
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.io.iceberg.IcebergScanConfig;
+import org.apache.beam.sdk.io.iceberg.SnapshotInfo;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.iceberg.ChangelogScanTask;
+import org.apache.iceberg.IncrementalChangelogScan;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.io.CloseableIterable;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ChangelogScanner
+    extends DoFn<
+        KV<String, List<SnapshotInfo>>, KV<ChangelogDescriptor, List<SerializableChangelogTask>>> {
+  private static final Logger LOG = LoggerFactory.getLogger(ChangelogScanner.class);
+  private static final Counter totalChangelogScanTasks =
+      Metrics.counter(ChangelogScanner.class, "totalChangelogScanTasks");
+  private static final Counter numAddedRowsScanTasks =
+      Metrics.counter(ChangelogScanner.class, "numAddedRowsScanTasks");
+  private static final Counter numDeletedRowsScanTasks =
+      Metrics.counter(ChangelogScanner.class, "numDeletedRowsScanTasks");
+  private static final Counter numDeletedDataFileScanTasks =
+      Metrics.counter(ChangelogScanner.class, "numDeletedDataFileScanTasks");
+  public static final TupleTag<KV<ChangelogDescriptor, List<SerializableChangelogTask>>>
+      UNIDIRECTIONAL_CHANGES = new TupleTag<>();
+  public static final TupleTag<KV<ChangelogDescriptor, List<SerializableChangelogTask>>>
+      BIDIRECTIONAL_CHANGES = new TupleTag<>();
+  public static final KvCoder<ChangelogDescriptor, List<SerializableChangelogTask>> OUTPUT_CODER =
+      KvCoder.of(ChangelogDescriptor.coder(), ListCoder.of(SerializableChangelogTask.coder()));
+  private final IcebergScanConfig scanConfig;
+
+  ChangelogScanner(IcebergScanConfig scanConfig) {
+    this.scanConfig = scanConfig;
+  }
+
+  @ProcessElement
+  public void process(@Element KV<String, List<SnapshotInfo>> element, MultiOutputReceiver out)
+      throws IOException {
+    // TODO: use TableCache here
+    Table table = scanConfig.getTable();

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The TODO comment here suggests using `TableCache`. Loading a table can be an 
expensive operation, especially in a streaming context. Using a cache would 
improve performance. Please address this TODO by using the `TableCache`.
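   
   Until `TableCache` is wired in, even a simple per-JVM cache keyed on the
   table identifier would avoid reloading the table from the catalog for every
   bundle. A hedged sketch (the static map is an illustrative stand-in, not the
   actual `TableCache` API):
   
   ```java
   // Hypothetical per-worker cache (java.util.concurrent.ConcurrentHashMap);
   // a real implementation should delegate to the connector's TableCache.
   private static final ConcurrentHashMap<String, Table> TABLES = new ConcurrentHashMap<>();
   
   Table table =
       TABLES.computeIfAbsent(
           scanConfig.getTableIdentifier(),
           id -> scanConfig.getCatalogConfig().catalog().loadTable(TableIdentifier.parse(id)));
   table.refresh(); // still refresh so newly committed snapshots are visible
   ```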



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDeleteFile.java:
##########
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import static org.apache.beam.sdk.io.iceberg.SerializableDataFile.computeMapByteHashCode;
+import static org.apache.beam.sdk.io.iceberg.SerializableDataFile.mapEquals;
+import static org.apache.beam.sdk.io.iceberg.SerializableDataFile.toByteArrayMap;
+import static org.apache.beam.sdk.io.iceberg.SerializableDataFile.toByteBufferMap;
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+
+import com.google.auto.value.AutoValue;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldNumber;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Equivalence;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   There are several unused imports in this file (`java.util.Arrays`, 
`java.util.HashMap`, `com.google.common.base.Equivalence`, 
`com.google.common.collect.Maps`). Please remove them to keep the code clean.



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/cdc/IncrementalChangelogSource.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg.cdc;
+
+import static org.apache.beam.sdk.io.iceberg.cdc.ChangelogScanner.BIDIRECTIONAL_CHANGES;
+import static org.apache.beam.sdk.io.iceberg.cdc.ChangelogScanner.UNIDIRECTIONAL_CHANGES;
+import static org.apache.beam.sdk.io.iceberg.cdc.ReadFromChangelogs.KEYED_DELETES;
+import static org.apache.beam.sdk.io.iceberg.cdc.ReadFromChangelogs.KEYED_INSERTS;
+import static org.apache.beam.sdk.io.iceberg.cdc.ReadFromChangelogs.UNIDIRECTIONAL_ROWS;
+import static org.apache.beam.sdk.io.iceberg.cdc.ReconcileChanges.DELETES;
+import static org.apache.beam.sdk.io.iceberg.cdc.ReconcileChanges.INSERTS;
+
+import java.util.List;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.io.iceberg.IcebergScanConfig;
+import org.apache.beam.sdk.io.iceberg.IcebergUtils;
+import org.apache.beam.sdk.io.iceberg.IncrementalScanSource;
+import org.apache.beam.sdk.io.iceberg.SnapshotInfo;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Redistribute;
+import org.apache.beam.sdk.transforms.Reify;
+import org.apache.beam.sdk.transforms.join.CoGroupByKey;
+import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+
+public class IncrementalChangelogSource extends IncrementalScanSource {
+  public IncrementalChangelogSource(IcebergScanConfig scanConfig) {
+    super(scanConfig);
+  }
+
+  @Override
+  public PCollection<Row> expand(PBegin input) {
+    Table table =
+        scanConfig
+            .getCatalogConfig()
+            .catalog()
+            .loadTable(TableIdentifier.parse(scanConfig.getTableIdentifier()));
+
+    PCollection<KV<String, List<SnapshotInfo>>> snapshots =
+        MoreObjects.firstNonNull(scanConfig.getStreaming(), false)
+            ? unboundedSnapshots(input)
+            : boundedSnapshots(input, table);
+
+    // scan each interval of snapshots and create groups of changelog tasks
+    PCollectionTuple changelogTasks =
+        snapshots
+            .apply(Redistribute.byKey())
+            .apply(
+                "Create Changelog Tasks",
+                ParDo.of(new ChangelogScanner(scanConfig))
+                    .withOutputTags(
+                        UNIDIRECTIONAL_CHANGES, TupleTagList.of(BIDIRECTIONAL_CHANGES)));
+
+    // for changelog ordinal groups that have UNIDIRECTIONAL changes (i.e. all deletes, or all
+    // inserts), take the fast approach of just reading and emitting CDC records.
+    PCollection<Row> uniDirectionalCdcRows =
+        processUniDirectionalChanges(
+            changelogTasks.get(UNIDIRECTIONAL_CHANGES).setCoder(ChangelogScanner.OUTPUT_CODER));
+
+    // changelog ordinal groups that have BIDIRECTIONAL changes (i.e. both deletes and inserts)
+    // will need extra processing (including a shuffle) to identify any updates
+    PCollection<Row> biDirectionalCdcRows =
+        processBiDirectionalChanges(
+            changelogTasks.get(BIDIRECTIONAL_CHANGES).setCoder(ChangelogScanner.OUTPUT_CODER));
+
+    // Merge UNIDIRECTIONAL and BIDIRECTIONAL outputs
+    return PCollectionList.of(uniDirectionalCdcRows)
+        .and(biDirectionalCdcRows)
+        .apply(Flatten.pCollections());
+  }

Review Comment:
   ![critical](https://www.gstatic.com/codereviewagent/critical.svg)
   
   This is a great start on a CDC source for Iceberg. However, a critical piece 
of functionality is missing: the output `PCollection<Row>` does not contain any 
information about the type of change (e.g., INSERT, DELETE, UPDATE). A CDC 
stream is not very useful without this metadata.
   
   Several `TODO` comments in the code confirm this is not yet implemented:
   - `ReadFromChangelogs.java`: `// TODO: output with DELETE kind`
   - `ReconcileChanges.java`: `// TODO: output as UPDATE_BEFORE kind`, `// 
TODO: output as UPDATE_AFTER kind`, `// TODO: output as DELETE kind`
   
   To address this, the output `Row` schema should be augmented to include a 
field for the change type. A common practice is to add a `change_type` string 
field. Alternatively, you could align with Beam's `ChangeStream` format. 
Without this, consumers of this source cannot distinguish between different 
types of changes.
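   
   A hedged sketch of the schema-augmentation approach (the field name and
   change-kind strings are illustrative placeholders, not a settled format):
   
   ```java
   private static final String CHANGE_TYPE_FIELD = "change_type";
   
   // Append a change-type column to the projected row schema.
   static Schema withChangeTypeField(Schema rowSchema) {
     List<Schema.Field> fields =
         ImmutableList.<Schema.Field>builder()
             .addAll(rowSchema.getFields())
             .add(Schema.Field.of(CHANGE_TYPE_FIELD, Schema.FieldType.STRING))
             .build();
     return new Schema(fields);
   }
   
   // Re-emit a row tagged with its change kind, e.g. "INSERT", "DELETE",
   // "UPDATE_BEFORE", or "UPDATE_AFTER".
   static Row withChangeType(Schema augmentedSchema, Row row, String changeType) {
     return Row.withSchema(augmentedSchema)
         .addValues(row.getValues())
         .addValue(changeType)
         .build();
   }
   ```
   
   The output points flagged by the `TODO`s in `ReadFromChangelogs` and
   `ReconcileChanges` could then pass the appropriate kind when emitting.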



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadUtils.java:
##########
@@ -208,4 +243,138 @@ public static CloseableIterable<Record> maybeApplyFilter(
     }
     return iterable;
   }
+
+  public static DeleteFilter<Record> genericDeleteFilter(
+      Table table,
+      IcebergScanConfig scanConfig,
+      String dataFilePath,
+      List<SerializableDeleteFile> deletes) {
+    return new BeamDeleteFilter(
+        table.io(),
+        dataFilePath,
+        scanConfig.getRequiredSchema(),
+        scanConfig.getProjectedSchema(),
+        deletes.stream()
+            .map(sdf -> sdf.createDeleteFile(table.specs(), table.sortOrders()))
+            .collect(Collectors.toList()));
+  }
+
+  public static DeleteReader<Record> genericDeleteReader(
+      Table table,
+      IcebergScanConfig scanConfig,
+      String dataFilePath,
+      List<SerializableDeleteFile> deletes) {
+    return new BeamDeleteReader(
+        table.io(),
+        dataFilePath,
+        scanConfig.getRequiredSchema(),
+        scanConfig.getProjectedSchema(),
+        deletes.stream()
+            .map(sdf -> sdf.createDeleteFile(table.specs(), table.sortOrders()))
+            .collect(Collectors.toList()));
+  }
+
+  public static class BeamDeleteFilter extends DeleteFilter<Record> {
+    private final FileIO io;
+    private final InternalRecordWrapper asStructLike;
+
+    @SuppressWarnings("method.invocation")
+    public BeamDeleteFilter(
+        FileIO io,
+        String dataFilePath,
+        Schema tableSchema,
+        Schema projectedSchema,
+        List<DeleteFile> deleteFiles) {
+      super(dataFilePath, deleteFiles, tableSchema, projectedSchema);
+      this.io = io;
+      this.asStructLike = new InternalRecordWrapper(requiredSchema().asStruct());
+    }
+
+    // TODO: remove this (unused)
+    @SuppressWarnings("method.invocation")
+    public BeamDeleteFilter(
+        FileIO io,
+        SerializableChangelogTask scanTask,
+        Schema tableSchema,
+        Schema projectedSchema,
+        List<DeleteFile> deleteFiles) {
+      super(scanTask.getDataFile().getPath(), deleteFiles, tableSchema, projectedSchema);
+      this.io = io;
+      this.asStructLike = new InternalRecordWrapper(requiredSchema().asStruct());
+    }
+
+    // TODO: remove this (unused)
+    @SuppressWarnings("method.invocation")
+    public BeamDeleteFilter(FileIO io, ContentScanTask<?> scanTask, List<DeleteFile> deleteFiles) {
+      super(
+          scanTask.file().location(),
+          deleteFiles,
+          scanTask.spec().schema(),
+          scanTask.spec().schema());
+      this.io = io;
+      this.asStructLike = new InternalRecordWrapper(requiredSchema().asStruct());
+    }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The `BeamDeleteFilter` class has two constructors (starting on lines 295 and 
308) that are marked as unused with a `TODO` to remove them. Please remove this 
dead code to improve maintainability.



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/cdc/ReconcileChanges.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg.cdc;
+
+import java.util.Iterator;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.join.CoGbkResult;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Instant;
+
+public class ReconcileChanges extends DoFn<KV<Row, CoGbkResult>, Row> {
+  public static final TupleTag<TimestampedValue<Row>> DELETES = new TupleTag<>() {};
+  public static final TupleTag<TimestampedValue<Row>> INSERTS = new TupleTag<>() {};
+
+  @DoFn.ProcessElement
+  public void processElement(
+      @Element KV<Row, CoGbkResult> element,
+      @Timestamp Instant timestamp,
+      OutputReceiver<Row> out) {
+    CoGbkResult result = element.getValue();
+    System.out.println("xxx [MIXED] Process timestamp: " + timestamp);
+
+    // iterables are lazy-loaded from the shuffle service
+    Iterable<TimestampedValue<Row>> deletes = result.getAll(DELETES);
+    Iterable<TimestampedValue<Row>> inserts = result.getAll(INSERTS);
+
+    boolean hasDeletes = deletes.iterator().hasNext();
+    boolean hasInserts = inserts.iterator().hasNext();
+
+    if (hasInserts && hasDeletes) {
+      // UPDATE: row ID exists in both streams
+      // - emit all deletes as 'UPDATE_BEFORE', and all inserts as 'UPDATE_AFTER'
+      // - emit extra inserts as 'UPDATE_AFTER'
+      // - ignore extra deletes (TODO: double check if this is a good decision)
+      Iterator<TimestampedValue<Row>> deletesIterator = deletes.iterator();
+      Iterator<TimestampedValue<Row>> insertsIterator = inserts.iterator();
+      while (deletesIterator.hasNext() && insertsIterator.hasNext()) {
+        // TODO: output as UPDATE_BEFORE kind
+        TimestampedValue<Row> updateBefore = deletesIterator.next();
+        out.outputWithTimestamp(updateBefore.getValue(), updateBefore.getTimestamp());
+        System.out.printf("[MIXED] -- UpdateBefore\n%s\n", updateBefore);
+
+        // TODO: output as UPDATE_AFTER kind
+        TimestampedValue<Row> updateAfter = insertsIterator.next();
+        out.outputWithTimestamp(updateAfter.getValue(), updateAfter.getTimestamp());
+        System.out.printf("[MIXED] -- UpdateAfter\n%s\n", updateAfter);
+      }
+      while (insertsIterator.hasNext()) {
+        // TODO: output as UPDATE_AFTER kind
+        TimestampedValue<Row> insert = insertsIterator.next();
+        out.outputWithTimestamp(insert.getValue(), insert.getTimestamp());
+        System.out.printf("[MIXED] -- Added(extra)\n%s\n", insert);
+      }
+    } else if (hasInserts) {
+      // INSERT only
+      for (TimestampedValue<Row> rec : inserts) {
+        System.out.printf("[MIXED] -- Added\n%s\n", rec);
+        out.outputWithTimestamp(rec.getValue(), rec.getTimestamp());
+      }
+    } else if (hasDeletes) {
+      // DELETE only
+      for (TimestampedValue<Row> rec : deletes) {
+        // TODO: output as DELETE kind
+        System.out.printf("[MIXED] -- Deleted\n%s\n", rec);
+        out.outputWithTimestamp(rec.getValue(), rec.getTimestamp());
+      }
+    }
+  }

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   This method contains several `System.out` print statements (e.g., on lines 
39, 59, 64) which appear to be for debugging. These should be removed from 
production code. If logging is needed, please use a standard logging framework 
like SLF4J.
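   
   If any of this tracing is worth keeping, the SLF4J pattern that
   `ChangelogScanner` in this PR already uses applies directly (the message
   text below is illustrative):
   
   ```java
   private static final Logger LOG = LoggerFactory.getLogger(ReconcileChanges.class);
   
   // Guard per-record logging so the formatting cost is only paid when enabled.
   if (LOG.isDebugEnabled()) {
     LOG.debug(
         "Emitting UPDATE_BEFORE at {}: {}",
         updateBefore.getTimestamp(),
         updateBefore.getValue());
   }
   ```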



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadUtils.java:
##########
@@ -208,4 +243,138 @@ public static CloseableIterable<Record> maybeApplyFilter(
     }
     return iterable;
   }
+
+  public static DeleteFilter<Record> genericDeleteFilter(
+      Table table,
+      IcebergScanConfig scanConfig,
+      String dataFilePath,
+      List<SerializableDeleteFile> deletes) {
+    return new BeamDeleteFilter(
+        table.io(),
+        dataFilePath,
+        scanConfig.getRequiredSchema(),
+        scanConfig.getProjectedSchema(),
+        deletes.stream()
+            .map(sdf -> sdf.createDeleteFile(table.specs(), table.sortOrders()))
+            .collect(Collectors.toList()));
+  }
+
+  public static DeleteReader<Record> genericDeleteReader(
+      Table table,
+      IcebergScanConfig scanConfig,
+      String dataFilePath,
+      List<SerializableDeleteFile> deletes) {
+    return new BeamDeleteReader(
+        table.io(),
+        dataFilePath,
+        scanConfig.getRequiredSchema(),
+        scanConfig.getProjectedSchema(),
+        deletes.stream()
+            .map(sdf -> sdf.createDeleteFile(table.specs(), table.sortOrders()))
+            .collect(Collectors.toList()));
+  }
+
+  public static class BeamDeleteFilter extends DeleteFilter<Record> {
+    private final FileIO io;
+    private final InternalRecordWrapper asStructLike;
+
+    @SuppressWarnings("method.invocation")
+    public BeamDeleteFilter(
+        FileIO io,
+        String dataFilePath,
+        Schema tableSchema,
+        Schema projectedSchema,
+        List<DeleteFile> deleteFiles) {
+      super(dataFilePath, deleteFiles, tableSchema, projectedSchema);
+      this.io = io;
+      this.asStructLike = new InternalRecordWrapper(requiredSchema().asStruct());
+    }
+
+    // TODO: remove this (unused)
+    @SuppressWarnings("method.invocation")
+    public BeamDeleteFilter(
+        FileIO io,
+        SerializableChangelogTask scanTask,
+        Schema tableSchema,
+        Schema projectedSchema,
+        List<DeleteFile> deleteFiles) {
+      super(scanTask.getDataFile().getPath(), deleteFiles, tableSchema, projectedSchema);
+      this.io = io;
+      this.asStructLike = new InternalRecordWrapper(requiredSchema().asStruct());
+    }
+
+    // TODO: remove this (unused)
+    @SuppressWarnings("method.invocation")
+    public BeamDeleteFilter(FileIO io, ContentScanTask<?> scanTask, List<DeleteFile> deleteFiles) {
+      super(
+          scanTask.file().location(),
+          deleteFiles,
+          scanTask.spec().schema(),
+          scanTask.spec().schema());
+      this.io = io;
+      this.asStructLike = new InternalRecordWrapper(requiredSchema().asStruct());
+    }
+
+    @Override
+    protected StructLike asStructLike(Record record) {
+      return asStructLike.wrap(record);
+    }
+
+    @Override
+    protected InputFile getInputFile(String location) {
+      return io.newInputFile(location);
+    }
+  }
+
+  public static class BeamDeleteReader extends DeleteReader<Record> {
+    private final FileIO io;
+    private final InternalRecordWrapper asStructLike;
+
+    @SuppressWarnings("method.invocation")
+    public BeamDeleteReader(
+        FileIO io,
+        String dataFilePath,
+        Schema tableSchema,
+        Schema projectedSchema,
+        List<DeleteFile> deleteFiles) {
+      super(dataFilePath, deleteFiles, tableSchema, projectedSchema);
+      this.io = io;
+      this.asStructLike = new InternalRecordWrapper(requiredSchema().asStruct());
+    }
+
+    // TODO: remove this (unused)
+    @SuppressWarnings("method.invocation")
+    public BeamDeleteReader(
+        FileIO io,
+        SerializableChangelogTask scanTask,
+        Schema tableSchema,
+        Schema projectedSchema,
+        List<DeleteFile> deleteFiles) {
+      super(scanTask.getDataFile().getPath(), deleteFiles, tableSchema, projectedSchema);
+      this.io = io;
+      this.asStructLike = new InternalRecordWrapper(requiredSchema().asStruct());
+    }
+
+    // TODO: remove this (unused)
+    @SuppressWarnings("method.invocation")
+    public BeamDeleteReader(FileIO io, ContentScanTask<?> scanTask, List<DeleteFile> deleteFiles) {
+      super(
+          scanTask.file().location(),
+          deleteFiles,
+          scanTask.spec().schema(),
+          scanTask.spec().schema());
+      this.io = io;
+      this.asStructLike = new InternalRecordWrapper(requiredSchema().asStruct());
+    }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The `BeamDeleteReader` class has two constructors (starting on lines 347 and 
360) that are marked as unused with a `TODO` to remove them. Please remove this 
dead code to improve maintainability.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

