kennknowles commented on code in PR #33504: URL: https://github.com/apache/beam/pull/33504#discussion_r1941516776
########## sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SnapshotRange.java: ########## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.iceberg; + +import com.google.auto.value.AutoValue; +import java.io.Serializable; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaIgnore; +import org.apache.iceberg.catalog.TableIdentifier; +import org.checkerframework.checker.nullness.qual.MonotonicNonNull; +import org.checkerframework.checker.nullness.qual.Nullable; + +@DefaultSchema(AutoValueSchema.class) +@AutoValue +abstract class SnapshotRange implements Serializable { + private transient @MonotonicNonNull TableIdentifier cachedTableIdentifier; + + static Builder builder() { + return new AutoValue_SnapshotRange.Builder(); + } + + abstract String getTableIdentifierString(); + + abstract @Nullable Long getFromSnapshotExclusive(); Review Comment: Worth a javadoc comment even though not public class - I assume null means "from the beginning" ########## sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java: 
########## @@ -413,29 +438,56 @@ abstract static class Builder { abstract Builder setTableIdentifier(TableIdentifier identifier); + abstract Builder setFromSnapshotExclusive(@Nullable Long fromSnapshotExclusive); + + abstract Builder setToSnapshot(@Nullable Long toSnapshot); + + abstract Builder setTriggeringFrequency(Duration triggeringFrequency); + abstract ReadRows build(); } public ReadRows from(TableIdentifier tableIdentifier) { return toBuilder().setTableIdentifier(tableIdentifier).build(); } + public ReadRows fromSnapshotExclusive(@Nullable Long fromSnapshotExclusive) { + return toBuilder().setFromSnapshotExclusive(fromSnapshotExclusive).build(); + } + + public ReadRows toSnapshot(@Nullable Long toSnapshot) { + return toBuilder().setToSnapshot(toSnapshot).build(); + } + + public ReadRows withTriggeringFrequency(Duration triggeringFrequency) { + return toBuilder().setTriggeringFrequency(triggeringFrequency).build(); + } + @Override public PCollection<Row> expand(PBegin input) { TableIdentifier tableId = checkStateNotNull(getTableIdentifier(), "Must set a table to read from."); Table table = getCatalogConfig().catalog().loadTable(tableId); - return input.apply( - Read.from( - new ScanSource( - IcebergScanConfig.builder() - .setCatalogConfig(getCatalogConfig()) - .setScanType(IcebergScanConfig.ScanType.TABLE) - .setTableIdentifier(tableId) - .setSchema(IcebergUtils.icebergSchemaToBeamSchema(table.schema())) - .build()))); + IcebergScanConfig scanConfig = + IcebergScanConfig.builder() + .setCatalogConfig(getCatalogConfig()) + .setScanType(IcebergScanConfig.ScanType.TABLE) + .setTableIdentifier(tableId) + .setSchema(IcebergUtils.icebergSchemaToBeamSchema(table.schema())) + .setFromSnapshotExclusive(getFromSnapshotExclusive()) + .setToSnapshot(getToSnapshot()) + .build(); + if (getTriggeringFrequency() != null Review Comment: I'm not too convinced this should be what controls the incremental scan source. 
I think it might be best if the user very explicitly says they want to read unbounded rows, versus reading the table as a bounded data set. ########## sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadTaskDescriptor.java: ########## @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.iceberg; + +import com.google.auto.value.AutoValue; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.NoSuchSchemaException; +import org.apache.beam.sdk.schemas.SchemaRegistry; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; + +/** Describes the table and snapshot a {@link ReadTask} belongs to. */ +@DefaultSchema(AutoValueSchema.class) +@AutoValue +abstract class ReadTaskDescriptor { + static final Coder<ReadTaskDescriptor> CODER; + + static { Review Comment: same comment about static block ########## sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadFromGroupedTasks.java: ########## @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.iceberg; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ExecutionException; +import org.apache.beam.sdk.io.range.OffsetRange; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import org.apache.beam.sdk.util.ShardedKey; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.mapping.NameMapping; +import org.apache.iceberg.mapping.NameMappingParser; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** + * Unbounded read implementation. + * + * <p>An SDF that takes a batch of {@link ReadTask}s. For each task, reads Iceberg {@link Record}s, + * and converts to Beam {@link Row}s. 
+ * + * <p>The split granularity is set to the incoming batch size, i.e. the number of potential splits Review Comment: Below, you simply read the tasks one by one in a loop, so it is rather the same as if each ReadTask were an element. So at the level of Beam's semantics, it doesn't unlock anything. And we do get splitting at that level automatically when reading from shuffle. I cannot recall - will there always be a shuffle upstream of this? I haven't written an SDF like this, so I can see how it may be necessary to express this way. But are the batches going to be large and meaningful or are they also just arbitrary small sets of read tasks? ########## sdks/java/io/iceberg/bqms/build.gradle: ########## @@ -21,7 +21,9 @@ plugins { applyJavaNature( automaticModuleName: 'org.apache.beam.sdk.io.iceberg.bqms', - shadowClosure: {}, + shadowClosure: { + relocate "com.google.auth", getJavaRelocatedPath("bqms.com.google.auth") Review Comment: can you add a comment to the file about why we have to do this? ########## sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadTask.java: ########## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.iceberg; + +import com.google.auto.value.AutoValue; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.NoSuchSchemaException; +import org.apache.beam.sdk.schemas.SchemaRegistry; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaIgnore; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.ScanTaskParser; +import org.checkerframework.checker.nullness.qual.MonotonicNonNull; + +@DefaultSchema(AutoValueSchema.class) +@AutoValue +abstract class ReadTask { + static final Coder<ReadTask> CODER; + + static { + try { + CODER = SchemaRegistry.createDefault().getSchemaCoder(ReadTask.class); Review Comment: Failures in static blocks cause the class to fail to load and cause funky error messages that often lead people down the wrong path. Instead, do something like this: ``` private static @MonotonicNonNull Coder<ReadTask> CODER; public static Coder<ReadTask> getCoder() { if (CODER == null) { CODER = SchemaRegistry.createDefault().getSchemaCoder(ReadTask.class); } return CODER; } ``` (incidentally I know this pattern is common in Python but actually it causes havoc there in terms of "ImportError" leading people down the wrong debugging path, too) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
