ahmedabu98 commented on code in PR #36720:
URL: https://github.com/apache/beam/pull/36720#discussion_r2546580824


##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/BundleLifter.java:
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A PTransform that buffers elements and outputs them to one of two TupleTags based on the total
+ * size of the bundle in finish_bundle.
+ *
+ * <p>This is the Java equivalent of the BundleLifter PTransform in Python.
+ *
+ * @param <T> The type of elements in the input PCollection.
+ */
+public class BundleLifter<T> extends PTransform<PCollection<T>, PCollectionTuple> {
+
+  final TupleTag<T> smallBatchTag;
+  final TupleTag<T> largeBatchTag;
+  final int threshold;
+  final SerializableFunction<T, Integer> elementSizer;
+
+  /**
+   * A private, static DoFn that buffers elements within a bundle and outputs them to different tags
+   * in finish_bundle based on the total bundle size.
+   *
+   * <p>This is the Java equivalent of the _BundleLiftDoFn in Python, now merged inside the
+   * PTransform.
+   *
+   * @param <T> The type of elements being processed.
+   */
+  private static class BundleLiftDoFn<T> extends DoFn<T, Void> {
+    private static final Logger LOG = LoggerFactory.getLogger(BundleLiftDoFn.class);
+
+    final TupleTag<T> smallBatchTag;
+    final TupleTag<T> largeBatchTag;
+    final int threshold;
+    final SerializableFunction<T, Integer> elementSizer;
+
+    private transient @MonotonicNonNull List<T> buffer;
+    private transient long bundleSize;
+    private transient @Nullable MultiOutputReceiver receiver;
+
+    BundleLiftDoFn(
+        TupleTag<T> smallBatchTag,
+        TupleTag<T> largeBatchTag,
+        int threshold,
+        SerializableFunction<T, Integer> elementSizer) {
+      this.smallBatchTag = smallBatchTag;
+      this.largeBatchTag = largeBatchTag;
+      this.threshold = threshold;
+      this.elementSizer = elementSizer;
+    }
+
+    @StartBundle
+    public void startBundle() {
+      buffer = new ArrayList<>();
+      receiver = null;
+      bundleSize = 0L;
+    }
+
+    @ProcessElement
+    public void processElement(@Element T element, MultiOutputReceiver mor) {
+      if (receiver == null) {
+        receiver = mor;
+      }
+      checkArgumentNotNull(buffer, "Buffer should be set by startBundle.");
+      buffer.add(element);
+      bundleSize += elementSizer.apply(element);
+    }
+
+    @FinishBundle
+    public void finishBundle() {
+      checkArgumentNotNull(buffer, "Buffer should be set by startBundle.");
+      if (buffer.isEmpty()) {
+        return;
+      }
+
+      TupleTag<T> targetTag;
+
+      // Select the target tag based on the bundle size
+      if (bundleSize < threshold) {
+        targetTag = smallBatchTag;
+        LOG.debug("Emitting {} elements to small tag: '{}'", bundleSize, 
targetTag.getId());
+      } else {
+        targetTag = largeBatchTag;
+        LOG.debug("Emitting {} elements to large tag: '{}'", bundleSize, 
targetTag.getId());
+      }
+
+      checkArgumentNotNull(receiver, "Receiver should be set by startBundle.");
+      OutputReceiver<T> taggedOutput = receiver.get(targetTag);
+
+      for (T element : buffer) {
+        taggedOutput.output(element);
+      }
+    }
+  }
+
+  public BundleLifter(TupleTag<T> smallBatchTag, TupleTag<T> largeBatchTag, int threshold) {
+    this(smallBatchTag, largeBatchTag, threshold, x -> 1);
+  }
+
+  public BundleLifter(

Review Comment:
   nit: Make these constructors private if the expected usage will be `BundleLifter.of(...)`.
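   
   For reference, a minimal sketch of the suggested shape (assuming the `of(...)` factory exists in the class; illustrative only):
   
   ```
   // Constructors become private; callers go through the static factory.
   private BundleLifter(TupleTag<T> smallBatchTag, TupleTag<T> largeBatchTag, int threshold) {
     this(smallBatchTag, largeBatchTag, threshold, x -> 1);
   }
   
   public static <T> BundleLifter<T> of(
       TupleTag<T> smallBatchTag, TupleTag<T> largeBatchTag, int threshold) {
     return new BundleLifter<>(smallBatchTag, largeBatchTag, threshold);
   }
   ```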



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/BundleLifter.java:
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A PTransform that buffers elements and outputs them to one of two TupleTags based on the total
+ * size of the bundle in finish_bundle.
+ *
+ * <p>This is the Java equivalent of the BundleLifter PTransform in Python.
+ *
+ * @param <T> The type of elements in the input PCollection.
+ */
+public class BundleLifter<T> extends PTransform<PCollection<T>, PCollectionTuple> {
+
+  final TupleTag<T> smallBatchTag;
+  final TupleTag<T> largeBatchTag;
+  final int threshold;
+  final SerializableFunction<T, Integer> elementSizer;
+
+  /**
+   * A private, static DoFn that buffers elements within a bundle and outputs them to different tags
+   * in finish_bundle based on the total bundle size.
+   *
+   * <p>This is the Java equivalent of the _BundleLiftDoFn in Python, now merged inside the
+   * PTransform.
+   *
+   * @param <T> The type of elements being processed.
+   */
+  private static class BundleLiftDoFn<T> extends DoFn<T, Void> {
+    private static final Logger LOG = LoggerFactory.getLogger(BundleLiftDoFn.class);
+
+    final TupleTag<T> smallBatchTag;
+    final TupleTag<T> largeBatchTag;
+    final int threshold;
+    final SerializableFunction<T, Integer> elementSizer;
+
+    private transient @MonotonicNonNull List<T> buffer;
+    private transient long bundleSize;
+    private transient @Nullable MultiOutputReceiver receiver;
+
+    BundleLiftDoFn(
+        TupleTag<T> smallBatchTag,
+        TupleTag<T> largeBatchTag,
+        int threshold,
+        SerializableFunction<T, Integer> elementSizer) {
+      this.smallBatchTag = smallBatchTag;
+      this.largeBatchTag = largeBatchTag;
+      this.threshold = threshold;
+      this.elementSizer = elementSizer;
+    }
+
+    @StartBundle
+    public void startBundle() {
+      buffer = new ArrayList<>();
+      receiver = null;
+      bundleSize = 0L;
+    }
+
+    @ProcessElement
+    public void processElement(@Element T element, MultiOutputReceiver mor) {
+      if (receiver == null) {
+        receiver = mor;
+      }
+      checkArgumentNotNull(buffer, "Buffer should be set by startBundle.");
+      buffer.add(element);
+      bundleSize += elementSizer.apply(element);
+    }
+
+    @FinishBundle
+    public void finishBundle() {
+      checkArgumentNotNull(buffer, "Buffer should be set by startBundle.");
+      if (buffer.isEmpty()) {
+        return;
+      }
+
+      TupleTag<T> targetTag;
+
+      // Select the target tag based on the bundle size
+      if (bundleSize < threshold) {
+        targetTag = smallBatchTag;
+        LOG.debug("Emitting {} elements to small tag: '{}'", bundleSize, 
targetTag.getId());

Review Comment:
   nit: It seems like `bundleSize` could be up to interpretation, depending on what the `elementSizer` function does? Maybe the log message could be "Emitting batch of elements of size {} to small/large tag", to avoid assuming that `bundleSize` will always mean count of elements.
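   
   For example, a one-line sketch of that wording (illustrative, not a required form):
   
   ```
   LOG.debug("Emitting batch of elements of size {} to small tag: '{}'", bundleSize, targetTag.getId());
   ```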



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/BundleLifter.java:
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A PTransform that buffers elements and outputs them to one of two TupleTags based on the total
+ * size of the bundle in finish_bundle.
+ *
+ * <p>This is the Java equivalent of the BundleLifter PTransform in Python.
+ *
+ * @param <T> The type of elements in the input PCollection.
+ */
+public class BundleLifter<T> extends PTransform<PCollection<T>, PCollectionTuple> {
+
+  final TupleTag<T> smallBatchTag;
+  final TupleTag<T> largeBatchTag;
+  final int threshold;
+  final SerializableFunction<T, Integer> elementSizer;
+
+  /**
+   * A private, static DoFn that buffers elements within a bundle and outputs them to different tags
+   * in finish_bundle based on the total bundle size.
+   *
+   * <p>This is the Java equivalent of the _BundleLiftDoFn in Python, now merged inside the
+   * PTransform.
+   *
+   * @param <T> The type of elements being processed.
+   */
+  private static class BundleLiftDoFn<T> extends DoFn<T, Void> {
+    private static final Logger LOG = LoggerFactory.getLogger(BundleLiftDoFn.class);
+
+    final TupleTag<T> smallBatchTag;
+    final TupleTag<T> largeBatchTag;
+    final int threshold;
+    final SerializableFunction<T, Integer> elementSizer;
+
+    private transient @MonotonicNonNull List<T> buffer;
+    private transient long bundleSize;
+    private transient @Nullable MultiOutputReceiver receiver;
+
+    BundleLiftDoFn(
+        TupleTag<T> smallBatchTag,
+        TupleTag<T> largeBatchTag,
+        int threshold,
+        SerializableFunction<T, Integer> elementSizer) {
+      this.smallBatchTag = smallBatchTag;
+      this.largeBatchTag = largeBatchTag;
+      this.threshold = threshold;
+      this.elementSizer = elementSizer;
+    }
+
+    @StartBundle
+    public void startBundle() {
+      buffer = new ArrayList<>();
+      receiver = null;
+      bundleSize = 0L;
+    }
+
+    @ProcessElement
+    public void processElement(@Element T element, MultiOutputReceiver mor) {
+      if (receiver == null) {
+        receiver = mor;
+      }

Review Comment:
   Btw, finishBundle has access to a `FinishBundleContext` that can output elements to different TupleTags. Consider using that for a simpler code design. There may also be implications for using a ProcessElement's output receiver outside of that scope. But lmk if there is a reason you're doing it this way.
   
   ```
   @FinishBundle
   public void finishBundle(FinishBundleContext c) {
     c.output(targetTag, element, timestamp, window);
   }
   ```



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java:
##########
@@ -455,7 +463,11 @@ public IcebergWriteResult expand(PCollection<Row> input) {
           .apply("Assign Table Destinations", new 
AssignDestinations(destinations))
           .apply(
               "Write Rows to Destinations",
-              new WriteToDestinations(getCatalogConfig(), destinations, getTriggeringFrequency()));
+              new WriteToDestinations(
+                  getCatalogConfig(),
+                  destinations,
+                  getTriggeringFrequency(),
+                  getDirectWriteByteLimit()));

Review Comment:
   Let's validate that the input is Unbounded if `directWriteByteLimit` is set. Should throw an error if that's not the case.
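   
   A minimal sketch of that validation in `expand` (assuming the vendored Guava `checkArgument` used elsewhere in Beam):
   
   ```
   if (getDirectWriteByteLimit() != null) {
     // Bundle lifting only applies to the streaming (triggered) write path.
     checkArgument(
         input.isBounded() == PCollection.IsBounded.UNBOUNDED,
         "directWriteByteLimit is only supported for unbounded (streaming) input.");
   }
   ```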



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java:
##########
@@ -135,10 +216,33 @@ private PCollection<FileWriteResult> writeUntriggered(PCollection<KV<String, Row
             .apply("Group spilled rows by destination shard", 
GroupByKey.create())
             .apply(
                 "Write remaining rows to files",
-                new WriteGroupedRowsToFiles(catalogConfig, dynamicDestinations, filePrefix));
+                new WriteGroupedRowsToFiles(
+                    catalogConfig, dynamicDestinations, filePrefix, DEFAULT_MAX_BYTES_PER_FILE));
 
     return PCollectionList.of(writeUngroupedResult.getWrittenFiles())
         .and(writeGroupedResult)
         .apply("Flatten Written Files", Flatten.pCollections());
   }
+
+  /**
+   * A SerializableFunction to estimate the byte size of a Row for bundling purposes. This is a
+   * heuristic that avoids the high cost of encoding each row with a Coder.
+   */
+  private static class RowSizer implements SerializableFunction<KV<String, Row>, Integer> {
+    @Override
+    public Integer apply(KV<String, Row> element) {
+      Row row = element.getValue();
+      int size = 0;
+      for (Object value : row.getValues()) {
+        if (value instanceof String) {
+          size += Utf8.encodedLength((String) value);
+        } else if (value instanceof byte[]) {
+          size += ((byte[]) value).length;
+        } else {
+          size += 8; // Approximation for non-string/byte fields
+        }
+      }

Review Comment:
   Also, I think we should skip null values. Currently a null `value` will return false for `value instanceof ___`, so it falls to the else block's `size += 8`.
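   
   A minimal sketch of the suggested guard at the top of the loop body:
   
   ```
   for (Object value : row.getValues()) {
     if (value == null) {
       continue; // don't count null fields toward the estimated size
     }
     // ... existing instanceof checks ...
   }
   ```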



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java:
##########
@@ -47,30 +52,38 @@ class WriteToDestinations extends PTransform<PCollection<KV<String, Row>>, Icebe
   private static final int FILE_TRIGGERING_RECORD_COUNT = 500_000;
   // Used for auto-sharding in streaming. Limits total byte size per batch/file
   public static final int FILE_TRIGGERING_BYTE_COUNT = 1 << 30; // 1GiB
-  static final int DEFAULT_NUM_FILE_SHARDS = 0;
+  private static final long DEFAULT_MAX_BYTES_PER_FILE = (1L << 29); // 512mb
   private final IcebergCatalogConfig catalogConfig;
   private final DynamicDestinations dynamicDestinations;
   private final @Nullable Duration triggeringFrequency;
   private final String filePrefix;
+  private final @Nullable Integer directWriteByteLimit;
 
   WriteToDestinations(
       IcebergCatalogConfig catalogConfig,
       DynamicDestinations dynamicDestinations,
-      @Nullable Duration triggeringFrequency) {
+      @Nullable Duration triggeringFrequency,
+      @Nullable Integer directWriteByteLimit) {
     this.dynamicDestinations = dynamicDestinations;
     this.catalogConfig = catalogConfig;
     this.triggeringFrequency = triggeringFrequency;
+    this.directWriteByteLimit = directWriteByteLimit;
     // single unique prefix per write transform
     this.filePrefix = UUID.randomUUID().toString();
   }
 
   @Override
   public IcebergWriteResult expand(PCollection<KV<String, Row>> input) {
     // Write records to files
-    PCollection<FileWriteResult> writtenFiles =
-        input.isBounded().equals(PCollection.IsBounded.UNBOUNDED)
-            ? writeTriggered(input)
-            : writeUntriggered(input);
+    PCollection<FileWriteResult> writtenFiles;
+    if (directWriteByteLimit != null && directWriteByteLimit >= 0) {
+      writtenFiles = writeTriggeredWithBundleLifting(input);

Review Comment:
   I'm guessing this should also check for UNBOUNDED input. We can make that check in the higher-level IcebergIO class (see previous comment).
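   
   i.e. something along these lines (a sketch only; per the previous comment, the guard may belong in the higher-level IcebergIO class instead):
   
   ```
   if (directWriteByteLimit != null
       && input.isBounded() == PCollection.IsBounded.UNBOUNDED) {
     writtenFiles = writeTriggeredWithBundleLifting(input);
   } else {
     // fall back to the existing triggered/untriggered paths
   }
   ```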



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/BundleLifter.java:
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A PTransform that buffers elements and outputs them to one of two TupleTags based on the total
+ * size of the bundle in finish_bundle.
+ *
+ * <p>This is the Java equivalent of the BundleLifter PTransform in Python.

Review Comment:
   Does Python actually have a transform called "BundleLifter"? Can't seem to find it.



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java:
##########
@@ -135,10 +216,33 @@ private PCollection<FileWriteResult> writeUntriggered(PCollection<KV<String, Row
             .apply("Group spilled rows by destination shard", 
GroupByKey.create())
             .apply(
                 "Write remaining rows to files",
-                new WriteGroupedRowsToFiles(catalogConfig, dynamicDestinations, filePrefix));
+                new WriteGroupedRowsToFiles(
+                    catalogConfig, dynamicDestinations, filePrefix, DEFAULT_MAX_BYTES_PER_FILE));
 
     return PCollectionList.of(writeUngroupedResult.getWrittenFiles())
         .and(writeGroupedResult)
         .apply("Flatten Written Files", Flatten.pCollections());
   }
+
+  /**
+   * A SerializableFunction to estimate the byte size of a Row for bundling purposes. This is a
+   * heuristic that avoids the high cost of encoding each row with a Coder.
+   */
+  private static class RowSizer implements SerializableFunction<KV<String, Row>, Integer> {
+    @Override
+    public Integer apply(KV<String, Row> element) {
+      Row row = element.getValue();
+      int size = 0;
+      for (Object value : row.getValues()) {
+        if (value instanceof String) {
+          size += Utf8.encodedLength((String) value);
+        } else if (value instanceof byte[]) {
+          size += ((byte[]) value).length;
+        } else {
+          size += 8; // Approximation for non-string/byte fields
+        }
+      }

Review Comment:
   Let's handle collections as well. This heuristic should be applied for:
   - each element of a list
   - key-value pairs of a map
   - values in a nested Row
   
   We wouldn't want to assume a 1,000-item list is 8 bytes.
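   
   A rough sketch of a recursive variant along those lines (illustrative only; `estimateSize` is a hypothetical helper name):
   
   ```
   private static int estimateSize(@Nullable Object value) {
     if (value == null) {
       return 0; // skip nulls, per the comment above
     } else if (value instanceof String) {
       return Utf8.encodedLength((String) value);
     } else if (value instanceof byte[]) {
       return ((byte[]) value).length;
     } else if (value instanceof Collection) {
       int size = 0;
       for (Object item : (Collection<?>) value) {
         size += estimateSize(item);
       }
       return size;
     } else if (value instanceof Map) {
       int size = 0;
       for (Map.Entry<?, ?> entry : ((Map<?, ?>) value).entrySet()) {
         size += estimateSize(entry.getKey()) + estimateSize(entry.getValue());
       }
       return size;
     } else if (value instanceof Row) {
       int size = 0;
       for (Object field : ((Row) value).getValues()) {
         size += estimateSize(field);
       }
       return size;
     } else {
       return 8; // approximation for fixed-width primitive fields
     }
   }
   ```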



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java:
##########
@@ -79,6 +92,72 @@ public IcebergWriteResult expand(PCollection<KV<String, Row>> input) {
     return new IcebergWriteResult(input.getPipeline(), snapshots);
   }
 
+  private PCollection<FileWriteResult> writeTriggeredWithBundleLifting(
+      PCollection<KV<String, Row>> input) {
+    checkArgumentNotNull(
+        triggeringFrequency, "Streaming pipelines must set a triggering 
frequency.");
+    checkArgumentNotNull(
+        directWriteByteLimit, "Must set non-null directWriteByteLimit for 
bundle lifting.");
+
+    final TupleTag<KV<String, Row>> groupedRecordsTag = new TupleTag<>("small_batches");
+    final TupleTag<KV<String, Row>> directRecordsTag = new TupleTag<>("large_batches");
+
+    input = input.apply("WindowIntoGlobal", Window.into(new GlobalWindows()));
+    PCollectionTuple bundleOutputs =
+        input.apply(
+            BundleLifter.of(
+                groupedRecordsTag, directRecordsTag, directWriteByteLimit, new RowSizer()));
+
+    PCollection<KV<String, Row>> smallBatches =
+        bundleOutputs
+            .get(groupedRecordsTag)
+            .setCoder(
+                KvCoder.of(StringUtf8Coder.of(), RowCoder.of(dynamicDestinations.getDataSchema())));
+    PCollection<KV<String, Row>> largeBatches =
+        bundleOutputs
+            .get(directRecordsTag)
+            .setCoder(
+                KvCoder.of(StringUtf8Coder.of(), RowCoder.of(dynamicDestinations.getDataSchema())));
+
+    PCollection<KV<ShardedKey<String>, Iterable<Row>>> groupedRecords =
+        smallBatches
+            .apply(
+                GroupIntoBatches.<String, Row>ofSize(FILE_TRIGGERING_RECORD_COUNT)
+                    .withByteSize(FILE_TRIGGERING_BYTE_COUNT)
+                    .withMaxBufferingDuration(checkArgumentNotNull(triggeringFrequency))
+                    .withShardedKey())
+            .setCoder(
+                KvCoder.of(
+                    ShardedKey.Coder.of(StringUtf8Coder.of()),
+                    IterableCoder.of(RowCoder.of(dynamicDestinations.getDataSchema()))));
+
+    PCollection<FileWriteResult> directFileWrites =
+        largeBatches.apply(
+            "WriteDirectRowsToFiles",
+            new WriteDirectRowsToFiles(
+                catalogConfig, dynamicDestinations, filePrefix, DEFAULT_MAX_BYTES_PER_FILE));
+
+    PCollection<FileWriteResult> groupedFileWrites =
+        groupedRecords.apply(
+            "WriteGroupedRows",
+            new WriteGroupedRowsToFiles(
+                catalogConfig, dynamicDestinations, filePrefix, DEFAULT_MAX_BYTES_PER_FILE));

Review Comment:
   nit: There's a little overlap between this section and the writeTriggered path:
   
https://github.com/apache/beam/blob/e336419205c3d6580c45ed33b2f08d5194178499/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java#L93-L106
   
   Consider extracting into a "groupAndWriteRecords" method to handle both.
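   
   For illustration, a rough sketch of what that extraction could look like (hypothetical method name; coder-setting and exact output types omitted):
   
   ```
   private PCollection<FileWriteResult> groupAndWriteRecords(
       PCollection<KV<String, Row>> records) {
     return records
         .apply(
             "Group rows into batches",
             GroupIntoBatches.<String, Row>ofSize(FILE_TRIGGERING_RECORD_COUNT)
                 .withByteSize(FILE_TRIGGERING_BYTE_COUNT)
                 .withMaxBufferingDuration(checkArgumentNotNull(triggeringFrequency))
                 .withShardedKey())
         .apply(
             "Write grouped rows to files",
             new WriteGroupedRowsToFiles(
                 catalogConfig, dynamicDestinations, filePrefix, DEFAULT_MAX_BYTES_PER_FILE));
   }
   ```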



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/BundleLifter.java:
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A PTransform that buffers elements and outputs them to one of two TupleTags based on the total
+ * size of the bundle in finish_bundle.
+ *
+ * <p>This is the Java equivalent of the BundleLifter PTransform in Python.
+ *
+ * @param <T> The type of elements in the input PCollection.
+ */
+public class BundleLifter<T> extends PTransform<PCollection<T>, PCollectionTuple> {
+
+  final TupleTag<T> smallBatchTag;
+  final TupleTag<T> largeBatchTag;
+  final int threshold;
+  final SerializableFunction<T, Integer> elementSizer;
+
+  /**
+   * A private, static DoFn that buffers elements within a bundle and outputs them to different tags
+   * in finish_bundle based on the total bundle size.
+   *
+   * <p>This is the Java equivalent of the _BundleLiftDoFn in Python, now merged inside the
+   * PTransform.
+   *
+   * @param <T> The type of elements being processed.
+   */
+  private static class BundleLiftDoFn<T> extends DoFn<T, Void> {
+    private static final Logger LOG = LoggerFactory.getLogger(BundleLiftDoFn.class);
+
+    final TupleTag<T> smallBatchTag;
+    final TupleTag<T> largeBatchTag;
+    final int threshold;
+    final SerializableFunction<T, Integer> elementSizer;
+
+    private transient @MonotonicNonNull List<T> buffer;
+    private transient long bundleSize;
+    private transient @Nullable MultiOutputReceiver receiver;
+
+    BundleLiftDoFn(
+        TupleTag<T> smallBatchTag,
+        TupleTag<T> largeBatchTag,
+        int threshold,
+        SerializableFunction<T, Integer> elementSizer) {
+      this.smallBatchTag = smallBatchTag;
+      this.largeBatchTag = largeBatchTag;
+      this.threshold = threshold;
+      this.elementSizer = elementSizer;
+    }
+
+    @StartBundle
+    public void startBundle() {
+      buffer = new ArrayList<>();
+      receiver = null;
+      bundleSize = 0L;
+    }
+
+    @ProcessElement
+    public void processElement(@Element T element, MultiOutputReceiver mor) {
+      if (receiver == null) {
+        receiver = mor;
+      }
+      checkArgumentNotNull(buffer, "Buffer should be set by startBundle.");
+      buffer.add(element);
+      bundleSize += elementSizer.apply(element);
+    }
+
+    @FinishBundle
+    public void finishBundle() {
+      checkArgumentNotNull(buffer, "Buffer should be set by startBundle.");
+      if (buffer.isEmpty()) {
+        return;
+      }
+
+      TupleTag<T> targetTag;
+
+      // Select the target tag based on the bundle size
+      if (bundleSize < threshold) {
+        targetTag = smallBatchTag;
+        LOG.debug("Emitting {} elements to small tag: '{}'", bundleSize, 
targetTag.getId());
+      } else {
+        targetTag = largeBatchTag;
+        LOG.debug("Emitting {} elements to large tag: '{}'", bundleSize, 
targetTag.getId());
+      }
+
+      checkArgumentNotNull(receiver, "Receiver should be set by startBundle.");
+      OutputReceiver<T> taggedOutput = receiver.get(targetTag);
+
+      for (T element : buffer) {
+        taggedOutput.output(element);
+      }

Review Comment:
   Are bundles guaranteed to stay the same between transforms? 
   
   e.g. say we detected a large bundle and are flushing the rows down to WriteDirectRowsToFiles. Will the bundle of rows stay the same when it reaches that transform?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
