scwhittle commented on code in PR #33504:
URL: https://github.com/apache/beam/pull/33504#discussion_r2003171306


##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadFromTasks.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.Row;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.CloseableIterable;
+
+/**
+ * Bounded read implementation.
+ *
+ * <p>For each {@link ReadTask}, reads Iceberg {@link Record}s, and converts 
to Beam {@link Row}s.
+ *
+ * <p>Implemented as an SDF to leverage communicating bundle size (i.e. {@link 
DoFn.GetSize}) to the
+ * runner, to help with scaling decisions.
+ */
+@DoFn.BoundedPerElement
+class ReadFromTasks extends DoFn<KV<ReadTaskDescriptor, ReadTask>, Row> {
+  private final IcebergScanConfig scanConfig;
+  private final Counter scanTasksCompleted =
+      Metrics.counter(ReadFromTasks.class, "scanTasksCompleted");
+
+  ReadFromTasks(IcebergScanConfig scanConfig) {
+    this.scanConfig = scanConfig;
+  }
+
+  @ProcessElement
+  public void process(
+      @Element KV<ReadTaskDescriptor, ReadTask> element,
+      RestrictionTracker<OffsetRange, Long> tracker,
+      OutputReceiver<Row> out)
+      throws IOException, ExecutionException, InterruptedException {
+    if (!tracker.tryClaim(0L)) {

Review Comment:
   Should this method return a ProcessContinuation if it is an SDF? Or maybe it 
just defaults to stop if the return type is void?



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/CreateReadTasksDoFn.java:
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.KV;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.IncrementalAppendScan;
+import org.apache.iceberg.ScanTaskParser;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterable;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Scans the given snapshot and creates multiple {@link ReadTask}s. Each task 
represents a portion
+ * of a data file that was appended within the snapshot range.
+ */
+class CreateReadTasksDoFn
+    extends DoFn<KV<String, List<SnapshotInfo>>, KV<ReadTaskDescriptor, 
ReadTask>> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(CreateReadTasksDoFn.class);
+  private static final Counter totalScanTasks =
+      Metrics.counter(CreateReadTasksDoFn.class, "totalScanTasks");
+  private final IcebergScanConfig scanConfig;
+
+  CreateReadTasksDoFn(IcebergScanConfig scanConfig) {
+    this.scanConfig = scanConfig;
+  }
+
+  @ProcessElement
+  public void process(
+      @Element KV<String, List<SnapshotInfo>> element,
+      OutputReceiver<KV<ReadTaskDescriptor, ReadTask>> out)
+      throws IOException, ExecutionException {
+    Table table =
+        TableCache.getRefreshed(element.getKey(), 
scanConfig.getCatalogConfig().catalog());
+
+    // scan snapshots individually and assign commit timestamp to files
+    for (SnapshotInfo snapshot : element.getValue()) {
+      @Nullable Long fromSnapshot = snapshot.getParentId();
+      long toSnapshot = snapshot.getSnapshotId();
+
+      if (!DataOperations.APPEND.equals(snapshot.getOperation())) {
+        LOG.info(
+            "Skipping non-append snapshot of operation '{}'. Sequence number: 
{}, id: {}",
+            snapshot.getOperation(),
+            snapshot.getSequenceNumber(),
+            snapshot.getSnapshotId());
+      }
+
+      LOG.info("Planning to scan snapshot {}", toSnapshot);
+      IncrementalAppendScan scan = 
table.newIncrementalAppendScan().toSnapshot(toSnapshot);
+      if (fromSnapshot != null) {
+        scan = scan.fromSnapshotExclusive(fromSnapshot);
+      }
+
+      createAndOutputReadTasks(scan, snapshot, out);
+    }
+  }
+
+  private void createAndOutputReadTasks(
+      IncrementalAppendScan scan,
+      SnapshotInfo snapshot,
+      OutputReceiver<KV<ReadTaskDescriptor, ReadTask>> out)
+      throws IOException {
+    int numTasks = 0;
+    try (CloseableIterable<CombinedScanTask> combinedScanTasks = 
scan.planTasks()) {
+      for (CombinedScanTask combinedScanTask : combinedScanTasks) {
+        // A single DataFile can be broken up into multiple FileScanTasks
+        for (FileScanTask fileScanTask : combinedScanTask.tasks()) {
+          ReadTask task =
+              ReadTask.builder()
+                  .setFileScanTaskJson(ScanTaskParser.toJson(fileScanTask))
+                  .setByteSize(fileScanTask.length())
+                  .setOperation(snapshot.getOperation())
+                  .setSnapshotTimestampMillis(snapshot.getTimestampMillis())
+                  .build();
+          ReadTaskDescriptor descriptor =
+              ReadTaskDescriptor.builder()
+                  
.setTableIdentifierString(checkStateNotNull(snapshot.getTableIdentifierString()))
+                  .build();
+
+          out.outputWithTimestamp(
+              KV.of(descriptor, task), 
Instant.ofEpochMilli(snapshot.getTimestampMillis()));
+          totalScanTasks.inc();

Review Comment:
   could just inc this by numTasks outside loop



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadUtils.java:
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import static org.apache.iceberg.util.SnapshotUtil.ancestorsOf;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.io.iceberg.IcebergIO.ReadRows.StartingStrategy;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.data.IdentityPartitionConverters;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.encryption.EncryptedFiles;
+import org.apache.iceberg.encryption.EncryptedInputFile;
+import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.mapping.NameMappingParser;
+import org.apache.iceberg.parquet.ParquetReader;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.PartitionUtil;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.ParquetReadOptions;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/** Helper class for source operations. */
+public class ReadUtils {
+  // default is 8MB. keep this low to avoid overwhelming memory
+  static final int MAX_FILE_BUFFER_SIZE = 1 << 18; // 256KB

Review Comment:
   Might want this to be configurable or a pipeline option.



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WatchForSnapshots.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import static org.apache.beam.sdk.transforms.Watch.Growth.PollResult;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Gauge;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Watch;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Objects;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Keeps watch over an Iceberg table and continuously outputs a range of 
snapshots, at the specified
+ * interval.
+ *
+ * <p>A downstream transform will create a list of read tasks for each range.
+ */
+class WatchForSnapshots extends PTransform<PBegin, PCollection<KV<String, 
List<SnapshotInfo>>>> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(WatchForSnapshots.class);
+  private final Duration pollInterval;
+  private final IcebergScanConfig scanConfig;
+
+  WatchForSnapshots(IcebergScanConfig scanConfig, Duration pollInterval) {
+    this.pollInterval = pollInterval;
+    this.scanConfig = scanConfig;
+  }
+
+  @Override
+  public PCollection<KV<String, List<SnapshotInfo>>> expand(PBegin input) {
+    return input
+        .apply(Create.of(scanConfig.getTableIdentifier()))
+        .apply(
+            "Scan Table Snapshots",
+            Watch.growthOf(new SnapshotPollFn(scanConfig))
+                .withPollInterval(pollInterval)
+                .withOutputCoder(ListCoder.of(SnapshotInfo.getCoder())))
+        .apply("Persist Snapshot Progress", ParDo.of(new 
PersistSnapshotProgress()));
+  }
+
+  /**
+   * Periodically scans the table for new snapshots, emitting a list for each 
new snapshot range.
+   *
+   * <p>This tracks progress locally but is not resilient to retries -- upon 
worker failure, it will
+   * restart from the initial starting strategy. Resilience is handled 
downstream by {@link
+   * PersistSnapshotProgress}.
+   */
+  private static class SnapshotPollFn extends Watch.Growth.PollFn<String, 
List<SnapshotInfo>> {
+    private final IcebergScanConfig scanConfig;
+    private @Nullable Long fromSnapshotId;
+
+    SnapshotPollFn(IcebergScanConfig scanConfig) {
+      this.scanConfig = scanConfig;
+    }
+
+    @Override
+    public PollResult<List<SnapshotInfo>> apply(String tableIdentifier, 
Context c) {
+      // fetch a fresh table to catch new snapshots
+      Table table =
+          TableCache.getRefreshed(tableIdentifier, 
scanConfig.getCatalogConfig().catalog());
+
+      @Nullable Long userSpecifiedToSnapshot = ReadUtils.getToSnapshot(table, 
scanConfig);
+      boolean isComplete = userSpecifiedToSnapshot != null;
+      if (fromSnapshotId == null) {
+        // first scan, initialize starting point with user config
+        fromSnapshotId = ReadUtils.getFromSnapshotExclusive(table, scanConfig);
+      }
+
+      Snapshot currentSnapshot = table.currentSnapshot();
+      if (currentSnapshot == null || 
Objects.equal(currentSnapshot.snapshotId(), fromSnapshotId)) {
+        // no new snapshots since last poll. return empty result.
+        return getPollResult(null, isComplete);
+      }
+
+      Long currentSnapshotId = currentSnapshot.snapshotId();
+      // if no upper bound is specified, we poll up to the current snapshot
+      long toSnapshotId = MoreObjects.firstNonNull(userSpecifiedToSnapshot, 
currentSnapshotId);
+
+      List<SnapshotInfo> snapshots =
+          ReadUtils.snapshotsBetween(table, tableIdentifier, fromSnapshotId, 
toSnapshotId);
+
+      fromSnapshotId = currentSnapshotId;
+      return getPollResult(snapshots, isComplete);
+    }
+
+    private PollResult<List<SnapshotInfo>> getPollResult(
+        @Nullable List<SnapshotInfo> snapshots, boolean isComplete) {
+      List<TimestampedValue<List<SnapshotInfo>>> timestampedSnapshots = new 
ArrayList<>(1);

Review Comment:
   nit: could use ImmutableList.of



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadFromTasks.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.Row;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.CloseableIterable;
+
+/**
+ * Bounded read implementation.
+ *
+ * <p>For each {@link ReadTask}, reads Iceberg {@link Record}s, and converts 
to Beam {@link Row}s.
+ *
+ * <p>Implemented as an SDF to leverage communicating bundle size (i.e. {@link 
DoFn.GetSize}) to the
+ * runner, to help with scaling decisions.
+ */
+@DoFn.BoundedPerElement
+class ReadFromTasks extends DoFn<KV<ReadTaskDescriptor, ReadTask>, Row> {
+  private final IcebergScanConfig scanConfig;
+  private final Counter scanTasksCompleted =
+      Metrics.counter(ReadFromTasks.class, "scanTasksCompleted");
+
+  ReadFromTasks(IcebergScanConfig scanConfig) {
+    this.scanConfig = scanConfig;
+  }
+
+  @ProcessElement
+  public void process(
+      @Element KV<ReadTaskDescriptor, ReadTask> element,
+      RestrictionTracker<OffsetRange, Long> tracker,
+      OutputReceiver<Row> out)
+      throws IOException, ExecutionException, InterruptedException {
+    if (!tracker.tryClaim(0L)) {
+      return;
+    }
+    ReadTask readTask = element.getValue();
+    Table table =
+        TableCache.get(scanConfig.getTableIdentifier(), 
scanConfig.getCatalogConfig().catalog());
+
+    FileScanTask task = readTask.getFileScanTask();
+    try (CloseableIterable<Record> reader = ReadUtils.createReader(task, 
table)) {
+      for (Record record : reader) {
+        Row row = IcebergUtils.icebergRecordToBeamRow(scanConfig.getSchema(), 
record);
+        out.output(row);
+      }
+    }
+    scanTasksCompleted.inc();
+  }
+
+  @GetSize
+  public double getSize(@Element KV<ReadTaskDescriptor, ReadTask> element) {
+    return element.getValue().getByteSize();

Review Comment:
   this may work better for autoscaling if it is consistent with the output 
bytes.  I'm guessing this byte size is more file-bytes and not necessarily the 
record bytes after conversion to beam rows. See 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/estimator/BytesThroughputEstimator.java
 for what was needed for Spanner change streams.
   
   Could add a TODO



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/TableCache.java:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/** Utility to fetch and cache Iceberg {@link Table}s. */
+class TableCache {
+  private static final Cache<TableIdentifier, Table> CACHE =
+      CacheBuilder.newBuilder().expireAfterWrite(3, TimeUnit.MINUTES).build();

Review Comment:
   Can the timeout be higher? There are cases where, if all threads are busy doing 
other fused stages, we might not use the cache for a while.  If the cost of 
keeping the table open in memory seems less than the latency of reopening it, then maybe 
increase to an hour or so?  We did that for the ProcessBundle cache for a similar 
reason.



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/CreateReadTasksDoFn.java:
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.KV;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.IncrementalAppendScan;
+import org.apache.iceberg.ScanTaskParser;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterable;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Scans the given snapshot and creates multiple {@link ReadTask}s. Each task 
represents a portion
+ * of a data file that was appended within the snapshot range.
+ */
+class CreateReadTasksDoFn
+    extends DoFn<KV<String, List<SnapshotInfo>>, KV<ReadTaskDescriptor, 
ReadTask>> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(CreateReadTasksDoFn.class);
+  private static final Counter totalScanTasks =
+      Metrics.counter(CreateReadTasksDoFn.class, "totalScanTasks");
+  private final IcebergScanConfig scanConfig;
+
+  CreateReadTasksDoFn(IcebergScanConfig scanConfig) {
+    this.scanConfig = scanConfig;
+  }
+
+  @ProcessElement
+  public void process(
+      @Element KV<String, List<SnapshotInfo>> element,
+      OutputReceiver<KV<ReadTaskDescriptor, ReadTask>> out)
+      throws IOException, ExecutionException {
+    Table table =
+        TableCache.getRefreshed(element.getKey(), 
scanConfig.getCatalogConfig().catalog());
+
+    // scan snapshots individually and assign commit timestamp to files
+    for (SnapshotInfo snapshot : element.getValue()) {
+      @Nullable Long fromSnapshot = snapshot.getParentId();
+      long toSnapshot = snapshot.getSnapshotId();
+
+      if (!DataOperations.APPEND.equals(snapshot.getOperation())) {
+        LOG.info(
+            "Skipping non-append snapshot of operation '{}'. Sequence number: 
{}, id: {}",
+            snapshot.getOperation(),
+            snapshot.getSequenceNumber(),
+            snapshot.getSnapshotId());
+      }
+
+      LOG.info("Planning to scan snapshot {}", toSnapshot);
+      IncrementalAppendScan scan = 
table.newIncrementalAppendScan().toSnapshot(toSnapshot);
+      if (fromSnapshot != null) {
+        scan = scan.fromSnapshotExclusive(fromSnapshot);
+      }
+
+      createAndOutputReadTasks(scan, snapshot, out);
+    }
+  }
+
+  private void createAndOutputReadTasks(
+      IncrementalAppendScan scan,
+      SnapshotInfo snapshot,
+      OutputReceiver<KV<ReadTaskDescriptor, ReadTask>> out)
+      throws IOException {
+    int numTasks = 0;
+    try (CloseableIterable<CombinedScanTask> combinedScanTasks = 
scan.planTasks()) {

Review Comment:
   planFiles() seems more appropriate if we are just breaking apart the tasks? 
It seems otherwise planTasks just does some bin-packing etc that we are undoing 
perhaps?
   
   If that's not the case how about commenting why it is done this way.



##########
CHANGES.md:
##########
@@ -65,6 +65,7 @@
 
 * Support for X source added (Java/Python) 
([#X](https://github.com/apache/beam/issues/X)).
 * [Java] Use API compatible with both com.google.cloud.bigdataoss:util 2.x and 
3.x in BatchLoads ([#34105](https://github.com/apache/beam/pull/34105))
+* [Iceberg] Added new CDC source for batch and streaming, available as 
`Managed.ICEBERG_CDC` ([#33504](https://github.com/apache/beam/pull/33504))

Review Comment:
   nit: is batch a different PR to reference?



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/TableCache.java:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/** Utility to fetch and cache Iceberg {@link Table}s. */
+class TableCache {
+  private static final Cache<TableIdentifier, Table> CACHE =
+      CacheBuilder.newBuilder().expireAfterWrite(3, TimeUnit.MINUTES).build();
+
+  static Table get(TableIdentifier identifier, Catalog catalog) {
+    try {
+      return CACHE.get(identifier, () -> catalog.loadTable(identifier));
+    } catch (ExecutionException e) {
+      throw new RuntimeException(
+          "Encountered a problem fetching table " + identifier + " from 
cache.", e);
+    }
+  }
+
+  static Table get(String identifier, Catalog catalog) {
+    return get(TableIdentifier.parse(identifier), catalog);
+  }
+
+  static Table getRefreshed(TableIdentifier identifier, Catalog catalog) {
+    @Nullable Table table = CACHE.getIfPresent(identifier);
+    if (table == null) {
+      return get(identifier, catalog);
+    }
+    table.refresh();

Review Comment:
   is it safe/expensive to refresh from multiple threads?
   Could consider using a refreshing cache to ensure only one refresh is 
happening at a time:
   https://github.com/google/guava/wiki/cachesexplained#refresh



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to