kennknowles commented on code in PR #33504:
URL: https://github.com/apache/beam/pull/33504#discussion_r1941516776


##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SnapshotRange.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaIgnore;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+abstract class SnapshotRange implements Serializable {
+  private transient @MonotonicNonNull TableIdentifier cachedTableIdentifier;
+
+  static Builder builder() {
+    return new AutoValue_SnapshotRange.Builder();
+  }
+
+  abstract String getTableIdentifierString();
+
+  abstract @Nullable Long getFromSnapshotExclusive();

Review Comment:
   Worth a javadoc comment even though not public class - I assume null means 
"from the beginning"



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java:
##########
@@ -413,29 +438,56 @@ abstract static class Builder {
 
       abstract Builder setTableIdentifier(TableIdentifier identifier);
 
+      abstract Builder setFromSnapshotExclusive(@Nullable Long 
fromSnapshotExclusive);
+
+      abstract Builder setToSnapshot(@Nullable Long toSnapshot);
+
+      abstract Builder setTriggeringFrequency(Duration triggeringFrequency);
+
       abstract ReadRows build();
     }
 
     public ReadRows from(TableIdentifier tableIdentifier) {
       return toBuilder().setTableIdentifier(tableIdentifier).build();
     }
 
+    public ReadRows fromSnapshotExclusive(@Nullable Long 
fromSnapshotExclusive) {
+      return 
toBuilder().setFromSnapshotExclusive(fromSnapshotExclusive).build();
+    }
+
+    public ReadRows toSnapshot(@Nullable Long toSnapshot) {
+      return toBuilder().setToSnapshot(toSnapshot).build();
+    }
+
+    public ReadRows withTriggeringFrequency(Duration triggeringFrequency) {
+      return toBuilder().setTriggeringFrequency(triggeringFrequency).build();
+    }
+
     @Override
     public PCollection<Row> expand(PBegin input) {
       TableIdentifier tableId =
           checkStateNotNull(getTableIdentifier(), "Must set a table to read 
from.");
 
       Table table = getCatalogConfig().catalog().loadTable(tableId);
 
-      return input.apply(
-          Read.from(
-              new ScanSource(
-                  IcebergScanConfig.builder()
-                      .setCatalogConfig(getCatalogConfig())
-                      .setScanType(IcebergScanConfig.ScanType.TABLE)
-                      .setTableIdentifier(tableId)
-                      
.setSchema(IcebergUtils.icebergSchemaToBeamSchema(table.schema()))
-                      .build())));
+      IcebergScanConfig scanConfig =
+          IcebergScanConfig.builder()
+              .setCatalogConfig(getCatalogConfig())
+              .setScanType(IcebergScanConfig.ScanType.TABLE)
+              .setTableIdentifier(tableId)
+              
.setSchema(IcebergUtils.icebergSchemaToBeamSchema(table.schema()))
+              .setFromSnapshotExclusive(getFromSnapshotExclusive())
+              .setToSnapshot(getToSnapshot())
+              .build();
+      if (getTriggeringFrequency() != null

Review Comment:
   I'm not too convinced this should be what controls the incremental scan 
source. I think it might be best if the user very explicitly says they want to 
read unbounded rows, versus reading the table as a bounded data set.



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadTaskDescriptor.java:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+
+/** Describes the table and snapshot a {@link ReadTask} belongs to. */
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+abstract class ReadTaskDescriptor {
+  static final Coder<ReadTaskDescriptor> CODER;
+
+  static {

Review Comment:
   same coment about static block



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadFromGroupedTasks.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.util.ShardedKey;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.Row;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.mapping.NameMappingParser;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * Unbounded read implementation.
+ *
+ * <p>An SDF that takes a batch of {@link ReadTask}s. For each task, reads 
Iceberg {@link Record}s,
+ * and converts to Beam {@link Row}s.
+ *
+ * <p>The split granularity is set to the incoming batch size, i.e. the number 
of potential splits

Review Comment:
   Below, you simply read the tasks one by one in a loop, so it is rather the 
same as if each ReadTask were an element. So at the level of Beam's semantics, 
it doesn't unlock anything. And we do get splitting at that level automatically 
when reading from shuffle. I cannot recall - will there always be a shuffle 
upstream of this?
   
   I haven't written an SDF like this, so I can see how it may be necessary to 
express this way. But are the batches going to be large and meaningful or are 
they also just arbitrary small sets of read tasks?



##########
sdks/java/io/iceberg/bqms/build.gradle:
##########
@@ -21,7 +21,9 @@ plugins {
 
 applyJavaNature(
         automaticModuleName: 'org.apache.beam.sdk.io.iceberg.bqms',
-        shadowClosure: {},
+        shadowClosure: {
+            relocate "com.google.auth", 
getJavaRelocatedPath("bqms.com.google.auth")

Review Comment:
   can you add a comment to the file about why we have to do this?



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadTask.java:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaIgnore;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.ScanTaskParser;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+abstract class ReadTask {
+  static final Coder<ReadTask> CODER;
+
+  static {
+    try {
+      CODER = SchemaRegistry.createDefault().getSchemaCoder(ReadTask.class);

Review Comment:
   Failures in static blocks cause the class to fail to load and cause funky 
errors messages that often lead people down the wrong path. Instead, do 
something like this:
   
   ```
   
   private static @MonotonicNonnull Coder<ReadTask> CODER;
   
   public static Coder<ReadTask> getCoder() {
     if (CODER == null) {
       CODER = 
SchemaRegistry.create().createDefault().getSchemaCoder(ReadTask.class);
     }
     return CODER;
   }
   ```
   
   (incidentally I know this pattern is common in Python but actually it causes 
havoc there in terms of "ImportError" leading people down the wrong debugging 
path, too)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to