This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 45911229938 Iceberg direct write (#36720)
45911229938 is described below

commit 45911229938b34aa2918620d075dd16309b2e245
Author: Tom Stepp <[email protected]>
AuthorDate: Tue Nov 25 10:03:15 2025 -0800

    Iceberg direct write (#36720)
    
    * Iceberg direct write
    
    * Make RowSizer compatible with Java 11.
    
    * Fix some build issues
    
    * Minor updates
    
    * More efficient encoded string size calculation
    
    * Rm extra parenthesis
    
    * Write direct rows to files
    
    * Address PR feedback
    
    * Remove commented import
    
    * Make new Iceberg util methods package private.
    
    * Add unit test
---
 .../apache/beam/sdk/io/iceberg/BundleLifter.java   | 170 ++++++++++++++++++++
 .../org/apache/beam/sdk/io/iceberg/IcebergIO.java  |  19 ++-
 .../apache/beam/sdk/io/iceberg/IcebergUtils.java   |   9 ++
 .../IcebergWriteSchemaTransformProvider.java       |  11 ++
 ...owsToFiles.java => WriteDirectRowsToFiles.java} |  86 ++++++----
 .../sdk/io/iceberg/WriteGroupedRowsToFiles.java    |   9 +-
 .../beam/sdk/io/iceberg/WriteToDestinations.java   | 173 +++++++++++++++++----
 .../sdk/io/iceberg/WriteUngroupedRowsToFiles.java  |   9 +-
 .../beam/sdk/io/iceberg/BundleLifterTest.java      |  99 ++++++++++++
 9 files changed, 512 insertions(+), 73 deletions(-)

diff --git 
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/BundleLifter.java
 
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/BundleLifter.java
new file mode 100644
index 00000000000..639e247357f
--- /dev/null
+++ 
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/BundleLifter.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A PTransform that buffers elements and outputs them to one of two TupleTags 
based on the total
+ * size of the bundle in finish_bundle.
+ *
+ * @param <T> The type of elements in the input PCollection.
+ */
+public class BundleLifter<T> extends PTransform<PCollection<T>, 
PCollectionTuple> {
+
+  final TupleTag<T> smallBatchTag;
+  final TupleTag<T> largeBatchTag;
+  final int threshold;
+  final SerializableFunction<T, Integer> elementSizer;
+
+  /**
+   * A DoFn that buffers elements within a bundle and outputs them to 
different tags in
+   * finish_bundle based on the total bundle size.
+   *
+   * @param <T> The type of elements being processed.
+   */
+  static class BundleLiftDoFn<T> extends DoFn<T, Void> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(BundleLiftDoFn.class);
+
+    final TupleTag<T> smallBatchTag;
+    final TupleTag<T> largeBatchTag;
+    final int threshold;
+    final SerializableFunction<T, Integer> elementSizer;
+
+    private transient @MonotonicNonNull List<T> buffer;
+    private transient long bundleSizeBytes;
+    private transient @Nullable MultiOutputReceiver receiver;
+
+    BundleLiftDoFn(
+        TupleTag<T> smallBatchTag,
+        TupleTag<T> largeBatchTag,
+        int threshold,
+        SerializableFunction<T, Integer> elementSizer) {
+      this.smallBatchTag = smallBatchTag;
+      this.largeBatchTag = largeBatchTag;
+      this.threshold = threshold;
+      this.elementSizer = elementSizer;
+    }
+
+    @StartBundle
+    public void startBundle() {
+      buffer = new ArrayList<>();
+      receiver = null;
+      bundleSizeBytes = 0L;
+    }
+
+    @ProcessElement
+    public void processElement(@Element T element, MultiOutputReceiver mor) {
+      if (receiver == null) {
+        receiver = mor;
+      }
+      checkArgumentNotNull(buffer, "Buffer should be set by startBundle.");
+      buffer.add(element);
+      bundleSizeBytes += elementSizer.apply(element);
+    }
+
+    @FinishBundle
+    public void finishBundle() {
+      checkArgumentNotNull(buffer, "Buffer should be set by startBundle.");
+      if (buffer.isEmpty()) {
+        return;
+      }
+
+      // Select the target tag based on the bundle size
+      TupleTag<T> targetTag;
+      targetTag = (bundleSizeBytes < threshold) ? smallBatchTag : 
largeBatchTag;
+      LOG.debug(
+          "Emitting {} elements of {} estimated bytes to tag: '{}'",
+          buffer.size(),
+          bundleSizeBytes,
+          targetTag.getId());
+
+      checkArgumentNotNull(receiver, "Receiver should be set by startBundle.");
+      OutputReceiver<T> taggedOutput = receiver.get(targetTag);
+
+      for (T element : buffer) {
+        taggedOutput.output(element);
+      }
+    }
+  }
+
+  private BundleLifter(TupleTag<T> smallBatchTag, TupleTag<T> largeBatchTag, 
int threshold) {
+    this(smallBatchTag, largeBatchTag, threshold, x -> 1);
+  }
+
+  private BundleLifter(
+      TupleTag<T> smallBatchTag,
+      TupleTag<T> largeBatchTag,
+      int threshold,
+      SerializableFunction<T, Integer> elementSizer) {
+    if (smallBatchTag == null || largeBatchTag == null) {
+      throw new IllegalArgumentException("smallBatchTag and largeBatchTag must 
not be null");
+    }
+    if (smallBatchTag.getId().equals(largeBatchTag.getId())) {
+      throw new IllegalArgumentException("smallBatchTag and largeBatchTag must 
be different");
+    }
+    if (threshold <= 0) {
+      throw new IllegalArgumentException("Threshold must be a positive 
integer");
+    }
+
+    this.smallBatchTag = smallBatchTag;
+    this.largeBatchTag = largeBatchTag;
+    this.threshold = threshold;
+    this.elementSizer = elementSizer;
+  }
+
+  public static <T> BundleLifter<T> of(
+      TupleTag<T> smallBatchTag, TupleTag<T> largeBatchTag, int threshold) {
+    return new BundleLifter<>(smallBatchTag, largeBatchTag, threshold);
+  }
+
+  public static <T> BundleLifter<T> of(
+      TupleTag<T> smallBatchTag,
+      TupleTag<T> largeBatchTag,
+      int threshold,
+      SerializableFunction<T, Integer> elementSizer) {
+    return new BundleLifter<>(smallBatchTag, largeBatchTag, threshold, 
elementSizer);
+  }
+
+  @Override
+  public PCollectionTuple expand(PCollection<T> input) {
+    final TupleTag<Void> mainOutputTag = new TupleTag<Void>() {};
+
+    return input.apply(
+        "BundleLiftDoFn",
+        ParDo.of(new BundleLiftDoFn<>(smallBatchTag, largeBatchTag, threshold, 
elementSizer))
+            .withOutputTags(mainOutputTag, 
TupleTagList.of(smallBatchTag).and(largeBatchTag)));
+  }
+}
diff --git 
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java
 
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java
index 956e45651df..1d71ad54909 100644
--- 
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java
+++ 
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java
@@ -395,6 +395,8 @@ public class IcebergIO {
 
     abstract @Nullable Duration getTriggeringFrequency();
 
+    abstract @Nullable Integer getDirectWriteByteLimit();
+
     abstract Builder toBuilder();
 
     @AutoValue.Builder
@@ -407,6 +409,8 @@ public class IcebergIO {
 
       abstract Builder setTriggeringFrequency(Duration triggeringFrequency);
 
+      abstract Builder setDirectWriteByteLimit(Integer directWriteByteLimit);
+
       abstract WriteRows build();
     }
 
@@ -435,6 +439,10 @@ public class IcebergIO {
       return toBuilder().setTriggeringFrequency(triggeringFrequency).build();
     }
 
+    public WriteRows withDirectWriteByteLimit(Integer directWriteByteLimit) {
+      return toBuilder().setDirectWriteByteLimit(directWriteByteLimit).build();
+    }
+
     @Override
     public IcebergWriteResult expand(PCollection<Row> input) {
       List<?> allToArgs = Arrays.asList(getTableIdentifier(), 
getDynamicDestinations());
@@ -451,11 +459,20 @@ public class IcebergIO {
 
       // Assign destinations before re-windowing to global in 
WriteToDestinations because
       // user's dynamic destination may depend on windowing properties
+      if (IcebergUtils.validDirectWriteLimit(getDirectWriteByteLimit())) {
+        Preconditions.checkArgument(
+            IcebergUtils.isUnbounded(input),
+            "Must only provide direct write limit for unbounded pipelines.");
+      }
       return input
           .apply("Assign Table Destinations", new 
AssignDestinations(destinations))
           .apply(
               "Write Rows to Destinations",
-              new WriteToDestinations(getCatalogConfig(), destinations, 
getTriggeringFrequency()));
+              new WriteToDestinations(
+                  getCatalogConfig(),
+                  destinations,
+                  getTriggeringFrequency(),
+                  getDirectWriteByteLimit()));
     }
   }
 
diff --git 
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java
 
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java
index 4b448a2e08c..f76d000628f 100644
--- 
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java
+++ 
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java
@@ -38,6 +38,7 @@ import 
org.apache.beam.sdk.schemas.logicaltypes.FixedPrecisionNumeric;
 import org.apache.beam.sdk.schemas.logicaltypes.PassThroughLogicalType;
 import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
 import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.Row;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
@@ -608,4 +609,12 @@ public class IcebergUtils {
     // LocalDateTime, LocalDate, LocalTime
     return icebergValue;
   }
+
+  static <T> boolean isUnbounded(PCollection<T> input) {
+    return input.isBounded().equals(PCollection.IsBounded.UNBOUNDED);
+  }
+
+  static boolean validDirectWriteLimit(@Nullable Integer directWriteByteLimit) 
{
+    return directWriteByteLimit != null && directWriteByteLimit >= 0;
+  }
 }
diff --git 
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java
 
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java
index 71c898b0044..428ef71f23e 100644
--- 
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java
+++ 
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java
@@ -95,6 +95,10 @@ public class IcebergWriteSchemaTransformProvider
         "For a streaming pipeline, sets the frequency at which snapshots are 
produced.")
     public abstract @Nullable Integer getTriggeringFrequencySeconds();
 
+    @SchemaFieldDescription(
+        "For a streaming pipeline, sets the limit for lifting bundles into the 
direct write path.")
+    public abstract @Nullable Integer getDirectWriteByteLimit();
+
     @SchemaFieldDescription(
         "A list of field names to keep in the input record. All other fields 
are dropped before writing. "
             + "Is mutually exclusive with 'drop' and 'only'.")
@@ -142,6 +146,8 @@ public class IcebergWriteSchemaTransformProvider
 
       public abstract Builder setTriggeringFrequencySeconds(Integer 
triggeringFrequencySeconds);
 
+      public abstract Builder setDirectWriteByteLimit(Integer 
directWriteByteLimit);
+
       public abstract Builder setKeep(List<String> keep);
 
       public abstract Builder setDrop(List<String> drop);
@@ -227,6 +233,11 @@ public class IcebergWriteSchemaTransformProvider
         writeTransform = 
writeTransform.withTriggeringFrequency(Duration.standardSeconds(trigFreq));
       }
 
+      Integer directWriteByteLimit = configuration.getDirectWriteByteLimit();
+      if (directWriteByteLimit != null) {
+        writeTransform = 
writeTransform.withDirectWriteByteLimit(directWriteByteLimit);
+      }
+
       // TODO: support dynamic destinations
       IcebergWriteResult result = rows.apply(writeTransform);
 
diff --git 
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java
 
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteDirectRowsToFiles.java
similarity index 58%
copy from 
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java
copy to 
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteDirectRowsToFiles.java
index 7db1ac42659..8835e2ff628 100644
--- 
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java
+++ 
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteDirectRowsToFiles.java
@@ -18,59 +18,60 @@
 package org.apache.beam.sdk.io.iceberg;
 
 import java.util.List;
+import java.util.Map;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.ShardedKey;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.WindowedValue;
 import org.apache.beam.sdk.values.WindowedValues;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
 import org.apache.iceberg.catalog.Catalog;
 import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
 
-class WriteGroupedRowsToFiles
-    extends PTransform<
-        PCollection<KV<ShardedKey<String>, Iterable<Row>>>, 
PCollection<FileWriteResult>> {
-
-  private static final long DEFAULT_MAX_BYTES_PER_FILE = (1L << 29); // 512mb
+class WriteDirectRowsToFiles
+    extends PTransform<PCollection<KV<String, Row>>, 
PCollection<FileWriteResult>> {
 
   private final DynamicDestinations dynamicDestinations;
   private final IcebergCatalogConfig catalogConfig;
   private final String filePrefix;
+  private final long maxBytesPerFile;
 
-  WriteGroupedRowsToFiles(
+  WriteDirectRowsToFiles(
       IcebergCatalogConfig catalogConfig,
       DynamicDestinations dynamicDestinations,
-      String filePrefix) {
+      String filePrefix,
+      long maxBytesPerFile) {
     this.catalogConfig = catalogConfig;
     this.dynamicDestinations = dynamicDestinations;
     this.filePrefix = filePrefix;
+    this.maxBytesPerFile = maxBytesPerFile;
   }
 
   @Override
-  public PCollection<FileWriteResult> expand(
-      PCollection<KV<ShardedKey<String>, Iterable<Row>>> input) {
+  public PCollection<FileWriteResult> expand(PCollection<KV<String, Row>> 
input) {
     return input.apply(
         ParDo.of(
-            new WriteGroupedRowsToFilesDoFn(
-                catalogConfig, dynamicDestinations, 
DEFAULT_MAX_BYTES_PER_FILE, filePrefix)));
+            new WriteDirectRowsToFilesDoFn(
+                catalogConfig, dynamicDestinations, maxBytesPerFile, 
filePrefix)));
   }
 
-  private static class WriteGroupedRowsToFilesDoFn
-      extends DoFn<KV<ShardedKey<String>, Iterable<Row>>, FileWriteResult> {
+  private static class WriteDirectRowsToFilesDoFn extends DoFn<KV<String, 
Row>, FileWriteResult> {
 
     private final DynamicDestinations dynamicDestinations;
     private final IcebergCatalogConfig catalogConfig;
     private transient @MonotonicNonNull Catalog catalog;
     private final String filePrefix;
     private final long maxFileSize;
+    private transient @Nullable RecordWriterManager recordWriterManager;
 
-    WriteGroupedRowsToFilesDoFn(
+    WriteDirectRowsToFilesDoFn(
         IcebergCatalogConfig catalogConfig,
         DynamicDestinations dynamicDestinations,
         long maxFileSize,
@@ -79,6 +80,7 @@ class WriteGroupedRowsToFiles
       this.dynamicDestinations = dynamicDestinations;
       this.filePrefix = filePrefix;
       this.maxFileSize = maxFileSize;
+      this.recordWriterManager = null;
     }
 
     private org.apache.iceberg.catalog.Catalog getCatalog() {
@@ -88,36 +90,52 @@ class WriteGroupedRowsToFiles
       return catalog;
     }
 
+    @StartBundle
+    public void startBundle() {
+      recordWriterManager =
+          new RecordWriterManager(getCatalog(), filePrefix, maxFileSize, 
Integer.MAX_VALUE);
+    }
+
     @ProcessElement
     public void processElement(
-        ProcessContext c,
-        @Element KV<ShardedKey<String>, Iterable<Row>> element,
+        ProcessContext context,
+        @Element KV<String, Row> element,
         BoundedWindow window,
         PaneInfo paneInfo)
         throws Exception {
-
-      String tableIdentifier = element.getKey().getKey();
+      String tableIdentifier = element.getKey();
       IcebergDestination destination = 
dynamicDestinations.instantiateDestination(tableIdentifier);
       WindowedValue<IcebergDestination> windowedDestination =
           WindowedValues.of(destination, window.maxTimestamp(), window, 
paneInfo);
-      RecordWriterManager writer;
-      try (RecordWriterManager openWriter =
-          new RecordWriterManager(getCatalog(), filePrefix, maxFileSize, 
Integer.MAX_VALUE)) {
-        writer = openWriter;
-        for (Row e : element.getValue()) {
-          writer.write(windowedDestination, e);
-        }
+      Preconditions.checkNotNull(recordWriterManager)
+          .write(windowedDestination, element.getValue());
+    }
+
+    @FinishBundle
+    public void finishBundle(FinishBundleContext context) throws Exception {
+      if (recordWriterManager == null) {
+        return;
       }
+      recordWriterManager.close();
 
-      List<SerializableDataFile> serializableDataFiles =
-          
Preconditions.checkNotNull(writer.getSerializableDataFiles().get(windowedDestination));
-      for (SerializableDataFile dataFile : serializableDataFiles) {
-        c.output(
-            FileWriteResult.builder()
-                .setTableIdentifier(destination.getTableIdentifier())
-                .setSerializableDataFile(dataFile)
-                .build());
+      for (Map.Entry<WindowedValue<IcebergDestination>, 
List<SerializableDataFile>>
+          destinationAndFiles :
+              Preconditions.checkNotNull(recordWriterManager)
+                  .getSerializableDataFiles()
+                  .entrySet()) {
+        WindowedValue<IcebergDestination> windowedDestination = 
destinationAndFiles.getKey();
+
+        for (SerializableDataFile dataFile : destinationAndFiles.getValue()) {
+          context.output(
+              FileWriteResult.builder()
+                  .setSerializableDataFile(dataFile)
+                  
.setTableIdentifier(windowedDestination.getValue().getTableIdentifier())
+                  .build(),
+              windowedDestination.getTimestamp(),
+              Iterables.getFirst(windowedDestination.getWindows(), null));
+        }
       }
+      recordWriterManager = null;
     }
   }
 }
diff --git 
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java
 
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java
index 7db1ac42659..12d9570d4a3 100644
--- 
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java
+++ 
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java
@@ -36,8 +36,7 @@ import 
org.checkerframework.checker.nullness.qual.MonotonicNonNull;
 class WriteGroupedRowsToFiles
     extends PTransform<
         PCollection<KV<ShardedKey<String>, Iterable<Row>>>, 
PCollection<FileWriteResult>> {
-
-  private static final long DEFAULT_MAX_BYTES_PER_FILE = (1L << 29); // 512mb
+  private final long maxBytesPerFile;
 
   private final DynamicDestinations dynamicDestinations;
   private final IcebergCatalogConfig catalogConfig;
@@ -46,10 +45,12 @@ class WriteGroupedRowsToFiles
   WriteGroupedRowsToFiles(
       IcebergCatalogConfig catalogConfig,
       DynamicDestinations dynamicDestinations,
-      String filePrefix) {
+      String filePrefix,
+      long maxBytesPerFile) {
     this.catalogConfig = catalogConfig;
     this.dynamicDestinations = dynamicDestinations;
     this.filePrefix = filePrefix;
+    this.maxBytesPerFile = maxBytesPerFile;
   }
 
   @Override
@@ -58,7 +59,7 @@ class WriteGroupedRowsToFiles
     return input.apply(
         ParDo.of(
             new WriteGroupedRowsToFilesDoFn(
-                catalogConfig, dynamicDestinations, 
DEFAULT_MAX_BYTES_PER_FILE, filePrefix)));
+                catalogConfig, dynamicDestinations, maxBytesPerFile, 
filePrefix)));
   }
 
   private static class WriteGroupedRowsToFilesDoFn
diff --git 
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java
 
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java
index fb3bf43f351..bea84fc826b 100644
--- 
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java
+++ 
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java
@@ -17,8 +17,11 @@
  */
 package org.apache.beam.sdk.io.iceberg;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
 
+import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
@@ -28,6 +31,7 @@ import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.GroupIntoBatches;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.Repeatedly;
@@ -36,7 +40,9 @@ import org.apache.beam.sdk.util.ShardedKey;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TupleTag;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Duration;
@@ -47,19 +53,22 @@ class WriteToDestinations extends 
PTransform<PCollection<KV<String, Row>>, Icebe
   private static final int FILE_TRIGGERING_RECORD_COUNT = 500_000;
   // Used for auto-sharding in streaming. Limits total byte size per batch/file
   public static final int FILE_TRIGGERING_BYTE_COUNT = 1 << 30; // 1GiB
-  static final int DEFAULT_NUM_FILE_SHARDS = 0;
+  private static final long DEFAULT_MAX_BYTES_PER_FILE = (1L << 29); // 512mb
   private final IcebergCatalogConfig catalogConfig;
   private final DynamicDestinations dynamicDestinations;
   private final @Nullable Duration triggeringFrequency;
   private final String filePrefix;
+  private final @Nullable Integer directWriteByteLimit;
 
   WriteToDestinations(
       IcebergCatalogConfig catalogConfig,
       DynamicDestinations dynamicDestinations,
-      @Nullable Duration triggeringFrequency) {
+      @Nullable Duration triggeringFrequency,
+      @Nullable Integer directWriteByteLimit) {
     this.dynamicDestinations = dynamicDestinations;
     this.catalogConfig = catalogConfig;
     this.triggeringFrequency = triggeringFrequency;
+    this.directWriteByteLimit = directWriteByteLimit;
     // single unique prefix per write transform
     this.filePrefix = UUID.randomUUID().toString();
   }
@@ -67,10 +76,15 @@ class WriteToDestinations extends 
PTransform<PCollection<KV<String, Row>>, Icebe
   @Override
   public IcebergWriteResult expand(PCollection<KV<String, Row>> input) {
     // Write records to files
-    PCollection<FileWriteResult> writtenFiles =
-        input.isBounded().equals(PCollection.IsBounded.UNBOUNDED)
-            ? writeTriggered(input)
-            : writeUntriggered(input);
+    PCollection<FileWriteResult> writtenFiles;
+    if (IcebergUtils.isUnbounded(input)) {
+      writtenFiles =
+          IcebergUtils.validDirectWriteLimit(directWriteByteLimit)
+              ? writeTriggeredWithBundleLifting(input)
+              : writeTriggered(input);
+    } else {
+      writtenFiles = writeUntriggered(input);
+    }
 
     // Commit files to tables
     PCollection<KV<String, SnapshotInfo>> snapshots =
@@ -79,17 +93,12 @@ class WriteToDestinations extends 
PTransform<PCollection<KV<String, Row>>, Icebe
     return new IcebergWriteResult(input.getPipeline(), snapshots);
   }
 
-  private PCollection<FileWriteResult> writeTriggered(PCollection<KV<String, 
Row>> input) {
-    checkArgumentNotNull(
-        triggeringFrequency, "Streaming pipelines must set a triggering 
frequency.");
-
-    // Group records into batches to avoid writing thousands of small files
+  private PCollection<FileWriteResult> 
groupAndWriteRecords(PCollection<KV<String, Row>> input) {
+    // We rely on GroupIntoBatches to group and parallelize records properly,
+    // respecting our thresholds for number of records and bytes per batch.
+    // Each output batch will be written to a file.
     PCollection<KV<ShardedKey<String>, Iterable<Row>>> groupedRecords =
         input
-            .apply("WindowIntoGlobal", Window.into(new GlobalWindows()))
-            // We rely on GroupIntoBatches to group and parallelize records 
properly,
-            // respecting our thresholds for number of records and bytes per 
batch.
-            // Each output batch will be written to a file.
             .apply(
                 GroupIntoBatches.<String, 
Row>ofSize(FILE_TRIGGERING_RECORD_COUNT)
                     .withByteSize(FILE_TRIGGERING_BYTE_COUNT)
@@ -100,19 +109,72 @@ class WriteToDestinations extends 
PTransform<PCollection<KV<String, Row>>, Icebe
                     
org.apache.beam.sdk.util.ShardedKey.Coder.of(StringUtf8Coder.of()),
                     
IterableCoder.of(RowCoder.of(dynamicDestinations.getDataSchema()))));
 
-    return groupedRecords
-        .apply(
-            "WriteGroupedRows",
-            new WriteGroupedRowsToFiles(catalogConfig, dynamicDestinations, 
filePrefix))
-        // Respect user's triggering frequency before committing snapshots
-        .apply(
-            "ApplyUserTrigger",
-            Window.<FileWriteResult>into(new GlobalWindows())
-                .triggering(
-                    Repeatedly.forever(
-                        AfterProcessingTime.pastFirstElementInPane()
-                            
.plusDelayOf(checkArgumentNotNull(triggeringFrequency))))
-                .discardingFiredPanes());
+    return groupedRecords.apply(
+        "WriteGroupedRows",
+        new WriteGroupedRowsToFiles(
+            catalogConfig, dynamicDestinations, filePrefix, 
DEFAULT_MAX_BYTES_PER_FILE));
+  }
+
+  private PCollection<FileWriteResult> 
applyUserTriggering(PCollection<FileWriteResult> input) {
+    return input.apply(
+        "ApplyUserTrigger",
+        Window.<FileWriteResult>into(new GlobalWindows())
+            .triggering(
+                Repeatedly.forever(
+                    AfterProcessingTime.pastFirstElementInPane()
+                        
.plusDelayOf(checkArgumentNotNull(triggeringFrequency))))
+            .discardingFiredPanes());
+  }
+
+  private PCollection<FileWriteResult> writeTriggeredWithBundleLifting(
+      PCollection<KV<String, Row>> input) {
+    checkArgumentNotNull(
+        triggeringFrequency, "Streaming pipelines must set a triggering 
frequency.");
+    checkArgumentNotNull(
+        directWriteByteLimit, "Must set non-null directWriteByteLimit for 
bundle lifting.");
+
+    final TupleTag<KV<String, Row>> groupedRecordsTag = new 
TupleTag<>("small_batches");
+    final TupleTag<KV<String, Row>> directRecordsTag = new 
TupleTag<>("large_batches");
+
+    input = input.apply("WindowIntoGlobal", Window.into(new GlobalWindows()));
+    PCollectionTuple bundleOutputs =
+        input.apply(
+            BundleLifter.of(
+                groupedRecordsTag, directRecordsTag, directWriteByteLimit, new 
RowSizer()));
+
+    PCollection<KV<String, Row>> smallBatches =
+        bundleOutputs
+            .get(groupedRecordsTag)
+            .setCoder(
+                KvCoder.of(StringUtf8Coder.of(), 
RowCoder.of(dynamicDestinations.getDataSchema())));
+    PCollection<KV<String, Row>> largeBatches =
+        bundleOutputs
+            .get(directRecordsTag)
+            .setCoder(
+                KvCoder.of(StringUtf8Coder.of(), 
RowCoder.of(dynamicDestinations.getDataSchema())));
+
+    PCollection<FileWriteResult> directFileWrites =
+        largeBatches.apply(
+            "WriteDirectRowsToFiles",
+            new WriteDirectRowsToFiles(
+                catalogConfig, dynamicDestinations, filePrefix, 
DEFAULT_MAX_BYTES_PER_FILE));
+
+    PCollection<FileWriteResult> groupedFileWrites = 
groupAndWriteRecords(smallBatches);
+
+    PCollection<FileWriteResult> allFileWrites =
+        PCollectionList.of(groupedFileWrites)
+            .and(directFileWrites)
+            .apply(Flatten.<FileWriteResult>pCollections());
+
+    return applyUserTriggering(allFileWrites);
+  }
+
+  private PCollection<FileWriteResult> writeTriggered(PCollection<KV<String, 
Row>> input) {
+    checkArgumentNotNull(
+        triggeringFrequency, "Streaming pipelines must set a triggering 
frequency.");
+    input = input.apply("WindowIntoGlobal", Window.into(new GlobalWindows()));
+    PCollection<FileWriteResult> files = groupAndWriteRecords(input);
+    return applyUserTriggering(files);
   }
 
   private PCollection<FileWriteResult> writeUntriggered(PCollection<KV<String, 
Row>> input) {
@@ -126,7 +188,8 @@ class WriteToDestinations extends 
PTransform<PCollection<KV<String, Row>>, Icebe
     WriteUngroupedRowsToFiles.Result writeUngroupedResult =
         input.apply(
             "Fast-path write rows",
-            new WriteUngroupedRowsToFiles(catalogConfig, dynamicDestinations, 
filePrefix));
+            new WriteUngroupedRowsToFiles(
+                catalogConfig, dynamicDestinations, filePrefix, 
DEFAULT_MAX_BYTES_PER_FILE));
 
     // Then write the rest by shuffling on the destination
     PCollection<FileWriteResult> writeGroupedResult =
@@ -135,10 +198,60 @@ class WriteToDestinations extends 
PTransform<PCollection<KV<String, Row>>, Icebe
             .apply("Group spilled rows by destination shard", 
GroupByKey.create())
             .apply(
                 "Write remaining rows to files",
-                new WriteGroupedRowsToFiles(catalogConfig, 
dynamicDestinations, filePrefix));
+                new WriteGroupedRowsToFiles(
+                    catalogConfig, dynamicDestinations, filePrefix, 
DEFAULT_MAX_BYTES_PER_FILE));
 
     return PCollectionList.of(writeUngroupedResult.getWrittenFiles())
         .and(writeGroupedResult)
         .apply("Flatten Written Files", Flatten.pCollections());
   }
+
+  /**
+   * A SerializableFunction that estimates the byte size of a keyed Row for bundling purposes. The
+   * estimate is a heuristic that avoids the high cost of encoding each row with a Coder.
+   */
+  private static class RowSizer implements SerializableFunction<KV<String, Row>, Integer> {
+    @Override
+    public Integer apply(KV<String, Row> element) {
+      return estimateRowSize(element.getValue()); // the String key is not counted
+    }
+
+    private int estimateRowSize(Row row) {
+      if (row == null) {
+        return 0; // a missing row contributes nothing
+      }
+      int size = 0;
+      for (Object value : row.getValues()) {
+        size += estimateObjectSize(value); // sum of the per-field estimates
+      }
+      return size;
+    }
+
+    private int estimateObjectSize(@Nullable Object value) {
+      if (value == null) {
+        return 0;
+      }
+      if (value instanceof String) {
+        return ((String) value).getBytes(UTF_8).length; // length of the encoded bytes
+      } else if (value instanceof byte[]) {
+        return ((byte[]) value).length;
+      } else if (value instanceof Row) {
+        return estimateRowSize((Row) value); // nested rows are sized recursively
+      } else if (value instanceof List) {
+        int listSize = 0;
+        for (Object item : (List<?>) value) { // wildcard type instead of a raw List
+          listSize += estimateObjectSize(item);
+        }
+        return listSize;
+      } else if (value instanceof Map) {
+        int mapSize = 0;
+        for (Map.Entry<?, ?> entry : ((Map<?, ?>) value).entrySet()) {
+          mapSize += estimateObjectSize(entry.getKey()) + estimateObjectSize(entry.getValue());
+        }
+        return mapSize;
+      } else {
+        return 8; // Approximation for other fields
+      }
+    }
+  }
 }
diff --git 
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteUngroupedRowsToFiles.java
 
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteUngroupedRowsToFiles.java
index bf2a5a3535f..1db6ede3016 100644
--- 
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteUngroupedRowsToFiles.java
+++ 
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteUngroupedRowsToFiles.java
@@ -65,8 +65,6 @@ class WriteUngroupedRowsToFiles
    */
   @VisibleForTesting static final int DEFAULT_MAX_WRITERS_PER_BUNDLE = 20;
 
-  private static final long DEFAULT_MAX_BYTES_PER_FILE = (1L << 29); // 512mb
-
   private static final TupleTag<FileWriteResult> WRITTEN_FILES_TAG = new 
TupleTag<>("writtenFiles");
   private static final TupleTag<Row> WRITTEN_ROWS_TAG = new 
TupleTag<Row>("writtenRows") {};
   private static final TupleTag<KV<ShardedKey<String>, Row>> SPILLED_ROWS_TAG =
@@ -75,14 +73,17 @@ class WriteUngroupedRowsToFiles
   private final String filePrefix;
   private final DynamicDestinations dynamicDestinations;
   private final IcebergCatalogConfig catalogConfig;
+  private final long maxBytesPerFile;
 
   WriteUngroupedRowsToFiles(
       IcebergCatalogConfig catalogConfig,
       DynamicDestinations dynamicDestinations,
-      String filePrefix) {
+      String filePrefix,
+      long maxBytesPerFile) { // new parameter; WriteToDestinations passes DEFAULT_MAX_BYTES_PER_FILE
     this.catalogConfig = catalogConfig;
     this.dynamicDestinations = dynamicDestinations;
     this.filePrefix = filePrefix;
+    this.maxBytesPerFile = maxBytesPerFile; // used where DEFAULT_MAX_BYTES_PER_FILE was hard-coded
   }
 
   @Override
@@ -96,7 +97,7 @@ class WriteUngroupedRowsToFiles
                         dynamicDestinations,
                         filePrefix,
                         DEFAULT_MAX_WRITERS_PER_BUNDLE,
-                        DEFAULT_MAX_BYTES_PER_FILE))
+                        maxBytesPerFile))
                 .withOutputTags(
                     WRITTEN_FILES_TAG,
                     TupleTagList.of(ImmutableList.of(WRITTEN_ROWS_TAG, 
SPILLED_ROWS_TAG))));
diff --git 
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/BundleLifterTest.java
 
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/BundleLifterTest.java
new file mode 100644
index 00000000000..1eaa0920e6c
--- /dev/null
+++ 
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/BundleLifterTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.empty;
+
+import org.apache.beam.sdk.io.iceberg.BundleLifter.BundleLiftDoFn;
+import org.apache.beam.sdk.transforms.DoFnTester;
+import org.apache.beam.sdk.values.TupleTag;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class BundleLifterTest { // DoFnTester-based checks of which output tag BundleLiftDoFn uses
+
+  private static final TupleTag<Integer> INTEGER_SMALL = new 
TupleTag<Integer>() {};
+  private static final TupleTag<Integer> INTEGER_LARGE = new 
TupleTag<Integer>() {};
+  private static final TupleTag<String> STRING_SMALL = new TupleTag<String>() 
{};
+  private static final TupleTag<String> STRING_LARGE = new TupleTag<String>() 
{};
+
+  @Test
+  public void testSmallBundle() throws Exception {
+    DoFnTester<Integer, Void> tester =
+        DoFnTester.of(new BundleLiftDoFn<>(INTEGER_SMALL, INTEGER_LARGE, 3, x 
-> 1)); // args presumably (small tag, large tag, size limit, sizer) — confirm in BundleLifter
+
+    tester.startBundle();
+    tester.processElement(1);
+    tester.processElement(2);
+    tester.finishBundle();
+
+    assertThat(tester.peekOutputElements(INTEGER_SMALL), containsInAnyOrder(1, 
2)); // 2 elements of size 1 with limit 3: all expected on the small tag
+    assertThat(tester.peekOutputElements(INTEGER_LARGE), empty());
+  }
+
+  @Test
+  public void testLargeBundle() throws Exception {
+    DoFnTester<Integer, Void> tester =
+        DoFnTester.of(new BundleLiftDoFn<>(INTEGER_SMALL, INTEGER_LARGE, 3, x 
-> 1)); // same limit of 3 and constant size 1 as the small-bundle test
+
+    tester.startBundle();
+    tester.processElement(1);
+    tester.processElement(2);
+    tester.processElement(3);
+    tester.finishBundle();
+
+    assertThat(tester.peekOutputElements(INTEGER_SMALL), empty());
+    assertThat(tester.peekOutputElements(INTEGER_LARGE), containsInAnyOrder(1, 
2, 3)); // 3 elements of size 1 with limit 3: all expected on the large tag
+  }
+
+  @Test
+  public void testSmallBundleWithSizer() throws Exception {
+    DoFnTester<String, Void> tester =
+        DoFnTester.of(new BundleLiftDoFn<>(STRING_SMALL, STRING_LARGE, 10, e 
-> e.length())); // each element is sized by its string length
+
+    tester.startBundle();
+    tester.processElement("123");
+    tester.processElement("456");
+    tester.processElement("789");
+    tester.finishBundle();
+
+    assertThat(tester.peekOutputElements(STRING_SMALL), 
containsInAnyOrder("123", "456", "789")); // total length 9 with limit 10: small tag
+    assertThat(tester.peekOutputElements(STRING_LARGE), empty());
+  }
+
+  @Test
+  public void testLargeBundleWithSizer() throws Exception {
+    DoFnTester<String, Void> tester =
+        DoFnTester.of(new BundleLiftDoFn<>(STRING_SMALL, STRING_LARGE, 10, e 
-> e.length())); // each element is sized by its string length
+
+    tester.startBundle();
+    tester.processElement("123");
+    tester.processElement("456");
+    tester.processElement("789");
+    tester.processElement("0");
+    tester.finishBundle();
+
+    assertThat(tester.peekOutputElements(STRING_SMALL), empty());
+    assertThat(
+        tester.peekOutputElements(STRING_LARGE), containsInAnyOrder("123", 
"456", "789", "0")); // total length 10 with limit 10: large tag
+  }
+}


Reply via email to