gemini-code-assist[bot] commented on code in PR #37191:
URL: https://github.com/apache/beam/pull/37191#discussion_r2648519633
##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/cdc/ChangelogScanner.java:
##########
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg.cdc;
+
+import static org.apache.beam.sdk.io.iceberg.cdc.SerializableChangelogTask.Type.ADDED_ROWS;
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.io.iceberg.IcebergScanConfig;
+import org.apache.beam.sdk.io.iceberg.SnapshotInfo;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.iceberg.ChangelogScanTask;
+import org.apache.iceberg.IncrementalChangelogScan;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.io.CloseableIterable;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ChangelogScanner
+    extends DoFn<
+        KV<String, List<SnapshotInfo>>, KV<ChangelogDescriptor, List<SerializableChangelogTask>>> {
+  private static final Logger LOG = LoggerFactory.getLogger(ChangelogScanner.class);
+  private static final Counter totalChangelogScanTasks =
+      Metrics.counter(ChangelogScanner.class, "totalChangelogScanTasks");
+  private static final Counter numAddedRowsScanTasks =
+      Metrics.counter(ChangelogScanner.class, "numAddedRowsScanTasks");
+  private static final Counter numDeletedRowsScanTasks =
+      Metrics.counter(ChangelogScanner.class, "numDeletedRowsScanTasks");
+  private static final Counter numDeletedDataFileScanTasks =
+      Metrics.counter(ChangelogScanner.class, "numDeletedDataFileScanTasks");
+  public static final TupleTag<KV<ChangelogDescriptor, List<SerializableChangelogTask>>>
+      UNIFORM_CHANGES = new TupleTag<>();
+  public static final TupleTag<KV<ChangelogDescriptor, List<SerializableChangelogTask>>>
+      MIXED_CHANGES = new TupleTag<>();
+  public static final KvCoder<ChangelogDescriptor, List<SerializableChangelogTask>> OUTPUT_CODER =
+      KvCoder.of(ChangelogDescriptor.coder(), ListCoder.of(SerializableChangelogTask.coder()));
+  private final IcebergScanConfig scanConfig;
+
+  ChangelogScanner(IcebergScanConfig scanConfig) {
+    this.scanConfig = scanConfig;
+  }
+
+  @ProcessElement
+  public void process(@Element KV<String, List<SnapshotInfo>> element, MultiOutputReceiver out)
+      throws IOException {
+    // TODO: use TableCache here
+    Table table = scanConfig.getTable();

Review Comment:
   The `TODO` comment suggests using `TableCache`. This is a good practice to avoid repeatedly loading table metadata, which can be expensive. Please use `TableCache.get(element.getKey())` here. You will also need to add a `@Setup` method to initialize the `TableCache` with `TableCache.setup(scanConfig);`.
   ```suggestion
       Table table = TableCache.get(element.getKey());
   ```
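A minimal sketch of the suggested wiring, assuming `TableCache` exposes the `setup(IcebergScanConfig)` and `get(String)` methods this comment names (verify against the actual `TableCache` class in this package before adopting):

```java
// Sketch only -- TableCache.setup(...) and TableCache.get(...) are assumed
// to exist with the signatures the review comment describes.
@Setup
public void setup() {
  // Initialize the per-worker cache once, so table metadata is not reloaded per element.
  TableCache.setup(scanConfig);
}

@ProcessElement
public void process(@Element KV<String, List<SnapshotInfo>> element, MultiOutputReceiver out)
    throws IOException {
  // element.getKey() is the table identifier string, used here as the cache key.
  Table table = TableCache.get(element.getKey());
  // ... rest of the scan logic unchanged ...
}
```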
##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadUtils.java:
##########
@@ -208,4 +243,138 @@ public static CloseableIterable<Record> maybeApplyFilter(
     }
     return iterable;
   }
+
+  public static DeleteFilter<Record> genericDeleteFilter(
+      Table table,
+      IcebergScanConfig scanConfig,
+      String dataFilePath,
+      List<SerializableDeleteFile> deletes) {
+    return new BeamDeleteFilter(
+        table.io(),
+        dataFilePath,
+        scanConfig.getRequiredSchema(),
+        scanConfig.getProjectedSchema(),
+        deletes.stream()
+            .map(sdf -> sdf.createDeleteFile(table.specs(), table.sortOrders()))
+            .collect(Collectors.toList()));
+  }
+
+  public static DeleteReader<Record> genericDeleteReader(
+      Table table,
+      IcebergScanConfig scanConfig,
+      String dataFilePath,
+      List<SerializableDeleteFile> deletes) {
+    return new BeamDeleteReader(
+        table.io(),
+        dataFilePath,
+        scanConfig.getRequiredSchema(),
+        scanConfig.getProjectedSchema(),
+        deletes.stream()
+            .map(sdf -> sdf.createDeleteFile(table.specs(), table.sortOrders()))
+            .collect(Collectors.toList()));
+  }
+
+  public static class BeamDeleteFilter extends DeleteFilter<Record> {
+    private final FileIO io;
+    private final InternalRecordWrapper asStructLike;
+
+    @SuppressWarnings("method.invocation")
+    public BeamDeleteFilter(
+        FileIO io,
+        String dataFilePath,
+        Schema tableSchema,
+        Schema projectedSchema,
+        List<DeleteFile> deleteFiles) {
+      super(dataFilePath, deleteFiles, tableSchema, projectedSchema);
+      this.io = io;
+      this.asStructLike = new InternalRecordWrapper(requiredSchema().asStruct());
+    }
+
+    // TODO: remove this (unused)
+    @SuppressWarnings("method.invocation")
+    public BeamDeleteFilter(
+        FileIO io,
+        SerializableChangelogTask scanTask,
+        Schema tableSchema,
+        Schema projectedSchema,
+        List<DeleteFile> deleteFiles) {
+      super(scanTask.getDataFile().getPath(), deleteFiles, tableSchema, projectedSchema);
+      this.io = io;
+      this.asStructLike = new InternalRecordWrapper(requiredSchema().asStruct());
+    }
+
+    // TODO: remove this (unused)
+    @SuppressWarnings("method.invocation")
+    public BeamDeleteFilter(FileIO io, ContentScanTask<?> scanTask, List<DeleteFile> deleteFiles) {
+      super(
+          scanTask.file().location(),
+          deleteFiles,
+          scanTask.spec().schema(),
+          scanTask.spec().schema());
+      this.io = io;
+      this.asStructLike = new InternalRecordWrapper(requiredSchema().asStruct());
+    }
+
+    @Override
+    protected StructLike asStructLike(Record record) {
+      return asStructLike.wrap(record);
+    }
+
+    @Override
+    protected InputFile getInputFile(String location) {
+      return io.newInputFile(location);
+    }
+  }
+
+  public static class BeamDeleteReader extends DeleteReader<Record> {
+    private final FileIO io;
+    private final InternalRecordWrapper asStructLike;
+
+    @SuppressWarnings("method.invocation")
+    public BeamDeleteReader(
+        FileIO io,
+        String dataFilePath,
+        Schema tableSchema,
+        Schema projectedSchema,
+        List<DeleteFile> deleteFiles) {
+      super(dataFilePath, deleteFiles, tableSchema, projectedSchema);
+      this.io = io;
+      this.asStructLike = new InternalRecordWrapper(requiredSchema().asStruct());
+    }
+
+    // TODO: remove this (unused)
+    @SuppressWarnings("method.invocation")
+    public BeamDeleteReader(
+        FileIO io,
+        SerializableChangelogTask scanTask,
+        Schema tableSchema,
+        Schema projectedSchema,
+        List<DeleteFile> deleteFiles) {
+      super(scanTask.getDataFile().getPath(), deleteFiles, tableSchema, projectedSchema);
+      this.io = io;
+      this.asStructLike = new InternalRecordWrapper(requiredSchema().asStruct());
+    }
+
+    // TODO: remove this (unused)
+    @SuppressWarnings("method.invocation")
+    public BeamDeleteReader(FileIO io, ContentScanTask<?> scanTask, List<DeleteFile> deleteFiles) {
+      super(
+          scanTask.file().location(),
+          deleteFiles,
+          scanTask.spec().schema(),
+          scanTask.spec().schema());
+      this.io = io;
+      this.asStructLike = new InternalRecordWrapper(requiredSchema().asStruct());
+    }

Review Comment:
   These constructors are marked as unused with a `TODO` to remove them. To improve code clarity and maintainability, they should be removed.

##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/cdc/ReconcileChanges.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg.cdc;
+
+import java.util.Iterator;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.join.CoGbkResult;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Instant;
+
+public class ReconcileChanges extends DoFn<KV<Row, CoGbkResult>, Row> {
+  public static final TupleTag<TimestampedValue<Row>> DELETES = new TupleTag<>() {};
+  public static final TupleTag<TimestampedValue<Row>> INSERTS = new TupleTag<>() {};
+
+  @DoFn.ProcessElement
+  public void processElement(
+      @Element KV<Row, CoGbkResult> element,
+      @Timestamp Instant timestamp,
+      OutputReceiver<Row> out) {
+    CoGbkResult result = element.getValue();
+
+    // iterables are lazy-loaded from the shuffle service
+    Iterable<TimestampedValue<Row>> deletes = result.getAll(DELETES);
+    Iterable<TimestampedValue<Row>> inserts = result.getAll(INSERTS);
+
+    boolean hasDeletes = deletes.iterator().hasNext();
+    boolean hasInserts = inserts.iterator().hasNext();
+
+    if (hasInserts && hasDeletes) {
+      // UPDATE: row ID exists in both streams
+      //   - emit all deletes as 'UPDATE_BEFORE', and all inserts as 'UPDATE_AFTER'
+      //   - emit extra inserts as 'UPDATE_AFTER'
+      //   - ignore extra deletes (TODO: double check if this is a good decision)
+      Iterator<TimestampedValue<Row>> deletesIterator = deletes.iterator();
+      Iterator<TimestampedValue<Row>> insertsIterator = inserts.iterator();
+      while (deletesIterator.hasNext() && insertsIterator.hasNext()) {
+        // TODO: output as UPDATE_BEFORE kind
+        TimestampedValue<Row> updateBefore = deletesIterator.next();
+        out.outputWithTimestamp(updateBefore.getValue(), updateBefore.getTimestamp());
+        System.out.printf("[MIXED] -- UpdateBefore\n%s\n", updateBefore);
+
+        // TODO: output as UPDATE_AFTER kind
+        TimestampedValue<Row> updateAfter = insertsIterator.next();
+        out.outputWithTimestamp(updateAfter.getValue(), updateAfter.getTimestamp());
+        System.out.printf("[MIXED] -- UpdateAfter\n%s\n", updateAfter);
+      }
+      while (insertsIterator.hasNext()) {
+        // TODO: output as UPDATE_AFTER kind
+        TimestampedValue<Row> insert = insertsIterator.next();
+        out.outputWithTimestamp(insert.getValue(), insert.getTimestamp());
+        System.out.printf("[MIXED] -- Added(extra)\n%s\n", insert);
+      }
+    } else if (hasInserts) {
+      // INSERT only
+      for (TimestampedValue<Row> rec : inserts) {
+        System.out.printf("[MIXED] -- Added\n%s\n", rec);
+        out.outputWithTimestamp(rec.getValue(), rec.getTimestamp());
+      }
+    } else if (hasDeletes) {
+      // DELETE only
+      for (TimestampedValue<Row> rec : deletes) {
+        // TODO: output as DELETE kind
+        System.out.printf("[MIXED] -- Deleted\n%s\n", rec);
+        out.outputWithTimestamp(rec.getValue(), rec.getTimestamp());
+      }
+    }

Review Comment:
   This method contains several `System.out.printf` statements that appear to be for debugging. They should be removed before merging.
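Where some of this output is worth keeping as diagnostics rather than deleting outright, a hedged alternative is the SLF4J logger pattern already used in `ChangelogScanner`; the `LOG` field below is hypothetical, since `ReconcileChanges` does not declare one in this diff:

```java
// Hypothetical logger field -- not present in ReconcileChanges today.
private static final Logger LOG = LoggerFactory.getLogger(ReconcileChanges.class);

// Replacing, e.g.: System.out.printf("[MIXED] -- UpdateBefore\n%s\n", updateBefore);
// The {} placeholder defers string formatting until DEBUG logging is actually enabled.
LOG.debug("[MIXED] -- UpdateBefore\n{}", updateBefore);
```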
##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/cdc/ReadFromChangelogs.java:
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg.cdc;
+
+import static org.apache.beam.sdk.io.iceberg.IcebergUtils.icebergSchemaToBeamSchema;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.io.iceberg.IcebergScanConfig;
+import org.apache.beam.sdk.io.iceberg.IcebergUtils;
+import org.apache.beam.sdk.io.iceberg.ReadUtils;
+import org.apache.beam.sdk.io.iceberg.SerializableDeleteFile;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.DeleteFilter;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.StructProjection;
+import org.joda.time.Instant;
+
+@DoFn.BoundedPerElement
+public class ReadFromChangelogs<OutT>
+    extends DoFn<KV<ChangelogDescriptor, List<SerializableChangelogTask>>, OutT> {
+  public static final TupleTag<Row> UNIFORM_ROWS = new TupleTag<>();
+  public static final TupleTag<KV<Row, Row>> KEYED_INSERTS = new TupleTag<>();
+  public static final TupleTag<KV<Row, Row>> KEYED_DELETES = new TupleTag<>();
+
+  private final Counter numAddedRowsScanTasksCompleted =
+      Metrics.counter(ReadFromChangelogs.class, "numAddedRowsScanTasksCompleted");
+  private final Counter numDeletedRowsScanTasksCompleted =
+      Metrics.counter(ReadFromChangelogs.class, "numDeletedRowsScanTasksCompleted");
+  private final Counter numDeletedDataFileScanTasksCompleted =
+      Metrics.counter(ReadFromChangelogs.class, "numDeletedDataFileScanTasksCompleted");
+
+  private final IcebergScanConfig scanConfig;
+  private final boolean keyedOutput;
+  private transient StructProjection recordIdProjection;
+  private transient org.apache.iceberg.Schema recordIdSchema;
+  private final Schema beamRowSchema;
+  private final Schema rowIdWithOrdinalBeamSchema;
+  private static final String ORDINAL_FIELD = "__beam__changelog__ordinal__";
+
+  private ReadFromChangelogs(IcebergScanConfig scanConfig, boolean keyedOutput) {
+    this.scanConfig = scanConfig;
+    this.keyedOutput = keyedOutput;
+
+    this.beamRowSchema = icebergSchemaToBeamSchema(scanConfig.getProjectedSchema());
+    org.apache.iceberg.Schema recordSchema = scanConfig.getProjectedSchema();
+    this.recordIdSchema = recordSchema.select(recordSchema.identifierFieldNames());
+    this.recordIdProjection = StructProjection.create(recordSchema, recordIdSchema);
+
+    Schema rowIdBeamSchema = icebergSchemaToBeamSchema(recordIdSchema);
+    List<Schema.Field> fields =
+        ImmutableList.<Schema.Field>builder()
+            .add(Schema.Field.of(ORDINAL_FIELD, Schema.FieldType.INT32))
+            .addAll(rowIdBeamSchema.getFields())
+            .build();
+    this.rowIdWithOrdinalBeamSchema = new Schema(fields);
+  }
+
+  static ReadFromChangelogs<Row> of(IcebergScanConfig scanConfig) {
+    return new ReadFromChangelogs<>(scanConfig, false);
+  }
+
+  static ReadFromChangelogs<KV<Row, Row>> withKeyedOutput(IcebergScanConfig scanConfig) {
+    return new ReadFromChangelogs<>(scanConfig, true);
+  }
+
+  /**
+   * Determines the keyed output coder, which depends on the requested projected schema and the
+   * schema's identifier fields.
+   */
+  static KvCoder<Row, Row> keyedOutputCoder(IcebergScanConfig scanConfig) {
+    org.apache.iceberg.Schema recordSchema = scanConfig.getProjectedSchema();
+    org.apache.iceberg.Schema recordIdSchema =
+        recordSchema.select(recordSchema.identifierFieldNames());
+    Schema rowIdBeamSchema = icebergSchemaToBeamSchema(recordIdSchema);
+    List<Schema.Field> fields =
+        ImmutableList.<Schema.Field>builder()
+            .add(Schema.Field.of(ORDINAL_FIELD, Schema.FieldType.INT32))
+            .addAll(rowIdBeamSchema.getFields())
+            .build();
+    Schema rowIdWithOrdinalBeamSchema = new Schema(fields);
+    return KvCoder.of(
+        SchemaCoder.of(rowIdWithOrdinalBeamSchema),
+        SchemaCoder.of(icebergSchemaToBeamSchema(recordSchema)));
+  }
+
+  @Setup
+  public void setup() {
+    // StructProjection is not serializable, so we need to recompute it when the DoFn gets
+    // deserialized
+    org.apache.iceberg.Schema recordSchema = scanConfig.getProjectedSchema();
+    this.recordIdSchema = recordSchema.select(recordSchema.identifierFieldNames());
+    this.recordIdProjection = StructProjection.create(recordSchema, recordIdSchema);
+  }
+
+  @ProcessElement
+  public void process(
+      @Element KV<ChangelogDescriptor, List<SerializableChangelogTask>> element,
+      RestrictionTracker<OffsetRange, Long> tracker,
+      MultiOutputReceiver out)
+      throws IOException {
+    // TODO: use TableCache
+    Table table = scanConfig.getTable();

Review Comment:
   The `TODO` comment suggests using `TableCache`. This is a good practice to avoid repeatedly loading table metadata. Please use `TableCache.get(element.getKey().getTableIdentifierString())` here. You will also need to add `TableCache.setup(scanConfig);` to the `@Setup` method.
   ```suggestion
       Table table = TableCache.get(element.getKey().getTableIdentifierString());
   ```
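As with `ChangelogScanner` above, a sketch of how the existing `@Setup` method could fold in the cache initialization (again assuming the `TableCache.setup`/`TableCache.get` API this comment names):

```java
@Setup
public void setup() {
  // Assumed API: initialize the per-worker table cache before any bundle runs.
  TableCache.setup(scanConfig);
  // Existing logic, unchanged: StructProjection is not serializable, so it is
  // recomputed after the DoFn is deserialized.
  org.apache.iceberg.Schema recordSchema = scanConfig.getProjectedSchema();
  this.recordIdSchema = recordSchema.select(recordSchema.identifierFieldNames());
  this.recordIdProjection = StructProjection.create(recordSchema, recordIdSchema);
}
```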
##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadUtils.java:
##########
@@ -208,4 +243,138 @@ public static CloseableIterable<Record> maybeApplyFilter(
     }
     return iterable;
   }
+
+  public static DeleteFilter<Record> genericDeleteFilter(
+      Table table,
+      IcebergScanConfig scanConfig,
+      String dataFilePath,
+      List<SerializableDeleteFile> deletes) {
+    return new BeamDeleteFilter(
+        table.io(),
+        dataFilePath,
+        scanConfig.getRequiredSchema(),
+        scanConfig.getProjectedSchema(),
+        deletes.stream()
+            .map(sdf -> sdf.createDeleteFile(table.specs(), table.sortOrders()))
+            .collect(Collectors.toList()));
+  }
+
+  public static DeleteReader<Record> genericDeleteReader(
+      Table table,
+      IcebergScanConfig scanConfig,
+      String dataFilePath,
+      List<SerializableDeleteFile> deletes) {
+    return new BeamDeleteReader(
+        table.io(),
+        dataFilePath,
+        scanConfig.getRequiredSchema(),
+        scanConfig.getProjectedSchema(),
+        deletes.stream()
+            .map(sdf -> sdf.createDeleteFile(table.specs(), table.sortOrders()))
+            .collect(Collectors.toList()));
+  }
+
+  public static class BeamDeleteFilter extends DeleteFilter<Record> {
+    private final FileIO io;
+    private final InternalRecordWrapper asStructLike;
+
+    @SuppressWarnings("method.invocation")
+    public BeamDeleteFilter(
+        FileIO io,
+        String dataFilePath,
+        Schema tableSchema,
+        Schema projectedSchema,
+        List<DeleteFile> deleteFiles) {
+      super(dataFilePath, deleteFiles, tableSchema, projectedSchema);
+      this.io = io;
+      this.asStructLike = new InternalRecordWrapper(requiredSchema().asStruct());
+    }
+
+    // TODO: remove this (unused)
+    @SuppressWarnings("method.invocation")
+    public BeamDeleteFilter(
+        FileIO io,
+        SerializableChangelogTask scanTask,
+        Schema tableSchema,
+        Schema projectedSchema,
+        List<DeleteFile> deleteFiles) {
+      super(scanTask.getDataFile().getPath(), deleteFiles, tableSchema, projectedSchema);
+      this.io = io;
+      this.asStructLike = new InternalRecordWrapper(requiredSchema().asStruct());
+    }
+
+    // TODO: remove this (unused)
+    @SuppressWarnings("method.invocation")
+    public BeamDeleteFilter(FileIO io, ContentScanTask<?> scanTask, List<DeleteFile> deleteFiles) {
+      super(
+          scanTask.file().location(),
+          deleteFiles,
+          scanTask.spec().schema(),
+          scanTask.spec().schema());
+      this.io = io;
+      this.asStructLike = new InternalRecordWrapper(requiredSchema().asStruct());
+    }

Review Comment:
   These constructors are marked as unused with a `TODO` to remove them. To improve code clarity and maintainability, they should be removed.

##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/cdc/ReadFromChangelogs.java:
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg.cdc;
+
+import static org.apache.beam.sdk.io.iceberg.IcebergUtils.icebergSchemaToBeamSchema;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.io.iceberg.IcebergScanConfig;
+import org.apache.beam.sdk.io.iceberg.IcebergUtils;
+import org.apache.beam.sdk.io.iceberg.ReadUtils;
+import org.apache.beam.sdk.io.iceberg.SerializableDeleteFile;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.DeleteFilter;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.StructProjection;
+import org.joda.time.Instant;
+
+@DoFn.BoundedPerElement
+public class ReadFromChangelogs<OutT>
+    extends DoFn<KV<ChangelogDescriptor, List<SerializableChangelogTask>>, OutT> {
+  public static final TupleTag<Row> UNIFORM_ROWS = new TupleTag<>();
+  public static final TupleTag<KV<Row, Row>> KEYED_INSERTS = new TupleTag<>();
+  public static final TupleTag<KV<Row, Row>> KEYED_DELETES = new TupleTag<>();
+
+  private final Counter numAddedRowsScanTasksCompleted =
+      Metrics.counter(ReadFromChangelogs.class, "numAddedRowsScanTasksCompleted");
+  private final Counter numDeletedRowsScanTasksCompleted =
+      Metrics.counter(ReadFromChangelogs.class, "numDeletedRowsScanTasksCompleted");
+  private final Counter numDeletedDataFileScanTasksCompleted =
+      Metrics.counter(ReadFromChangelogs.class, "numDeletedDataFileScanTasksCompleted");
+
+  private final IcebergScanConfig scanConfig;
+  private final boolean keyedOutput;
+  private transient StructProjection recordIdProjection;
+  private transient org.apache.iceberg.Schema recordIdSchema;
+  private final Schema beamRowSchema;
+  private final Schema rowIdWithOrdinalBeamSchema;
+  private static final String ORDINAL_FIELD = "__beam__changelog__ordinal__";
+
+  private ReadFromChangelogs(IcebergScanConfig scanConfig, boolean keyedOutput) {
+    this.scanConfig = scanConfig;
+    this.keyedOutput = keyedOutput;
+
+    this.beamRowSchema = icebergSchemaToBeamSchema(scanConfig.getProjectedSchema());
+    org.apache.iceberg.Schema recordSchema = scanConfig.getProjectedSchema();
+    this.recordIdSchema = recordSchema.select(recordSchema.identifierFieldNames());
+    this.recordIdProjection = StructProjection.create(recordSchema, recordIdSchema);
+
+    Schema rowIdBeamSchema = icebergSchemaToBeamSchema(recordIdSchema);
+    List<Schema.Field> fields =
+        ImmutableList.<Schema.Field>builder()
+            .add(Schema.Field.of(ORDINAL_FIELD, Schema.FieldType.INT32))
+            .addAll(rowIdBeamSchema.getFields())
+            .build();
+    this.rowIdWithOrdinalBeamSchema = new Schema(fields);
+  }
+
+  static ReadFromChangelogs<Row> of(IcebergScanConfig scanConfig) {
+    return new ReadFromChangelogs<>(scanConfig, false);
+  }
+
+  static ReadFromChangelogs<KV<Row, Row>> withKeyedOutput(IcebergScanConfig scanConfig) {
+    return new ReadFromChangelogs<>(scanConfig, true);
+  }
+
+  /**
+   * Determines the keyed output coder, which depends on the requested projected schema and the
+   * schema's identifier fields.
+   */
+  static KvCoder<Row, Row> keyedOutputCoder(IcebergScanConfig scanConfig) {
+    org.apache.iceberg.Schema recordSchema = scanConfig.getProjectedSchema();
+    org.apache.iceberg.Schema recordIdSchema =
+        recordSchema.select(recordSchema.identifierFieldNames());
+    Schema rowIdBeamSchema = icebergSchemaToBeamSchema(recordIdSchema);
+    List<Schema.Field> fields =
+        ImmutableList.<Schema.Field>builder()
+            .add(Schema.Field.of(ORDINAL_FIELD, Schema.FieldType.INT32))
+            .addAll(rowIdBeamSchema.getFields())
+            .build();
+    Schema rowIdWithOrdinalBeamSchema = new Schema(fields);
+    return KvCoder.of(
+        SchemaCoder.of(rowIdWithOrdinalBeamSchema),
+        SchemaCoder.of(icebergSchemaToBeamSchema(recordSchema)));
+  }
+
+  @Setup
+  public void setup() {
+    // StructProjection is not serializable, so we need to recompute it when the DoFn gets
+    // deserialized
+    org.apache.iceberg.Schema recordSchema = scanConfig.getProjectedSchema();
+    this.recordIdSchema = recordSchema.select(recordSchema.identifierFieldNames());
+    this.recordIdProjection = StructProjection.create(recordSchema, recordIdSchema);
+  }
+
+  @ProcessElement
+  public void process(
+      @Element KV<ChangelogDescriptor, List<SerializableChangelogTask>> element,
+      RestrictionTracker<OffsetRange, Long> tracker,
+      MultiOutputReceiver out)
+      throws IOException {
+    // TODO: use TableCache
+    Table table = scanConfig.getTable();
+    table.refresh();
+
+    List<SerializableChangelogTask> tasks = element.getValue();
+
+    for (long l = tracker.currentRestriction().getFrom();
+        l < tracker.currentRestriction().getTo();
+        l++) {
+      if (!tracker.tryClaim(l)) {
+        return;
+      }
+
+      SerializableChangelogTask task = tasks.get((int) l);
+      switch (task.getType()) {
+        case ADDED_ROWS:
+          processAddedRowsTask(task, table, out);
+          break;
+        case DELETED_ROWS:
+          processDeletedRowsTask(task, table, out);
+          break;
+        case DELETED_FILE:
+          processDeletedFileTask(task, table, out);
+          break;
+      }
+    }
+  }
+
+  private void processAddedRowsTask(
+      SerializableChangelogTask task, Table table, MultiOutputReceiver outputReceiver)
+      throws IOException {
+    try (CloseableIterable<Record> fullIterable = ReadUtils.createReader(task, table, scanConfig)) {
+      DeleteFilter<Record> deleteFilter =
+          ReadUtils.genericDeleteFilter(
+              table, scanConfig, task.getDataFile().getPath(), task.getExistingDeletes());
+      CloseableIterable<Record> filtered = deleteFilter.filter(fullIterable);
+
+      for (Record rec : filtered) {
+        outputRecord(
+            rec, outputReceiver, task.getOrdinal(), task.getTimestampMillis(), KEYED_INSERTS);
+      }
+    }
+    numAddedRowsScanTasksCompleted.inc();
+  }
+
+  private void processDeletedRowsTask(
+      SerializableChangelogTask task, Table table, MultiOutputReceiver outputReceiver)
+      throws IOException {
+    DeleteFilter<Record> existingDeletesFilter =
+        ReadUtils.genericDeleteFilter(
+            table, scanConfig, task.getDataFile().getPath(), task.getExistingDeletes());
+    DeleteReader<Record> newDeletesReader =
+        ReadUtils.genericDeleteReader(
+            table, scanConfig, task.getDataFile().getPath(), task.getAddedDeletes());
+
+    try (CloseableIterable<Record> allRecords = ReadUtils.createReader(task, table, scanConfig)) {
+      CloseableIterable<Record> liveRecords = existingDeletesFilter.filter(allRecords);
+      CloseableIterable<Record> newlyDeletedRecords = newDeletesReader.read(liveRecords);
+
+      for (Record rec : newlyDeletedRecords) {
+        // TODO: output with DELETE kind
+        outputRecord(
+            rec, outputReceiver, task.getOrdinal(), task.getTimestampMillis(), KEYED_DELETES);
+      }
+    }
+    numDeletedRowsScanTasksCompleted.inc();
+  }
+
+  private void processDeletedFileTask(
+      SerializableChangelogTask task, Table table, MultiOutputReceiver outputReceiver)
+      throws IOException {
+    try (CloseableIterable<Record> fullIterable = ReadUtils.createReader(task, table, scanConfig)) {
+      DeleteFilter<Record> deleteFilter =
+          ReadUtils.genericDeleteFilter(
+              table, scanConfig, task.getDataFile().getPath(), task.getExistingDeletes());
+      CloseableIterable<Record> filtered = deleteFilter.filter(fullIterable);
+      for (Record rec : filtered) {
+        // TODO: output with DELETE kind
+        outputRecord(
+            rec, outputReceiver, task.getOrdinal(), task.getTimestampMillis(), KEYED_DELETES);
+      }
+    }
+    numDeletedDataFileScanTasksCompleted.inc();
+  }
+
+  private void outputRecord(
+      Record rec,
+      MultiOutputReceiver outputReceiver,
+      int ordinal,
+      long timestampMillis,
+      TupleTag<KV<Row, Row>> keyedTag) {
+    Row row = IcebergUtils.icebergRecordToBeamRow(beamRowSchema, rec);
+    Instant timestamp = Instant.ofEpochMilli(timestampMillis);
+    if (keyedOutput) { // slow path
+      StructProjection recId = recordIdProjection.wrap(rec);
+      // Create a Row ID consisting of record ID columns and the changelog task's ordinal #
+      Row id = structToBeamRow(ordinal, recId, recordIdSchema, rowIdWithOrdinalBeamSchema);
+      outputReceiver.get(keyedTag).outputWithTimestamp(KV.of(id, row), timestamp);
+    } else { // fast path
+      System.out.printf("[UNIFORM] -- Output(%s, %s)\n%s%n", ordinal, timestamp, row);

Review Comment:
   This `System.out.printf` seems to be for debugging and should be removed before merging.
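If the fast path still needs trace output during development, the same hedged swap as in `ReconcileChanges` applies; the `LOG` field is an assumption here, as this class does not declare one in the diff:

```java
// Hypothetical SLF4J logger for ReadFromChangelogs.
private static final Logger LOG = LoggerFactory.getLogger(ReadFromChangelogs.class);

// Replaces the fast-path print; formatting cost is skipped unless DEBUG is enabled.
LOG.debug("[UNIFORM] -- Output({}, {})\n{}", ordinal, timestamp, row);
```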
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
