tomstepp commented on code in PR #36720:
URL: https://github.com/apache/beam/pull/36720#discussion_r2548175170


##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java:
##########
@@ -47,30 +52,38 @@ class WriteToDestinations extends 
PTransform<PCollection<KV<String, Row>>, Icebe
   private static final int FILE_TRIGGERING_RECORD_COUNT = 500_000;
   // Used for auto-sharding in streaming. Limits total byte size per batch/file
   public static final int FILE_TRIGGERING_BYTE_COUNT = 1 << 30; // 1GiB
-  static final int DEFAULT_NUM_FILE_SHARDS = 0;
+  private static final long DEFAULT_MAX_BYTES_PER_FILE = (1L << 29); // 512mb
   private final IcebergCatalogConfig catalogConfig;
   private final DynamicDestinations dynamicDestinations;
   private final @Nullable Duration triggeringFrequency;
   private final String filePrefix;
+  private final @Nullable Integer directWriteByteLimit;
 
   WriteToDestinations(
       IcebergCatalogConfig catalogConfig,
       DynamicDestinations dynamicDestinations,
-      @Nullable Duration triggeringFrequency) {
+      @Nullable Duration triggeringFrequency,
+      @Nullable Integer directWriteByteLimit) {
     this.dynamicDestinations = dynamicDestinations;
     this.catalogConfig = catalogConfig;
     this.triggeringFrequency = triggeringFrequency;
+    this.directWriteByteLimit = directWriteByteLimit;
     // single unique prefix per write transform
     this.filePrefix = UUID.randomUUID().toString();
   }
 
   @Override
   public IcebergWriteResult expand(PCollection<KV<String, Row>> input) {
     // Write records to files
-    PCollection<FileWriteResult> writtenFiles =
-        input.isBounded().equals(PCollection.IsBounded.UNBOUNDED)
-            ? writeTriggered(input)
-            : writeUntriggered(input);
+    PCollection<FileWriteResult> writtenFiles;
+    if (directWriteByteLimit != null && directWriteByteLimit >= 0) {
+      writtenFiles = writeTriggeredWithBundleLifting(input);

Review Comment:
   Added the check at a higher level, and made sure bundle lifting is only done for
unbounded input.



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java:
##########
@@ -79,6 +92,72 @@ public IcebergWriteResult expand(PCollection<KV<String, 
Row>> input) {
     return new IcebergWriteResult(input.getPipeline(), snapshots);
   }
 
+  private PCollection<FileWriteResult> writeTriggeredWithBundleLifting(
+      PCollection<KV<String, Row>> input) {
+    checkArgumentNotNull(
+        triggeringFrequency, "Streaming pipelines must set a triggering 
frequency.");
+    checkArgumentNotNull(
+        directWriteByteLimit, "Must set non-null directWriteByteLimit for 
bundle lifting.");
+
+    final TupleTag<KV<String, Row>> groupedRecordsTag = new 
TupleTag<>("small_batches");
+    final TupleTag<KV<String, Row>> directRecordsTag = new 
TupleTag<>("large_batches");
+
+    input = input.apply("WindowIntoGlobal", Window.into(new GlobalWindows()));
+    PCollectionTuple bundleOutputs =
+        input.apply(
+            BundleLifter.of(
+                groupedRecordsTag, directRecordsTag, directWriteByteLimit, new 
RowSizer()));
+
+    PCollection<KV<String, Row>> smallBatches =
+        bundleOutputs
+            .get(groupedRecordsTag)
+            .setCoder(
+                KvCoder.of(StringUtf8Coder.of(), 
RowCoder.of(dynamicDestinations.getDataSchema())));
+    PCollection<KV<String, Row>> largeBatches =
+        bundleOutputs
+            .get(directRecordsTag)
+            .setCoder(
+                KvCoder.of(StringUtf8Coder.of(), 
RowCoder.of(dynamicDestinations.getDataSchema())));
+
+    PCollection<KV<ShardedKey<String>, Iterable<Row>>> groupedRecords =
+        smallBatches
+            .apply(
+                GroupIntoBatches.<String, 
Row>ofSize(FILE_TRIGGERING_RECORD_COUNT)
+                    .withByteSize(FILE_TRIGGERING_BYTE_COUNT)
+                    
.withMaxBufferingDuration(checkArgumentNotNull(triggeringFrequency))
+                    .withShardedKey())
+            .setCoder(
+                KvCoder.of(
+                    Coder.of(StringUtf8Coder.of()),
+                    
IterableCoder.of(RowCoder.of(dynamicDestinations.getDataSchema()))));
+
+    PCollection<FileWriteResult> directFileWrites =
+        largeBatches.apply(
+            "WriteDirectRowsToFiles",
+            new WriteDirectRowsToFiles(
+                catalogConfig, dynamicDestinations, filePrefix, 
DEFAULT_MAX_BYTES_PER_FILE));
+
+    PCollection<FileWriteResult> groupedFileWrites =
+        groupedRecords.apply(
+            "WriteGroupedRows",
+            new WriteGroupedRowsToFiles(
+                catalogConfig, dynamicDestinations, filePrefix, 
DEFAULT_MAX_BYTES_PER_FILE));

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to