scwhittle commented on code in PR #33504:
URL: https://github.com/apache/beam/pull/33504#discussion_r1983674467


##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IncrementalScanSource.java:
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+
+import java.util.List;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.GroupIntoBatches;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Redistribute;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.ShardedKey;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.apache.iceberg.Table;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+
+/**
+ * An Iceberg source that reads a table incrementally using range(s) of table 
snapshots. The bounded
+ * source creates a single range, while the unbounded implementation 
continuously polls for new
+ * snapshots at the specified interval.
+ *
+ * <p>Outputs CDC Rows with the following schema:
+ *
+ * <ul>
+ *   <li>"record" [Row]: the data record itself
+ *   <li>"operation" [String]: the operation associated with this record
+ * </ul>
+ */
+class IncrementalScanSource extends PTransform<PBegin, PCollection<Row>> {
+  // (streaming) We will group files into batches of this size
+  // Downstream, we create one reader per batch
+  // TODO(ahmedabu98): should we make this configurable?
+  private static final long MAX_FILES_BATCH_BYTE_SIZE = 1L << 32; // 4 GB
+  private static final Duration DEFAULT_POLL_INTERVAL = 
Duration.standardSeconds(60);
+  private final IcebergScanConfig scanConfig;
+
+  IncrementalScanSource(IcebergScanConfig scanConfig) {
+    this.scanConfig = scanConfig;
+  }
+
+  @Override
+  public PCollection<Row> expand(PBegin input) {
+    Table table =
+        TableCache.getRefreshed(
+            scanConfig.getTableIdentifier(), 
scanConfig.getCatalogConfig().catalog());
+
+    PCollection<Row> rows =
+        MoreObjects.firstNonNull(scanConfig.getStreaming(), false)
+            ? readUnbounded(input)
+            : readBounded(input, table);
+    return rows.setRowSchema(ReadUtils.outputCdcSchema(table.schema()));
+  }
+
+  /**
+   * Watches for new snapshots and creates tasks for each range. Uses GiB 
(with auto-sharding) to
+   * groups tasks in batches of size {@link #MAX_FILES_BATCH_BYTE_SIZE}, then 
reads from each batch
+   * using an SDF.
+   *
+   * <p>Output schema is:
+   *
+   * <ul>
+   *   <li>"record": the actual data record
+   *   <li>"operation": the snapshot operation associated with this record 
(e.g. "append",
+   *       "replace", "delete")
+   * </ul>
+   */
+  private PCollection<Row> readUnbounded(PBegin input) {
+    @Nullable
+    Duration pollInterval =
+        MoreObjects.firstNonNull(scanConfig.getPollInterval(), 
DEFAULT_POLL_INTERVAL);
+    return input
+        .apply("Watch for Snapshots", new WatchForSnapshots(scanConfig, 
pollInterval))
+        .setCoder(KvCoder.of(StringUtf8Coder.of(), 
ListCoder.of(SnapshotInfo.getCoder())))
+        .apply("Create Read Tasks", ParDo.of(new 
CreateReadTasksDoFn(scanConfig)))
+        .setCoder(KvCoder.of(ReadTaskDescriptor.getCoder(), 
ReadTask.getCoder()))
+        .apply(
+            Window.<KV<ReadTaskDescriptor, ReadTask>>into(new GlobalWindows())
+                
.triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))

Review Comment:
   can this trigger be removed? seems like the GiB does the triggering so I'm 
not sure if this has effect (or if it does if it is intended).



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadFromTasks.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.Row;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.CloseableIterable;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * Bounded read implementation.
+ *
+ * <p>For each {@link ReadTask}, reads Iceberg {@link Record}s, and converts 
to Beam {@link Row}s.
+ */
+class ReadFromTasks extends DoFn<KV<ReadTaskDescriptor, ReadTask>, Row> {
+  private final IcebergScanConfig scanConfig;
+  private final Counter scanTasksCompleted =
+      Metrics.counter(ReadFromTasks.class, "scanTasksCompleted");
+
+  ReadFromTasks(IcebergScanConfig scanConfig) {
+    this.scanConfig = scanConfig;
+  }
+
+  @ProcessElement
+  public void process(@Element KV<ReadTaskDescriptor, ReadTask> element, 
OutputReceiver<Row> out)
+      throws IOException, ExecutionException {
+    String tableIdentifier = element.getKey().getTableIdentifierString();
+    ReadTask readTask = element.getValue();
+    Table table = TableCache.get(tableIdentifier, 
scanConfig.getCatalogConfig().catalog());
+    Schema dataSchema = IcebergUtils.icebergSchemaToBeamSchema(table.schema());
+    Schema outputCdcSchema = ReadUtils.outputCdcSchema(dataSchema);
+
+    Instant outputTimestamp = ReadUtils.getReadTaskTimestamp(readTask, 
scanConfig);
+    FileScanTask task = readTask.getFileScanTask();
+    @Nullable String operation = readTask.getOperation();
+
+    try (CloseableIterable<Record> reader = ReadUtils.createReader(task, 
table)) {
+      for (Record record : reader) {
+        Row row =
+            Row.withSchema(outputCdcSchema)
+                .addValue(IcebergUtils.icebergRecordToBeamRow(dataSchema, 
record))
+                .addValue(operation)
+                .build();
+        out.outputWithTimestamp(row, outputTimestamp);
+      }
+    }
+    scanTasksCompleted.inc();
+  }
+
+  // infinite skew in case we encounter some files that don't support 
watermark column statistics,

Review Comment:
   it seems like this will either:
   - hold the watermark
   - output late stuff that will be dropped



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadFromTasks.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.Row;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.CloseableIterable;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * Bounded read implementation.
+ *
+ * <p>For each {@link ReadTask}, reads Iceberg {@link Record}s, and converts 
to Beam {@link Row}s.
+ */
+class ReadFromTasks extends DoFn<KV<ReadTaskDescriptor, ReadTask>, Row> {
+  private final IcebergScanConfig scanConfig;
+  private final Counter scanTasksCompleted =
+      Metrics.counter(ReadFromTasks.class, "scanTasksCompleted");
+
+  ReadFromTasks(IcebergScanConfig scanConfig) {
+    this.scanConfig = scanConfig;
+  }
+
+  @ProcessElement
+  public void process(@Element KV<ReadTaskDescriptor, ReadTask> element, 
OutputReceiver<Row> out)
+      throws IOException, ExecutionException {
+    String tableIdentifier = element.getKey().getTableIdentifierString();
+    ReadTask readTask = element.getValue();
+    Table table = TableCache.get(tableIdentifier, 
scanConfig.getCatalogConfig().catalog());
+    Schema dataSchema = IcebergUtils.icebergSchemaToBeamSchema(table.schema());
+    Schema outputCdcSchema = ReadUtils.outputCdcSchema(dataSchema);
+
+    Instant outputTimestamp = ReadUtils.getReadTaskTimestamp(readTask, 
scanConfig);

Review Comment:
   it seems like this should be done when creating the read task, that way it 
will hold the watermark up appropriately while the task is being shuffled etc.



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IncrementalScanSource.java:
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+
+import java.util.List;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.GroupIntoBatches;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Redistribute;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.ShardedKey;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.apache.iceberg.Table;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+
+/**
+ * An Iceberg source that reads a table incrementally using range(s) of table 
snapshots. The bounded
+ * source creates a single range, while the unbounded implementation 
continuously polls for new
+ * snapshots at the specified interval.
+ *
+ * <p>Outputs CDC Rows with the following schema:
+ *
+ * <ul>
+ *   <li>"record" [Row]: the data record itself
+ *   <li>"operation" [String]: the operation associated with this record
+ * </ul>
+ */
+class IncrementalScanSource extends PTransform<PBegin, PCollection<Row>> {
+  // (streaming) We will group files into batches of this size
+  // Downstream, we create one reader per batch
+  // TODO(ahmedabu98): should we make this configurable?
+  private static final long MAX_FILES_BATCH_BYTE_SIZE = 1L << 32; // 4 GB
+  private static final Duration DEFAULT_POLL_INTERVAL = 
Duration.standardSeconds(60);
+  private final IcebergScanConfig scanConfig;
+
+  IncrementalScanSource(IcebergScanConfig scanConfig) {
+    this.scanConfig = scanConfig;
+  }
+
+  @Override
+  public PCollection<Row> expand(PBegin input) {
+    Table table =
+        TableCache.getRefreshed(
+            scanConfig.getTableIdentifier(), 
scanConfig.getCatalogConfig().catalog());
+
+    PCollection<Row> rows =
+        MoreObjects.firstNonNull(scanConfig.getStreaming(), false)
+            ? readUnbounded(input)
+            : readBounded(input, table);
+    return rows.setRowSchema(ReadUtils.outputCdcSchema(table.schema()));
+  }
+
+  /**
+   * Watches for new snapshots and creates tasks for each range. Uses GiB 
(with auto-sharding) to
+   * groups tasks in batches of size {@link #MAX_FILES_BATCH_BYTE_SIZE}, then 
reads from each batch
+   * using an SDF.
+   *
+   * <p>Output schema is:
+   *
+   * <ul>
+   *   <li>"record": the actual data record
+   *   <li>"operation": the snapshot operation associated with this record 
(e.g. "append",
+   *       "replace", "delete")
+   * </ul>
+   */
+  private PCollection<Row> readUnbounded(PBegin input) {
+    @Nullable
+    Duration pollInterval =
+        MoreObjects.firstNonNull(scanConfig.getPollInterval(), 
DEFAULT_POLL_INTERVAL);
+    return input
+        .apply("Watch for Snapshots", new WatchForSnapshots(scanConfig, 
pollInterval))
+        .setCoder(KvCoder.of(StringUtf8Coder.of(), 
ListCoder.of(SnapshotInfo.getCoder())))
+        .apply("Create Read Tasks", ParDo.of(new 
CreateReadTasksDoFn(scanConfig)))
+        .setCoder(KvCoder.of(ReadTaskDescriptor.getCoder(), 
ReadTask.getCoder()))
+        .apply(
+            Window.<KV<ReadTaskDescriptor, ReadTask>>into(new GlobalWindows())
+                
.triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
+                .discardingFiredPanes())
+        .apply(
+            GroupIntoBatches.<ReadTaskDescriptor, ReadTask>ofByteSize(
+                    MAX_FILES_BATCH_BYTE_SIZE, ReadTask::getByteSize)

Review Comment:
   we don't really want these batches, we just want the read tasks distributed 
to workers without causing worker ooms.  Otherwise we're just adding latency 
for the poll latency and not really benefitting from the batch.
   
   Ideally we could change Redistribute to autoshard, but since it is tied to 
GroupIntoBatches currently, what about just doing 
GroupIntoBatches.ofSize(1).withShardedKey() ?



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadFromTasks.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.Row;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.CloseableIterable;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * Bounded read implementation.
+ *
+ * <p>For each {@link ReadTask}, reads Iceberg {@link Record}s, and converts 
to Beam {@link Row}s.
+ */
+class ReadFromTasks extends DoFn<KV<ReadTaskDescriptor, ReadTask>, Row> {
+  private final IcebergScanConfig scanConfig;
+  private final Counter scanTasksCompleted =
+      Metrics.counter(ReadFromTasks.class, "scanTasksCompleted");
+
+  ReadFromTasks(IcebergScanConfig scanConfig) {
+    this.scanConfig = scanConfig;
+  }
+
+  @ProcessElement
+  public void process(@Element KV<ReadTaskDescriptor, ReadTask> element, 
OutputReceiver<Row> out)
+      throws IOException, ExecutionException {
+    String tableIdentifier = element.getKey().getTableIdentifierString();
+    ReadTask readTask = element.getValue();
+    Table table = TableCache.get(tableIdentifier, 
scanConfig.getCatalogConfig().catalog());
+    Schema dataSchema = IcebergUtils.icebergSchemaToBeamSchema(table.schema());

Review Comment:
   seems like some additional things that could be cached. 
   
   Or better yet can the schemas be built at pipeline construction time? Having 
well-defined schemas seems like it will help for pipeline update compatability. 
 Each one now is going to get a unique uuid



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to