gemini-code-assist[bot] commented on code in PR #38821:
URL: https://github.com/apache/beam/pull/38821#discussion_r3359260891
##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadUtils.java:
##########
@@ -73,13 +66,37 @@ public class ReadUtils {
"parquet.read.support.class",
"parquet.crypto.factory.class");
- static ParquetReader<Record> createReader(FileScanTask task, Table table,
Schema schema) {
- String filePath = task.file().path().toString();
- EncryptedInputFile encryptedInput =
- EncryptedFiles.encryptedInput(table.io().newInputFile(filePath),
task.file().keyMetadata());
- InputFile inputFile = table.encryption().decrypt(encryptedInput);
- Map<Integer, ?> idToConstants =
- ReadUtils.constantsMap(task,
IdentityPartitionConverters::convertConstant, table.schema());
+ public static CloseableIterable<Record> createReader(
+ ContentScanTask<?> task, Table table, IcebergScanConfig scanConfig) {
+ return createReader(
+ table,
+ scanConfig,
+ scanConfig.getRequiredSchema(),
+ task.spec(),
+ task.file(),
+ null,
+ task.start(),
+ task.length(),
+ task.residual());
+ }
+
+ public static CloseableIterable<Record> createReader(
+ Table table,
+ IcebergScanConfig scanConfig,
+ Schema requiredSchema,
+ PartitionSpec spec,
+ ContentFile<?> file,
+ @Nullable Long fileSequenceNumber,
+ long start,
+ long length,
+ Expression residual) {
+ InputFile inputFile;
+ try (FileIO io = table.io()) {
+ EncryptedInputFile encryptedInput =
+ EncryptedFiles.encryptedInput(io.newInputFile(file.location()),
file.keyMetadata());
+ inputFile = table.encryption().decrypt(encryptedInput);
+ }
Review Comment:

Closing `table.io()` inside a try-with-resources block closes the shared
`FileIO` instance owned by the `Table` object, not a per-call resource.
Subsequent operations on the table (such as reads in other tasks or threads)
can then fail, typically with an `IllegalStateException` or `IOException`,
because the `FileIO` has been closed. Instead, use `table.io()` directly
without closing it.
```java
EncryptedInputFile encryptedInput =
EncryptedFiles.encryptedInput(table.io().newInputFile(file.location()),
file.keyMetadata());
InputFile inputFile = table.encryption().decrypt(encryptedInput);
```
##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/cdc/CdcReadUtils.java:
##########
@@ -0,0 +1,701 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg.cdc;
+
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.io.iceberg.IcebergScanConfig;
+import org.apache.beam.sdk.io.iceberg.ReadUtils;
+import org.apache.beam.sdk.io.iceberg.SerializableDeleteFile;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.BaseDeleteLoader;
+import org.apache.iceberg.data.DeleteFilter;
+import org.apache.iceberg.data.DeleteLoader;
+import org.apache.iceberg.data.InternalRecordWrapper;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.deletes.PositionDeleteIndex;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.SeekableInputStream;
+import org.apache.iceberg.parquet.ParquetMetricsRowGroupFilter;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.StructLikeSet;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.io.DelegatingSeekableInputStream;
+import org.apache.parquet.schema.MessageType;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Read-side helpers specific to the CDC source. Keeps {@link ReadUtils}
focused on the
+ * general-purpose append-only read path; everything that takes a {@link
SerializableChangelogTask},
+ * references {@link DeleteReader}, or implements the delete-pushdown
row-group skipping lives here.
+ *
+ * <p>This class still delegates to {@link ReadUtils} for the low-level
Parquet reader construction
+ * — the goal is decoupling, not duplication.
+ */
+public final class CdcReadUtils {
+ private static final Logger LOG =
LoggerFactory.getLogger(CdcReadUtils.class);
+
+ /**
+ * Maximum size of an equality delete set to push down as a Parquet residual
{@code IN}
+ * expression. Matches {@link
ParquetMetricsRowGroupFilter#IN_PREDICATE_LIMIT}.
+ */
+ private static final int IN_PREDICATE_LIMIT = 200;
+
+ public static CloseableIterable<Record> createReader(
+ SerializableChangelogTask task,
+ Table table,
+ IcebergScanConfig scanConfig,
+ Schema outputSchema) {
+ return createReader(task, table, scanConfig, outputSchema,
Expressions.alwaysTrue());
+ }
+
+ /**
+ * Same as {@link #createReader(SerializableChangelogTask, Table,
IcebergScanConfig, Schema)} but
+ * ANDs {@code extraResidual} into the task's residual expression. The
combined expression is
+ * passed to Iceberg's Parquet reader, which uses it as a row-group-level
filter (skips row groups
+ * whose column statistics cannot match). The caller is still responsible
for applying the
+ * residual at the row level.
+ *
+ * <p>This is used to push extra predicates (e.g. an equality-delete {@code
IN} expression) down
+ * to the reader for cheap row-group skipping.
+ */
+ public static CloseableIterable<Record> createReader(
+ SerializableChangelogTask task,
+ Table table,
+ IcebergScanConfig scanConfig,
+ Schema outputSchema,
+ Expression extraResidual) {
+ return createReader(
+ task, table, scanConfig, outputSchema, extraResidual, task.getStart(),
task.getLength());
+ }
+
+ /**
+ * Same as {@link #createReader(SerializableChangelogTask, Table,
IcebergScanConfig, Schema,
+ * Expression)} but reads the byte range {@code [start, start + length)} of
the DataFile.
+ * Iceberg's Parquet reader selects the row groups whose starting offset
falls within this range,
+ * allowing us to prune row-groups by byte-range.
+ *
+ * <p>Callers are responsible for ensuring the requested range stays within
the task's assigned
+ * range, to avoid reading a section that is meant for another worker.
+ */
+ public static CloseableIterable<Record> createReader(
+ SerializableChangelogTask task,
+ Table table,
+ IcebergScanConfig scanConfig,
+ Schema outputSchema,
+ Expression extraResidual,
+ long start,
+ long length) {
+ Expression baseResidual = task.getExpression(table.schema());
+ Expression combined =
+ extraResidual.op() == Expression.Operation.TRUE
+ ? baseResidual
+ : Expressions.and(baseResidual, extraResidual);
+ return ReadUtils.createReader(
+ table,
+ scanConfig,
+ outputSchema,
+ checkStateNotNull(table.specs().get(task.getSpecId())),
+ task.getDataFile().createDataFile(table.specs()),
+ task.getDataFile().getFileSequenceNumber(),
+ start,
+ length,
+ combined);
+ }
+
+ /** Returns a filter that skips records marked for deletion. */
+ public static DeleteFilter<Record> genericDeleteFilter(
+ Table table, Schema outputSchema, String dataFilePath,
List<SerializableDeleteFile> deletes) {
+ return new GenericDeleteFilter(
+ table.io(),
+ dataFilePath,
+ table.schema(),
+ outputSchema,
+ deletes.stream()
+ .map(sdf -> sdf.createDeleteFile(table.specs(),
table.sortOrders()))
+ .collect(Collectors.toList()));
+ }
+
+ /** Returns a delete reader that reuses delete structures already loaded by
CDC planning. */
+ public static DeleteReader<Record> genericDeleteReader(
+ Table table,
+ Schema outputSchema,
+ String dataFilePath,
+ List<SerializableDeleteFile> deletes,
+ DeleteReader.PreloadedDeletes preloadedDeletes) {
+ return new GenericDeleteReader(
+ table.io(),
+ dataFilePath,
+ table.schema(),
+ outputSchema,
+ deletes.stream()
+ .map(sdf -> sdf.createDeleteFile(table.specs(),
table.sortOrders()))
+ .collect(Collectors.toList()),
+ preloadedDeletes);
+ }
+
+ /**
+ * Opens the records that a CDC reader should process for a single {@link
+ * SerializableChangelogTask}, applying the appropriate delete-filter /
delete-reader chain for
+ * the task's type:
+ *
+ * <ul>
+ * <li>{@code ADDED_ROWS}: Collect and return the records that became live
in this commit:
+ * <ul>
+ * <li>1. Iterate over records in the added DataFile
+ * <li>2. Filter out records matched by any added deletes
+ * </ul>
+ * <li>{@code DELETED_ROWS}: Return records in the DataFile that are
marked for deletion by new
+ * DeleteFiles, making sure to first ignore records that have already
been marked by
+ * previous DeleteFiles:
+ * <ul>
+ * <li>1. Iterate over records in the referenced DataFile
+ * <li>2. Filter out records matched from existing deletes.
+ * <li>3. Filter out records NOT matched from added deletes
+ * </ul>
+ * <li>{@code DELETED_FILE} — every record in the DataFile that wasn't
already deleted by {@code
+ * existingDeletes}.
+ * <ul>
+ * <li>1. Iterate over records in the referenced DataFile
+ * <li>2. Filter out records matched from existing deletes.
+ * </ul>
+ * </ul>
+ *
+ * <p>Projection pushdown should not be used when reading bi-directional
tasks because we need to
+ * compare all record columns to accurately identify updates. Otherwise,
user-configured
+ * projection may drop a column that contains real updates. If this happens,
the downstream
+ * resolver will mistakenly determine the (delete, insert) pair to be a
duplicate.
+ *
+ * <p>If CDC metadata columns are requested, this method only adds
row-sourced metadata columns
+ * ({@code _row_id}, {@code _last_updated_sequence_number}) to the Iceberg
read schema. Commit
+ * metadata columns are added later by {@link CdcOutputUtils#outputRow}.
+ */
+ public static CloseableIterable<Record> changelogRecordsForTask(
+ SerializableChangelogTask task,
+ Table table,
+ IcebergScanConfig scanConfig,
+ boolean useProjectedSchema) {
+ String dataFilePath = task.getDataFile().getPath();
+ Schema outputSchema =
+ CdcOutputUtils.readSchemaWithRowMetadata(
+ scanConfig.getMetadataColumns(),
+ useProjectedSchema ? scanConfig.getRequiredSchema() :
table.schema());
+ switch (task.getType()) {
+ case ADDED_ROWS:
+ DeleteFilter<Record> addedDeletesFilter =
+ genericDeleteFilter(table, outputSchema, dataFilePath,
task.getAddedDeletes());
+ return addedDeletesFilter.filter(
+ createReader(task, table, scanConfig,
addedDeletesFilter.requiredSchema()));
+ case DELETED_FILE:
+ DeleteFilter<Record> existingDeletesFilter =
+ genericDeleteFilter(table, outputSchema, dataFilePath,
task.getExistingDeletes());
+ return existingDeletesFilter.filter(
+ createReader(task, table, scanConfig,
existingDeletesFilter.requiredSchema()));
+ case DELETED_ROWS:
+ return deletedRowsForTask(task, table, scanConfig, outputSchema);
+ default:
+ throw new IllegalStateException("Unknown ChangelogScanTask type: " +
task.getType());
+ }
+ }
+
+ /**
+ * Builds the reader chain for a {@code DELETED_ROWS} task with row-group
pushdown when possible.
+ * This helps the reader skip entire row groups. For unskipped row groups,
the reader should still
+ * apply per-record position + equality checks at the row level.
+ *
+ * <p>We use two pushdown strategies, depending on the type of {@link
DeleteFile} in the task
+ * (Position Delete vs. Equality Delete). The two strategies can be combined
if both {@link
+ * DeleteFile} types are present.
+ *
+ * <ol>
+ * <li><b>Byte-range pushdown for Position Deletes:</b> pre-load the {@link
+ * PositionDeleteIndex}, read the Parquet footer, and compute a single
contiguous byte range
+ * covering the row groups that contain at least one deleted position.
+ * <li><b>IN-expression pushdown for Equality Deletes:</b> build an
Iceberg {@code IN}
+ * expression and pass it as a Parquet residual so the metrics
row-group filter can skip
+ * non-matching row groups.
+ * </ol>
+ *
+ * <p>If Position and Equality deletes are both present, both strategies are
used to get one
+ * contiguous range. We read only that range, skipping leading and trailing
row groups that
+ * contain no deletions.
+ *
+ * <p>Note: Equality pushdown is only used when all delete files share a
single equality field.
+ * Multi-column equality requires an exploded OR expression that Parquet's
metrics filter handles
+ * poorly.
+ */
+ private static CloseableIterable<Record> deletedRowsForTask(
+ SerializableChangelogTask task,
+ Table table,
+ IcebergScanConfig scanConfig,
+ Schema outputSchema) {
+ String dataFilePath = task.getDataFile().getPath();
+ List<SerializableDeleteFile> addedDeletes = task.getAddedDeletes();
+
+ // Split into position vs equality.
+ List<DeleteFile> posFiles = new ArrayList<>();
+ List<DeleteFile> eqFiles = new ArrayList<>();
+ for (SerializableDeleteFile sd : addedDeletes) {
+ DeleteFile df = sd.createDeleteFile(table.specs(), table.sortOrders());
+ if (df.content() == FileContent.POSITION_DELETES) {
+ posFiles.add(df);
+ } else if (df.content() == FileContent.EQUALITY_DELETES) {
+ eqFiles.add(df);
+ }
+ }
+
+ // Strategy 1: byte-range pushdown around row groups with position deletes
(+ eq
+ // matches).
+ DeleteReader.PreloadedDeletes preloadedDeletes =
DeleteReader.PreloadedDeletes.empty();
+ if (!posFiles.isEmpty()) {
+ @Nullable
+ PositionPushdownResult pushdown =
+ tryPositionByteRangePushdown(
+ task, table, scanConfig, outputSchema, posFiles, eqFiles,
addedDeletes);
+ if (pushdown != null) {
+ if (pushdown.deletedRecords != null) {
+ return pushdown.deletedRecords;
+ }
+ preloadedDeletes = pushdown.preloadedDeletes;
+ }
+ // fall through to the default chain on failure
+ }
+
+ // Strategy 2: equality IN-expression pushdown applied as a reader
residual.
+ // Only safe when no position deletes are present. when both exist, the
+ // byte-range path above already incorporates the eq filter
+ Expression eqResidual = Expressions.alwaysTrue();
+ if (posFiles.isEmpty() && !eqFiles.isEmpty()) {
+ EqualityPushdownResult eqPushdown = buildEqualityDeletePushdown(table,
eqFiles);
+ eqResidual = eqPushdown.applicable ? eqPushdown.residual :
Expressions.alwaysTrue();
+ preloadedDeletes = eqPushdown.preloadedDeletes(null);
+ }
+
+ DeleteFilter<Record> existingDeletesFilter =
+ genericDeleteFilter(table, outputSchema, dataFilePath,
task.getExistingDeletes());
+ DeleteReader<Record> addedDeletesReader =
+ genericDeleteReader(table, outputSchema, dataFilePath, addedDeletes,
preloadedDeletes);
+ Schema requiredSchema =
+ TypeUtil.join(existingDeletesFilter.requiredSchema(),
addedDeletesReader.requiredSchema());
+
+ CloseableIterable<Record> records =
+ createReader(task, table, scanConfig, requiredSchema, eqResidual);
+ CloseableIterable<Record> liveRecords =
existingDeletesFilter.filter(records);
+ return addedDeletesReader.read(liveRecords);
+ }
+
+ /**
+ * Path-A byte-range position-delete pushdown. Returns {@code null} if
pushdown isn't applicable
+ * or any step fails, signaling to the caller to fall back. Returns an empty
iterable if every row
+ * group is pruned.
+ */
+ private static @Nullable PositionPushdownResult tryPositionByteRangePushdown(
+ SerializableChangelogTask task,
+ Table table,
+ IcebergScanConfig scanConfig,
+ Schema outputSchema,
+ List<DeleteFile> posFiles,
+ List<DeleteFile> eqFiles,
+ List<SerializableDeleteFile> addedDeletes) {
+ String dataFilePath = task.getDataFile().getPath();
+
+ // 1. pre-load the position index for this data file.
+ PositionDeleteIndex posIndex;
+ try {
+ DeleteLoader loader = new BaseDeleteLoader(df ->
table.io().newInputFile(df.location()));
+ posIndex = loader.loadPositionDeletes(posFiles, dataFilePath);
+ } catch (RuntimeException e) {
+ LOG.info(
+ "Failed to pre-load position deletes for {}; falling back to default
reader chain.",
+ dataFilePath,
+ e);
+ return null;
+ }
+ if (posIndex.isEmpty()) {
+ // the pos-delete files don't actually target this data file (rare but
possible
+ // after metadata operations). Fall back so the eq pushdown does not run
here either.
+ return PositionPushdownResult.fallback(
+ DeleteReader.PreloadedDeletes.of(posIndex, Collections.emptyMap()));
+ }
+
+ // 2. optional equality filter (used to extend the byte range to include
row groups
+ // whose stats match the equality IN values).
+ @Nullable ParquetMetricsRowGroupFilter eqFilter = null;
+ EqualityPushdownResult eqPushdown = EqualityPushdownResult.notApplicable();
+ if (!eqFiles.isEmpty()) {
+ eqPushdown = buildEqualityDeletePushdown(table, eqFiles);
+ if (!eqPushdown.applicable) {
+ // eq deletes are present but we can't safely identify which row
groups they target.
+ // A narrowed position-only range could drop eq-deleted rows, so fall
back to the
+ // default full-range reader. DeleteReader will still apply residual
per record.
+ return
PositionPushdownResult.fallback(eqPushdown.preloadedDeletes(posIndex));
+ }
+ eqFilter = new ParquetMetricsRowGroupFilter(table.schema(),
eqPushdown.residual);
+ }
+ DeleteReader.PreloadedDeletes preloadedDeletes =
eqPushdown.preloadedDeletes(posIndex);
+
+ // 3. read the footer and compute the task byte range covering every row
group that
+ // contains a position delete or matches the eq filter.
+ long taskStart = task.getStart();
+ long taskEnd = taskStart + task.getLength();
+ long minStart = Long.MAX_VALUE;
+ long maxEnd = Long.MIN_VALUE;
+ long[] sortedDeletePositions = sortedDeletePositions(posIndex);
+
+ try {
+ InputFile inputFile = table.io().newInputFile(dataFilePath);
+ try (ParquetFileReader reader =
ParquetFileReader.open(asParquetInputFile(inputFile))) {
+ ParquetMetadata footer = reader.getFooter();
+ MessageType parquetSchema = footer.getFileMetaData().getSchema();
+
+ // track cumulative row count ourselves. not all Parquet writers will
include
+ // it in BlockMetaData.getRowIndexOffset
+ long cumulativeRows = 0;
+ for (BlockMetaData rowGroup : footer.getBlocks()) {
+ long rgStartPos = cumulativeRows;
+ long rgEndPos = cumulativeRows + rowGroup.getRowCount();
+ cumulativeRows = rgEndPos;
+
+ long rgByteStart = rowGroup.getStartingPos();
+ long rgByteEnd = rgByteStart + rowGroup.getCompressedSize();
+
+ // skip row groups outside this task's range.
+ if (rgByteEnd <= taskStart || rgByteStart >= taskEnd) {
+ continue;
+ }
+
+ // if row group has a position and/or an equality delete, include it
in the global range
+ boolean rowGroupHasPosDelete = anyInRange(sortedDeletePositions,
rgStartPos, rgEndPos);
+ boolean rowGroupMatchesEq =
+ eqFilter != null && eqFilter.shouldRead(parquetSchema, rowGroup);
+
+ if (rowGroupHasPosDelete || rowGroupMatchesEq) {
+ minStart = Math.min(minStart, rgByteStart);
+ maxEnd = Math.max(maxEnd, rgByteEnd);
+ }
+ }
+ }
+ } catch (IOException | RuntimeException e) {
+ LOG.info(
+ "Failed to read Parquet footer for {}; falling back to default
reader chain.",
+ dataFilePath,
+ e);
+ return PositionPushdownResult.fallback(preloadedDeletes);
+ }
+
+ long readStart = Math.max(minStart, taskStart);
+ long readEnd = Math.min(maxEnd, taskEnd);
+ if (readStart >= readEnd) {
+ // deletes don't target the portion of the DataFile covered by this read
task.
+ return PositionPushdownResult.of(CloseableIterable.empty(),
preloadedDeletes);
+ }
+
+ // 4. Open the reader with the narrowed byte range. This range represents
the union
+ // of "has position delete" + "matches eq stats"
+ DeleteFilter<Record> existingDeletesFilter =
+ genericDeleteFilter(table, outputSchema, dataFilePath,
task.getExistingDeletes());
+ DeleteReader<Record> addedDeletesReader =
+ genericDeleteReader(table, outputSchema, dataFilePath, addedDeletes,
preloadedDeletes);
+ Schema requiredSchema =
+ TypeUtil.join(existingDeletesFilter.requiredSchema(),
addedDeletesReader.requiredSchema());
+ CloseableIterable<Record> records =
+ createReader(
+ task,
+ table,
+ scanConfig,
+ requiredSchema,
+ Expressions.alwaysTrue(),
+ readStart,
+ readEnd - readStart);
+ CloseableIterable<Record> liveRecords =
existingDeletesFilter.filter(records);
+ return PositionPushdownResult.of(addedDeletesReader.read(liveRecords),
preloadedDeletes);
+ }
+
+ /** Materializes a sorted long[] of the positions in {@code posIndex} for
binary-search lookup. */
+ private static long[] sortedDeletePositions(PositionDeleteIndex posIndex) {
+ long cardinality = posIndex.cardinality();
+ if (cardinality > Integer.MAX_VALUE) {
+ throw new IllegalStateException(
+ "Position delete index cardinality exceeds Integer.MAX_VALUE: " +
cardinality);
+ }
+ long[] arr = new long[(int) cardinality];
+ int[] idx = {0};
+ posIndex.forEach(p -> arr[idx[0]++] = p);
+ // forEach is ordered for the bitmap-backed implementation, but the
interface doesn't
+ // promise it, so sort defensively. Cheap relative to the I/O it gates.
+ Arrays.sort(arr);
+ return arr;
+ }
+
+ /** Returns true iff {@code sortedDeletes} contains any value in {@code
[start, end)}. */
+ private static boolean anyInRange(long[] sortedDeletes, long startInclusive,
long endExclusive) {
+ if (sortedDeletes.length == 0) {
+ return false;
+ }
+ int i = Arrays.binarySearch(sortedDeletes, startInclusive);
+ if (i < 0) {
+ i = -i - 1; // insertion point
+ }
+ return i < sortedDeletes.length && sortedDeletes[i] < endExclusive;
+ }
+
+ /**
+ * Returns an {@code IN} expression suitable as a Parquet residual for the
given equality-delete
+ * files, or {@link Expressions#alwaysTrue()} if pushdown is not applicable.
See {@link
+ * #deletedRowsForTask} for the applicability rules.
+ */
+ private static EqualityPushdownResult buildEqualityDeletePushdown(
+ Table table, List<DeleteFile> eqFiles) {
+ // All eq delete files in this task must share a single equality field id.
+ Set<Integer> sharedIds = null;
+ for (DeleteFile df : eqFiles) {
+ Set<Integer> ids = new HashSet<>(df.equalityFieldIds());
+ if (sharedIds == null) {
+ sharedIds = ids;
+ } else if (!sharedIds.equals(ids)) {
+ return EqualityPushdownResult.notApplicable();
+ }
+ }
+ if (sharedIds == null || sharedIds.size() != 1) {
+ return EqualityPushdownResult.notApplicable();
+ }
+
+ int fieldId = Iterables.getOnlyElement(sharedIds);
+ Types.NestedField field = table.schema().findField(fieldId);
+ if (field == null) {
+ return EqualityPushdownResult.notApplicable();
+ }
+ Schema deleteSchema = TypeUtil.select(table.schema(), sharedIds);
+
+ DeleteLoader loader = new BaseDeleteLoader(df ->
table.io().newInputFile(df.location()));
+ StructLikeSet set;
+ try {
+ set = loader.loadEqualityDeletes(eqFiles, deleteSchema);
+ } catch (RuntimeException e) {
+ LOG.info(
+ "Failed to pre-load equality deletes for pushdown; falling back to
per-record check.", e);
+ return EqualityPushdownResult.notApplicable();
+ }
+
+ Map<Set<Integer>, StructLikeSet> preloadedSets = new HashMap<>();
+ preloadedSets.put(sharedIds, set);
+
+ if (set.size() > IN_PREDICATE_LIMIT) {
+ return EqualityPushdownResult.notApplicable(preloadedSets);
+ }
+ Class<?> javaClass = field.type().typeId().javaClass();
+ List<Object> values = new ArrayList<>(set.size());
+ for (StructLike s : set) {
+ @Nullable Object v = s.get(0, javaClass);
+ if (v == null) {
+ // Nulls don't match an IN-expression. pushing down would drop those
deletions.
+ return EqualityPushdownResult.notApplicable(preloadedSets);
+ }
+ values.add(v);
+ }
+ if (values.isEmpty()) {
+ return EqualityPushdownResult.notApplicable(preloadedSets);
+ }
+ return EqualityPushdownResult.applicable(Expressions.in(field.name(),
values), preloadedSets);
+ }
+
+ private static final class PositionPushdownResult {
+ private final @Nullable CloseableIterable<Record> deletedRecords;
+ private final DeleteReader.PreloadedDeletes preloadedDeletes;
+
+ private static PositionPushdownResult of(
+ CloseableIterable<Record> deletedRecords,
DeleteReader.PreloadedDeletes preloadedDeletes) {
+ return new PositionPushdownResult(deletedRecords, preloadedDeletes);
+ }
+
+ private static PositionPushdownResult
fallback(DeleteReader.PreloadedDeletes preloadedDeletes) {
+ return new PositionPushdownResult(null, preloadedDeletes);
+ }
+
+ private PositionPushdownResult(
+ @Nullable CloseableIterable<Record> records,
+ DeleteReader.PreloadedDeletes preloadedDeletes) {
+ this.deletedRecords = records;
+ this.preloadedDeletes = preloadedDeletes;
+ }
+ }
+
+ private static final class EqualityPushdownResult {
+ private static final EqualityPushdownResult NOT_APPLICABLE =
+ new EqualityPushdownResult(Expressions.alwaysTrue(),
Collections.emptyMap(), false);
+
+ private final Expression residual;
+ private final Map<Set<Integer>, StructLikeSet> preloadedSets;
+ private final boolean applicable;
+
+ private static EqualityPushdownResult applicable(
+ Expression residual, Map<Set<Integer>, StructLikeSet> preloadedSets) {
+ return new EqualityPushdownResult(residual, preloadedSets, true);
+ }
+
+ private static EqualityPushdownResult notApplicable() {
+ return NOT_APPLICABLE;
+ }
+
+ private static EqualityPushdownResult notApplicable(
+ Map<Set<Integer>, StructLikeSet> preloadedSets) {
+ if (preloadedSets.isEmpty()) {
+ return NOT_APPLICABLE;
+ }
+ return new EqualityPushdownResult(Expressions.alwaysTrue(),
preloadedSets, false);
+ }
+
+ private EqualityPushdownResult(
+ Expression residual, Map<Set<Integer>, StructLikeSet> preloadedSets,
boolean applicable) {
+ this.residual = residual;
+ this.preloadedSets = preloadedSets;
+ this.applicable = applicable;
+ }
+
+ private DeleteReader.PreloadedDeletes preloadedDeletes(
+ @Nullable PositionDeleteIndex positionDeleteIndex) {
+ return DeleteReader.PreloadedDeletes.of(positionDeleteIndex,
preloadedSets);
+ }
+ }
+
+ public static class GenericDeleteFilter extends DeleteFilter<Record> {
+ private final FileIO io;
+ private final InternalRecordWrapper asStructLike;
+
+ @SuppressWarnings("method.invocation")
+ public GenericDeleteFilter(
+ FileIO io,
+ String dataFilePath,
+ Schema tableSchema,
+ Schema requiredSchema,
+ List<DeleteFile> deleteFiles) {
+ super(dataFilePath, deleteFiles, tableSchema, requiredSchema);
+ this.io = io;
+ this.asStructLike = new
InternalRecordWrapper(requiredSchema().asStruct());
+ }
+
+ @Override
+ protected StructLike asStructLike(Record record) {
+ return asStructLike.wrap(record);
+ }
+
+ @Override
+ protected InputFile getInputFile(String location) {
+ return io.newInputFile(location);
+ }
+ }
+
+ public static class GenericDeleteReader extends DeleteReader<Record> {
+ private final FileIO io;
+ private final InternalRecordWrapper asStructLike;
+
+ @SuppressWarnings("method.invocation")
+ public GenericDeleteReader(
+ FileIO io,
+ String dataFilePath,
+ Schema tableSchema,
+ Schema requiredSchema,
+ List<DeleteFile> deleteFiles,
+ DeleteReader.PreloadedDeletes preloadedDeletes) {
+ super(dataFilePath, deleteFiles, tableSchema, requiredSchema, true,
preloadedDeletes);
+ this.io = io;
+ this.asStructLike = new
InternalRecordWrapper(requiredSchema().asStruct());
+ }
+
+ @Override
+ protected StructLike asStructLike(Record record) {
+ return asStructLike.wrap(record);
+ }
+
+ @Override
+ protected InputFile getInputFile(String location) {
+ return io.newInputFile(location);
+ }
+ }
+
+ /**
+ * Adapter from Iceberg's {@link InputFile} to Parquet's {@link
org.apache.parquet.io.InputFile},
+ * for callers that need to open a Parquet file directly (e.g. to read the
footer for row-group
+ * pruning decisions). Iceberg has an equivalent internal {@code ParquetIO}
but it's
+ * package-private.
+ */
+ public static org.apache.parquet.io.InputFile asParquetInputFile(InputFile
icebergFile) {
+ return new IcebergParquetInputFile(icebergFile);
+ }
+
+ private static final class IcebergParquetInputFile implements
org.apache.parquet.io.InputFile {
+ private final InputFile delegate;
+
+ IcebergParquetInputFile(InputFile delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public long getLength() {
+ return delegate.getLength();
+ }
+
+ @Override
+ public org.apache.parquet.io.SeekableInputStream newStream() {
+ return new IcebergParquetSeekableStream(delegate.newStream());
+ }
+ }
+
+ private static final class IcebergParquetSeekableStream extends
DelegatingSeekableInputStream {
+ private final SeekableInputStream delegate;
+
+ IcebergParquetSeekableStream(SeekableInputStream delegate) {
+ super(delegate);
+ this.delegate = delegate;
+ }
+
+ @Override
+ public long getPos() throws java.io.IOException {
+ return delegate.getPos();
+ }
+
+ @Override
+ public void seek(long newPos) throws java.io.IOException {
+ delegate.seek(newPos);
+ }
+ }
Review Comment:

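Extending `DelegatingSeekableInputStream` is unsafe here because its methods
(such as `readFully`) cast the delegate stream to
`org.apache.parquet.io.SeekableInputStream`. Since
`org.apache.iceberg.io.SeekableInputStream` does not implement the Parquet
interface, this will throw a `ClassCastException` at runtime when the Parquet
reader attempts to read the file footer. To fix this,
`IcebergParquetSeekableStream` should extend
`org.apache.parquet.io.SeekableInputStream` directly and delegate the standard
`InputStream` methods to the underlying stream.

Reviewer note (correction): this concern does not appear to hold.
`org.apache.parquet.io.DelegatingSeekableInputStream` takes a plain
`java.io.InputStream` in its constructor. It implements `read`, `readFully`,
and `read(ByteBuffer)` through static helpers that only call `InputStream`
methods, and it performs no cast to the Parquet `SeekableInputStream` type.
Iceberg's own package-private `ParquetIO` adapter wraps
`org.apache.iceberg.io.SeekableInputStream` the same way.

The suggested replacement below would also not compile as written. It does
not implement the abstract `readFully(byte[])`, `readFully(byte[], int, int)`,
`read(ByteBuffer)`, and `readFully(ByteBuffer)` methods of
`org.apache.parquet.io.SeekableInputStream`.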
```java
private static final class IcebergParquetSeekableStream extends
org.apache.parquet.io.SeekableInputStream {
private final SeekableInputStream delegate;
IcebergParquetSeekableStream(SeekableInputStream delegate) {
this.delegate = delegate;
}
@Override
public long getPos() throws java.io.IOException {
return delegate.getPos();
}
@Override
public void seek(long newPos) throws java.io.IOException {
delegate.seek(newPos);
}
@Override
public int read() throws java.io.IOException {
return delegate.read();
}
@Override
public int read(byte[] b, int off, int len) throws java.io.IOException {
return delegate.read(b, off, len);
}
@Override
public void close() throws java.io.IOException {
delegate.close();
}
}
```
##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/cdc/CdcOutputUtils.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg.cdc;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.io.iceberg.IcebergUtils;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.values.Row;
+import org.apache.iceberg.types.Types;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * Helpers for CDC schemas and output row construction.
+ *
+ * <p>CDC metadata is handled in two phases. Row metadata, such as {@code
_row_id} and {@code
+ * _last_updated_sequence_number}, is added to intermediate read schemas so
Iceberg readers can
+ * populate those values. Commit metadata, such as {@code _commit_snapshot_id}
and {@code
+ * _commit_snapshot_sequence_number}, is carried separately in CDC descriptors.
+ *
+ * <p>The public output shape is assembled only when final Beam {@link Row}s
are emitted. This keeps
+ * the read path table-shaped while still exposing all requested metadata as
top-level output
+ * fields.
+ */
+final class CdcOutputUtils {
+ /**
+ * Returns the public CDC output schema: projected data fields followed by
requested metadata
+ * columns in user-configured order.
+ */
+ static Schema outputSchema(List<String> metadataColumns, Schema dataSchema) {
+ if (metadataColumns.isEmpty()) {
+ return dataSchema;
+ }
+
+ Schema.Builder builder =
Schema.builder().addFields(dataSchema.getFields());
+ for (String metadataColumn : metadataColumns) {
+ builder.addField(IcebergCdcMetadataColumns.beamField(metadataColumn));
+ }
+ return builder.build();
+ }
+
+ /**
+ * Returns an Iceberg read schema that includes row metadata columns.
+ *
+ * <p>Commit metadata columns are not added here because Iceberg readers
cannot populate them;
+ * those values are taken from {@link ChangelogDescriptor} or {@link
CdcRowDescriptor} when output
+ * rows are built.
+ */
+ static org.apache.iceberg.Schema readSchemaWithRowMetadata(
+ List<String> metadataColumns, org.apache.iceberg.Schema dataSchema) {
+ List<Types.NestedField> fields = new ArrayList<>(dataSchema.columns());
+ for (String metadataColumn : metadataColumns) {
+ Types.NestedField rowMetadataField =
+ IcebergCdcMetadataColumns.icebergRowMetadataField(metadataColumn);
+ if (rowMetadataField != null &&
dataSchema.findField(rowMetadataField.fieldId()) == null) {
+ fields.add(rowMetadataField);
+ }
+ }
+ return new org.apache.iceberg.Schema(fields,
dataSchema.identifierFieldIds());
+ }
+
+ /**
+ * Beam-schema equivalent of {@link #readSchemaWithRowMetadata(List,
org.apache.iceberg.Schema)}.
+ */
+ static Schema readBeamSchemaWithRowMetadata(List<String> metadataColumns,
Schema dataSchema) {
+ if
(metadataColumns.stream().noneMatch(IcebergCdcMetadataColumns::isRowMetadataColumn))
{
+ return dataSchema;
+ }
+
+ Schema.Builder builder =
Schema.builder().addFields(dataSchema.getFields());
+ for (String metadataColumn : metadataColumns) {
+ if (IcebergCdcMetadataColumns.isRowMetadataColumn(metadataColumn)
+ && !dataSchema.hasField(metadataColumn)) {
+ builder.addField(IcebergCdcMetadataColumns.beamField(metadataColumn));
+ }
+ }
+ return builder.build();
+ }
+
+ /**
+ * Builds the final public Beam row.
+ *
+ * <p>{@code dataAndRowMetadata} may already include row metadata read from
Iceberg. This method
+ * copies only data fields first, then appends every requested metadata
column at the top level.
+ * That preserves configured column order and avoids exposing row metadata
twice.
+ */
+ static Row outputRow(
+ List<String> metadataColumns,
+ Schema outputSchema,
+ long commitSnapshotId,
+ long snapshotSequentNumber,
+ Row dataAndRowMetadata) {
+ if (metadataColumns.isEmpty()
+ ||
metadataColumns.stream().allMatch(IcebergCdcMetadataColumns::isRowMetadataColumn))
{
+ return dataAndRowMetadata;
+ }
+
+ List<@Nullable Object> values = new
ArrayList<>(outputSchema.getFieldCount());
+ for (Schema.Field field : dataAndRowMetadata.getSchema().getFields()) {
+ if (!metadataColumns.contains(field.getName())) {
+ values.add(dataAndRowMetadata.getValue(field.getName()));
+ }
+ }
+
+ for (String metadataColumn : metadataColumns) {
+ values.add(
+ metadataValue(
+ metadataColumn, commitSnapshotId, snapshotSequentNumber,
dataAndRowMetadata));
+ }
+ return Row.withSchema(outputSchema).addValues(values).build();
+ }
Review Comment:

There is a typo in the parameter name `snapshotSequentNumber`. It should be
renamed to `snapshotSequenceNumber` for readability and consistency with other
methods.
```java
/**
 * Assembles the public CDC output row.
 *
 * <p>Fields of {@code dataAndRowMetadata} are copied in schema order, skipping any field whose
 * name is one of the requested metadata columns. Every requested metadata column is then appended
 * in configured order using {@code metadataValue}. If no metadata columns are requested, or every
 * requested column is row metadata, {@code dataAndRowMetadata} is returned as-is.
 */
static Row outputRow(
    List<String> metadataColumns,
    Schema outputSchema,
    long commitSnapshotId,
    long snapshotSequenceNumber,
    Row dataAndRowMetadata) {
  // allMatch is vacuously true for an empty list, so this also covers "no metadata requested".
  boolean onlyRowMetadata =
      metadataColumns.stream().allMatch(IcebergCdcMetadataColumns::isRowMetadataColumn);
  if (onlyRowMetadata) {
    return dataAndRowMetadata;
  }

  List<@Nullable Object> outputValues = new ArrayList<>(outputSchema.getFieldCount());
  // Data fields first; requested metadata is re-added below so each column appears exactly once.
  for (String fieldName : dataAndRowMetadata.getSchema().getFieldNames()) {
    if (metadataColumns.contains(fieldName)) {
      continue;
    }
    outputValues.add(dataAndRowMetadata.getValue(fieldName));
  }
  for (String column : metadataColumns) {
    outputValues.add(
        metadataValue(column, commitSnapshotId, snapshotSequenceNumber, dataAndRowMetadata));
  }
  return Row.withSchema(outputSchema).addValues(outputValues).build();
}
```
##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDeleteFile.java:
##########
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import static
org.apache.beam.sdk.io.iceberg.SerializableDataFile.computeMapByteHashCode;
+import static org.apache.beam.sdk.io.iceberg.SerializableDataFile.mapEquals;
+import static
org.apache.beam.sdk.io.iceberg.SerializableDataFile.toByteArrayMap;
+import static
org.apache.beam.sdk.io.iceberg.SerializableDataFile.toByteBufferMap;
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+
+import com.google.auto.value.AutoValue;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldNumber;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileMetadata;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.SortOrder;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class SerializableDeleteFile {
+ public static SerializableDeleteFile.Builder builder() {
+ return new AutoValue_SerializableDeleteFile.Builder();
+ }
+
+ @SchemaFieldNumber("0")
+ public abstract FileContent getContentType();
+
+ @SchemaFieldNumber("1")
+ public abstract String getLocation();
+
+ @SchemaFieldNumber("2")
+ public abstract String getFileFormat();
+
+ @SchemaFieldNumber("3")
+ public abstract long getRecordCount();
+
+ @SchemaFieldNumber("4")
+ public abstract long getFileSizeInBytes();
+
+ @SchemaFieldNumber("5")
+ public abstract String getPartitionPath();
+
+ @SchemaFieldNumber("6")
+ public abstract int getPartitionSpecId();
+
+ @SchemaFieldNumber("7")
+ public abstract @Nullable Integer getSortOrderId();
+
+ @SchemaFieldNumber("8")
+ public abstract @Nullable List<Integer> getEqualityFieldIds();
+
+ @SchemaFieldNumber("9")
+ public abstract @Nullable ByteBuffer getKeyMetadata();
+
+ @SchemaFieldNumber("10")
+ public abstract @Nullable List<Long> getSplitOffsets();
+
+ @SchemaFieldNumber("11")
+ public abstract @Nullable Map<Integer, Long> getColumnSizes();
+
+ @SchemaFieldNumber("12")
+ public abstract @Nullable Map<Integer, Long> getValueCounts();
+
+ @SchemaFieldNumber("13")
+ public abstract @Nullable Map<Integer, Long> getNullValueCounts();
+
+ @SchemaFieldNumber("14")
+ public abstract @Nullable Map<Integer, Long> getNanValueCounts();
+
+ @SchemaFieldNumber("15")
+ public abstract @Nullable Map<Integer, byte[]> getLowerBounds();
+
+ @SchemaFieldNumber("16")
+ public abstract @Nullable Map<Integer, byte[]> getUpperBounds();
+
+ @SchemaFieldNumber("17")
+ public abstract @Nullable Long getContentOffset();
+
+ @SchemaFieldNumber("18")
+ public abstract @Nullable Long getContentSizeInBytes();
+
+ @SchemaFieldNumber("19")
+ public abstract @Nullable String getReferencedDataFile();
+
+ @SchemaFieldNumber("20")
+ public abstract @Nullable Long getDataSequenceNumber();
+
+ @SchemaFieldNumber("21")
+ public abstract @Nullable Long getFileSequenceNumber();
+
+ @AutoValue.Builder
+ abstract static class Builder {
+ abstract Builder setContentType(FileContent content);
+
+ abstract Builder setLocation(String path);
+
+ abstract Builder setFileFormat(String fileFormat);
+
+ abstract Builder setRecordCount(long recordCount);
+
+ abstract Builder setFileSizeInBytes(long fileSizeInBytes);
+
+ abstract Builder setPartitionPath(String partitionPath);
+
+ abstract Builder setPartitionSpecId(int partitionSpec);
+
+ abstract Builder setSortOrderId(@Nullable Integer sortOrderId);
+
+ abstract Builder setEqualityFieldIds(List<Integer> equalityFieldIds);
+
+ abstract Builder setKeyMetadata(ByteBuffer keyMetadata);
+
+ abstract Builder setSplitOffsets(List<Long> splitOffsets);
+
+ abstract Builder setColumnSizes(Map<Integer, Long> columnSizes);
+
+ abstract Builder setValueCounts(Map<Integer, Long> valueCounts);
+
+ abstract Builder setNullValueCounts(Map<Integer, Long> nullValueCounts);
+
+ abstract Builder setNanValueCounts(Map<Integer, Long> nanValueCounts);
+
+ abstract Builder setLowerBounds(@Nullable Map<Integer, byte[]>
lowerBounds);
+
+ abstract Builder setUpperBounds(@Nullable Map<Integer, byte[]>
upperBounds);
+
+ abstract Builder setContentOffset(@Nullable Long offset);
+
+ abstract Builder setContentSizeInBytes(@Nullable Long sizeInBytes);
+
+ abstract Builder setReferencedDataFile(@Nullable String dataFile);
+
+ abstract Builder setDataSequenceNumber(@Nullable Long number);
+
+ abstract Builder setFileSequenceNumber(@Nullable Long number);
+
+ abstract SerializableDeleteFile build();
+ }
+
+ public static SerializableDeleteFile from(
+ DeleteFile deleteFile, String partitionPath, boolean includeMetrics) {
+
+ SerializableDeleteFile.Builder builder =
+ SerializableDeleteFile.builder()
+ .setLocation(deleteFile.location())
+ .setFileFormat(deleteFile.format().name())
+ .setFileSizeInBytes(deleteFile.fileSizeInBytes())
+ .setPartitionPath(partitionPath)
+ .setPartitionSpecId(deleteFile.specId())
+ .setRecordCount(deleteFile.recordCount())
+ .setColumnSizes(deleteFile.columnSizes())
+ .setValueCounts(deleteFile.valueCounts())
+ .setNullValueCounts(deleteFile.nullValueCounts())
+ .setNanValueCounts(deleteFile.nanValueCounts())
+ .setSplitOffsets(deleteFile.splitOffsets())
+ .setKeyMetadata(deleteFile.keyMetadata())
+ .setEqualityFieldIds(deleteFile.equalityFieldIds())
+ .setSortOrderId(deleteFile.sortOrderId())
+ .setContentOffset(deleteFile.contentOffset())
+ .setContentSizeInBytes(deleteFile.contentSizeInBytes())
+ .setReferencedDataFile(deleteFile.referencedDataFile())
+ .setContentType(deleteFile.content())
+ .setDataSequenceNumber(deleteFile.dataSequenceNumber())
+ .setFileSequenceNumber(deleteFile.fileSequenceNumber());
+
+ if (includeMetrics) {
+ builder =
+ builder
+ .setLowerBounds(toByteArrayMap(deleteFile.lowerBounds()))
+ .setUpperBounds(toByteArrayMap(deleteFile.upperBounds()));
+ }
+
+ return builder.build();
+ }
+
+ @SuppressWarnings("nullness")
+ public DeleteFile createDeleteFile(
+ Map<Integer, PartitionSpec> partitionSpecs, @Nullable Map<Integer,
SortOrder> sortOrders) {
+ PartitionSpec partitionSpec =
+ checkStateNotNull(
+ partitionSpecs.get(getPartitionSpecId()),
+ "This DeleteFile was originally created with spec id '%s', "
+ + "but table only has spec ids: %s.",
+ getPartitionSpecId(),
+ partitionSpecs.keySet());
+
+ Metrics metrics =
+ new Metrics(
+ getRecordCount(),
+ getColumnSizes(),
+ getValueCounts(),
+ getNullValueCounts(),
+ getNanValueCounts(),
+ toByteBufferMap(getLowerBounds()),
+ toByteBufferMap(getUpperBounds()));
+
+ FileMetadata.Builder deleteFileBuilder =
+ FileMetadata.deleteFileBuilder(partitionSpec)
+ .withPath(getLocation())
+ .withFormat(getFileFormat())
+ .withFileSizeInBytes(getFileSizeInBytes())
+ .withRecordCount(getRecordCount())
+ .withMetrics(metrics)
+ .withSplitOffsets(getSplitOffsets())
+ .withEncryptionKeyMetadata(getKeyMetadata())
+ .withPartitionPath(getPartitionPath());
+
+ switch (getContentType()) {
+ case POSITION_DELETES:
+ deleteFileBuilder = deleteFileBuilder.ofPositionDeletes();
+ break;
+ case EQUALITY_DELETES:
+ int[] equalityFieldIds =
+ Objects.requireNonNullElse(getEqualityFieldIds(), new
ArrayList<Integer>()).stream()
+ .mapToInt(Integer::intValue)
+ .toArray();
Review Comment:

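`Objects.requireNonNullElse` evaluates its fallback argument eagerly, so a new
`ArrayList` is allocated on every invocation even when the list is non-null;
together with the stream pipeline this adds unnecessary allocation and GC
pressure in a potentially hot path. We can optimize this by checking for null
and performing a simple loop to populate the primitive array, which avoids the
stream overhead (the boxed `Integer` values still have to be unboxed either way).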
```java
// Copy the nullable boxed field-id list into a primitive array; null yields an empty array.
List<Integer> fieldIds = getEqualityFieldIds();
int[] equalityFieldIds;
if (fieldIds == null) {
  equalityFieldIds = new int[0];
} else {
  equalityFieldIds = new int[fieldIds.size()];
  int index = 0;
  for (Integer fieldId : fieldIds) {
    equalityFieldIds[index++] = fieldId;
  }
}
```
##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java:
##########
@@ -364,13 +375,29 @@ void validate(Table table) {
if (getStartingStrategy() != null) {
invalidOptions.add("starting_strategy");
}
+ if (!getMetadataColumns().isEmpty()) {
+ invalidOptions.add("metadata_columns");
+ }
if (!invalidOptions.isEmpty()) {
throw new IllegalArgumentException(
error(
"the following options are currently only available when "
+ "reading with Managed.ICEBERG_CDC: "
+ invalidOptions));
}
+ } else {
+ Set<Integer> primaryKeyIds = new
HashSet<>(table.schema().identifierFieldIds());
+ checkState(
+ !primaryKeyIds.isEmpty(),
+ "Cannot read CDC records as the table schema does not specified any
primary key fields.");
Review Comment:

There is a grammatical typo in the error message: 'does not specified'
should be 'does not specify'.
```suggestion
checkState(
!primaryKeyIds.isEmpty(),
"Cannot read CDC records as the table schema does not specify any
primary key fields.");
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]