kennknowles commented on code in PR #32451:
URL: https://github.com/apache/beam/pull/32451#discussion_r1761265126


##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java:
##########
@@ -77,6 +90,21 @@ public WriteRows to(DynamicDestinations destinations) {
       return toBuilder().setDynamicDestinations(destinations).build();
     }
 
+    /**
+     * Sets the frequency at which data is committed and a new {@link org.apache.iceberg.Snapshot}
+     * is produced.
+     *
+     * <p>Every triggeringFrequency duration, all accumulated {@link

Review Comment:
   Consider softening the language so users know the frequency is approximate;
   otherwise we'll get complaints when commits don't line up exactly with the
   configured interval.



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java:
##########
@@ -57,8 +66,59 @@ public String description() {
         + "{\"table\" (str), \"operation\" (str), \"summary\" (map[str, str]), \"manifestListLocation\" (str)}";
   }
 
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  public abstract static class Configuration {
+    public static Builder builder() {
+      return new AutoValue_IcebergWriteSchemaTransformProvider_Configuration.Builder();
+    }
+
+    @SchemaFieldDescription("Identifier of the Iceberg table.")
+    public abstract String getTable();
+
+    @SchemaFieldDescription("Name of the catalog containing the table.")
+    @Nullable

Review Comment:
   Make the annotation modify the type, not the method, like so:
   
   `public abstract @Nullable String`
   
   (you will need to switch to the checkerframework annotation for this)
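
To illustrate what "modify the type, not the method" means, here is a minimal, dependency-free sketch. The local `Nullable` annotation is a hypothetical stand-in for the checkerframework one (declared here with a `TYPE_USE` target only so that the example compiles on its own), and `ConfigurationSketch` is an illustrative class, not the PR's `Configuration`.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Map;

// Hypothetical stand-in for the checkerframework Nullable annotation,
// declared locally so the sketch needs no extra dependencies.
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE_USE, ElementType.TYPE_PARAMETER})
@interface Nullable {}

// Illustrative class only; it mirrors the shape of the getters under review.
abstract class ConfigurationSketch {
  // The annotation sits directly in front of the type it qualifies.
  public abstract @Nullable String getCatalogName();

  // A type annotation can also qualify a type argument (here: the map values),
  // which a declaration-level annotation on the method cannot express.
  public abstract Map<String, @Nullable String> getCatalogProperties();

  // Reports whether the top-level return type of the named getter carries @Nullable.
  static boolean returnTypeIsNullable(String methodName) {
    try {
      Method m = ConfigurationSketch.class.getMethod(methodName);
      return m.getAnnotatedReturnType().isAnnotationPresent(Nullable.class);
    } catch (NoSuchMethodException e) {
      throw new IllegalArgumentException("No such getter: " + methodName, e);
    }
  }
}
```

In this sketch the annotation on `getCatalogName()` is attached to the `String` return type itself, while in `getCatalogProperties()` only the map's value type is marked nullable and the `Map` return type is not.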



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java:
##########
@@ -57,8 +66,59 @@ public String description() {
         + "{\"table\" (str), \"operation\" (str), \"summary\" (map[str, str]), \"manifestListLocation\" (str)}";
   }
 
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  public abstract static class Configuration {
+    public static Builder builder() {
+      return new AutoValue_IcebergWriteSchemaTransformProvider_Configuration.Builder();
+    }
+
+    @SchemaFieldDescription("Identifier of the Iceberg table.")
+    public abstract String getTable();
+
+    @SchemaFieldDescription("Name of the catalog containing the table.")
+    @Nullable
+    public abstract String getCatalogName();
+
+    @SchemaFieldDescription("Properties used to set up the Iceberg catalog.")
+    @Nullable
+    public abstract Map<String, String> getCatalogProperties();
+
+    @SchemaFieldDescription("Properties passed to the Hadoop Configuration.")
+    @Nullable

Review Comment:
   Same throughout: put `@Nullable` on the return type of every nullable getter in this class.



##########
sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java:
##########
@@ -307,4 +314,38 @@ public void testWritePartitionedData() {
     assertThat(
         returnedRecords, containsInAnyOrder(INPUT_ROWS.stream().map(RECORD_FUNC::apply).toArray()));
   }
+
+  @Test
+  public void testStreamingWrite() {

Review Comment:
   I suggest a couple more tests where the user has set up their PCollection
   differently, for example one that starts out in accumulating mode, or one
   with an unusual trigger set in the middle of the pipeline.



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java:
##########
@@ -108,11 +119,40 @@ public KV<ShardedKey<Row>, Row> apply(Row elem) {
                 "Write remaining rows to files",
                 new WriteGroupedRowsToFiles(catalogConfig, dynamicDestinations));
 
+    PCollection<FileWriteResult> writeUngroupedResultPColl = writeUngroupedResult.getWrittenFiles();
+
+    if (triggeringFrequency != null) {
+      // for streaming pipelines, re-window both outputs to keep Flatten happy
+      writeGroupedResult =
+          writeGroupedResult.apply(
+              "RewindowGroupedRecords",
+              Window.<FileWriteResult>into(new GlobalWindows())

Review Comment:
   I think you also need to set discarding mode in both of these re-windowings.
   Or validate that they are already set up that way.
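
Why discarding mode matters here can be shown with a toy model in plain Java. This is not Beam code: `PaneModeSketch` and its methods are hypothetical and only mimic how a repeated trigger emits panes. In accumulating mode every pane repeats what earlier panes already contained, so a downstream commit step would see the same written files more than once. In Beam itself the mode is chosen with `discardingFiredPanes()` or `accumulatingFiredPanes()` on the `Window` transform.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (not Beam): simulates the panes a repeated trigger would emit.
final class PaneModeSketch {
  private PaneModeSketch() {}

  // Each inner list of `arrivals` holds the elements that arrive between two firings.
  // Returns one pane per firing.
  static List<List<String>> firePanes(List<List<String>> arrivals, boolean accumulating) {
    List<List<String>> panes = new ArrayList<>();
    List<String> buffer = new ArrayList<>();
    for (List<String> batch : arrivals) {
      buffer.addAll(batch);
      panes.add(new ArrayList<>(buffer)); // the trigger fires and emits the buffer
      if (!accumulating) {
        buffer.clear(); // discarding mode: already-emitted elements are dropped
      }
    }
    return panes;
  }

  // Counts how many elements a downstream step receives across all panes.
  static int totalEmitted(List<List<String>> panes) {
    int total = 0;
    for (List<String> pane : panes) {
      total += pane.size();
    }
    return total;
  }
}
```

With three written files arriving as two batches (two files, then one), the discarding variant hands exactly three elements downstream, whereas the accumulating variant hands over five because the first two files are repeated in the second pane.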



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java:
##########
@@ -17,13 +17,21 @@
  */
 package org.apache.beam.sdk.io.iceberg;
 
+import static org.apache.beam.sdk.io.iceberg.IcebergWriteSchemaTransformProvider.Configuration;
+
 import com.google.auto.service.AutoService;
+import com.google.auto.value.AutoValue;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;

Review Comment:
   Use the checkerframework `Nullable` annotation (unless an underlying library
   like Avro requires you to use theirs). I know it seems like the choice of
   annotation wouldn't matter, but they have different capabilities in some
   cases, so we standardize on the one that works everywhere it needs to.



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java:
##########
@@ -108,11 +119,40 @@ public KV<ShardedKey<Row>, Row> apply(Row elem) {
                 "Write remaining rows to files",
                 new WriteGroupedRowsToFiles(catalogConfig, dynamicDestinations));
 
+    PCollection<FileWriteResult> writeUngroupedResultPColl = writeUngroupedResult.getWrittenFiles();

Review Comment:
   Validate here as well that if the input is unbounded, the triggering
   frequency is set. I know you check it above, but also do it here so the
   local code is validated.
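
A local check of this kind might look roughly like the following. This is a sketch under assumptions: `Boundedness` is a hypothetical stand-in for the boundedness of the input PCollection, `java.time.Duration` stands in for whatever duration type the transform actually uses, and the class and message text are illustrative only.

```java
import java.time.Duration;

// Illustrative only: a local precondition for the streaming write path.
final class TriggeringValidationSketch {
  private TriggeringValidationSketch() {}

  // Hypothetical stand-in for the boundedness of the input PCollection.
  enum Boundedness {
    BOUNDED,
    UNBOUNDED
  }

  // An unbounded input is only acceptable when a triggering frequency is set.
  static boolean isValid(Boundedness input, Duration triggeringFrequency) {
    return input == Boundedness.BOUNDED || triggeringFrequency != null;
  }

  // Fails fast, next to the code that relies on the invariant.
  static void validate(Boundedness input, Duration triggeringFrequency) {
    if (!isValid(input, triggeringFrequency)) {
      throw new IllegalArgumentException(
          "A triggering frequency must be set when writing an unbounded input.");
    }
  }
}
```

Repeating the check close to the re-windowing code keeps the invariant visible where it is relied on, even if an earlier validation is later moved or removed.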


