This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e24911c5b75 enable setting max_writer_per_bundle for avroIO and other 
IO (#35092)
e24911c5b75 is described below

commit e24911c5b75dff57817a8b602645b7bc4348bb92
Author: Tanu Sharma <[email protected]>
AuthorDate: Thu Jun 12 20:39:20 2025 +0530

    enable setting max_writer_per_bundle for avroIO and other IO (#35092)
    
    * enable setting max_writer_per_bundle for avroIO and other IO
    
    * enable setting max_writer_per_bundle for avroIO and other IO
    
    * enable for TFRecordIO and corrections
    
    * Updated standard_external_transforms.yaml
---
 .../main/java/org/apache/beam/sdk/io/FileIO.java    | 20 ++++++++++++++++++++
 .../java/org/apache/beam/sdk/io/TFRecordIO.java     | 13 +++++++++++++
 .../TFRecordWriteSchemaTransformConfiguration.java  |  7 +++++++
 .../io/TFRecordWriteSchemaTransformProvider.java    |  4 ++++
 .../main/java/org/apache/beam/sdk/io/TextIO.java    | 21 +++++++++++++++++++++
 .../java/org/apache/beam/sdk/io/WriteFiles.java     |  7 +++++++
 .../sdk/io/TFRecordSchemaTransformProviderTest.java |  1 +
 .../org/apache/beam/sdk/io/TextIOWriteTest.java     |  2 ++
 .../apache/beam/sdk/extensions/avro/io/AvroIO.java  | 14 ++++++++++++++
 .../main/java/org/apache/beam/sdk/io/csv/CsvIO.java | 11 +++++++++++
 .../java/org/apache/beam/sdk/io/json/JsonIO.java    | 11 +++++++++++
 sdks/standard_external_transforms.yaml              |  7 ++++++-
 12 files changed, 117 insertions(+), 1 deletion(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
index d51a3c188af..d5c235b696c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
@@ -1043,6 +1043,8 @@ public class FileIO {
 
     abstract boolean getNoSpilling();
 
+    abstract @Nullable Integer getMaxNumWritersPerBundle();
+
     abstract @Nullable ErrorHandler<BadRecord, ?> getBadRecordErrorHandler();
 
     abstract Builder<DestinationT, UserT> toBuilder();
@@ -1093,6 +1095,9 @@ public class FileIO {
 
       abstract Builder<DestinationT, UserT> setNoSpilling(boolean noSpilling);
 
+      abstract Builder<DestinationT, UserT> setMaxNumWritersPerBundle(
+          @Nullable Integer maxNumWritersPerBundle);
+
       abstract Builder<DestinationT, UserT> setBadRecordErrorHandler(
           @Nullable ErrorHandler<BadRecord, ?> badRecordErrorHandler);
 
@@ -1326,6 +1331,15 @@ public class FileIO {
       return toBuilder().setNoSpilling(true).build();
     }
 
+    /**
+     * Set the maximum number of writers created in a bundle before spilling 
to shuffle. See {@link
+     * WriteFiles#withMaxNumWritersPerBundle(int)}.
+     */
+    public Write<DestinationT, UserT> withMaxNumWritersPerBundle(
+        @Nullable Integer maxNumWritersPerBundle) {
+      return 
toBuilder().setMaxNumWritersPerBundle(maxNumWritersPerBundle).build();
+    }
+
     /**
      * Configures a new {@link Write} with an ErrorHandler. For configuring an 
ErrorHandler, see
      * {@link ErrorHandler}. Whenever a record is formatted, or a lookup for a 
dynamic destination
@@ -1424,6 +1438,9 @@ public class FileIO {
       resolvedSpec.setIgnoreWindowing(getIgnoreWindowing());
       resolvedSpec.setAutoSharding(getAutoSharding());
       resolvedSpec.setNoSpilling(getNoSpilling());
+      if (getMaxNumWritersPerBundle() != null) {
+        resolvedSpec.setMaxNumWritersPerBundle(getMaxNumWritersPerBundle());
+      }
 
       Write<DestinationT, UserT> resolved = resolvedSpec.build();
       WriteFiles<UserT, DestinationT, ?> writeFiles =
@@ -1445,6 +1462,9 @@ public class FileIO {
       if (getNoSpilling()) {
         writeFiles = writeFiles.withNoSpilling();
       }
+      if (getMaxNumWritersPerBundle() != null) {
+        writeFiles = 
writeFiles.withMaxNumWritersPerBundle(getMaxNumWritersPerBundle());
+      }
       if (getBadRecordErrorHandler() != null) {
         writeFiles = 
writeFiles.withBadRecordErrorHandler(getBadRecordErrorHandler());
       }
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
index dc76d901657..52982e2fe16 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
@@ -274,6 +274,9 @@ public class TFRecordIO {
     /** Whether to skip the spilling of data caused by having 
maxNumWritersPerBundle. */
     abstract boolean getNoSpilling();
 
+    /** Maximum number of writers created in a bundle before spilling to 
shuffle. */
+    abstract @Nullable Integer getMaxNumWritersPerBundle();
+
     abstract Builder toBuilder();
 
     @AutoValue.Builder
@@ -290,6 +293,8 @@ public class TFRecordIO {
 
       abstract Builder setNoSpilling(boolean noSpilling);
 
+      abstract Builder setMaxNumWritersPerBundle(@Nullable Integer 
maxNumWritersPerBundle);
+
       abstract Write build();
     }
 
@@ -388,6 +393,11 @@ public class TFRecordIO {
       return toBuilder().setNoSpilling(true).build();
     }
 
+    /** See {@link WriteFiles#withMaxNumWritersPerBundle(int)}. */
+    public Write withMaxNumWritersPerBundle(@Nullable Integer 
maxNumWritersPerBundle) {
+      return 
toBuilder().setMaxNumWritersPerBundle(maxNumWritersPerBundle).build();
+    }
+
     @Override
     public PDone expand(PCollection<byte[]> input) {
       checkState(
@@ -403,6 +413,9 @@ public class TFRecordIO {
       if (getNoSpilling()) {
         write = write.withNoSpilling();
       }
+      if (getMaxNumWritersPerBundle() != null) {
+        write = write.withMaxNumWritersPerBundle(getMaxNumWritersPerBundle());
+      }
       input.apply("Write", write);
       return PDone.in(input.getPipeline());
     }
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordWriteSchemaTransformConfiguration.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordWriteSchemaTransformConfiguration.java
index e123b5c0847..8167d4a399e 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordWriteSchemaTransformConfiguration.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordWriteSchemaTransformConfiguration.java
@@ -80,6 +80,11 @@ public abstract class 
TFRecordWriteSchemaTransformConfiguration {
   @Nullable
   public abstract Boolean getNoSpilling();
 
+  @SchemaFieldDescription(
+      "Maximum number of writers created in a bundle before spilling to 
shuffle.")
+  @Nullable
+  public abstract Integer getMaxNumWritersPerBundle();
+
   @SchemaFieldDescription("This option specifies whether and where to output 
unwritable rows.")
   @Nullable
   public abstract ErrorHandling getErrorHandling();
@@ -99,6 +104,8 @@ public abstract class 
TFRecordWriteSchemaTransformConfiguration {
 
     public abstract Builder setNoSpilling(Boolean value);
 
+    public abstract Builder setMaxNumWritersPerBundle(@Nullable Integer 
maxNumWritersPerBundle);
+
     public abstract Builder setErrorHandling(ErrorHandling errorHandling);
 
     /** Builds the {@link TFRecordWriteSchemaTransformConfiguration} 
configuration. */
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordWriteSchemaTransformProvider.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordWriteSchemaTransformProvider.java
index b45d8584be5..bc9b7bbeac6 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordWriteSchemaTransformProvider.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordWriteSchemaTransformProvider.java
@@ -132,6 +132,10 @@ public class TFRecordWriteSchemaTransformProvider
       if (Boolean.TRUE.equals(configuration.getNoSpilling())) {
         writeTransform = writeTransform.withNoSpilling();
       }
+      if (configuration.getMaxNumWritersPerBundle() != null) {
+        writeTransform =
+            
writeTransform.withMaxNumWritersPerBundle(configuration.getMaxNumWritersPerBundle());
+      }
 
       // Obtain input schema and verify only one field and its bytes
       Schema inputSchema = input.get(INPUT).getSchema();
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index f9844f3a73a..f26f23b4656 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -718,6 +718,9 @@ public class TextIO {
     /** Whether to skip the spilling of data caused by having 
maxNumWritersPerBundle. */
     abstract boolean getNoSpilling();
 
+    /** Maximum number of writers created in a bundle before spilling to 
shuffle. */
+    abstract @Nullable Integer getMaxNumWritersPerBundle();
+
     /** Whether to skip writing any output files if the PCollection is empty. 
*/
     abstract boolean getSkipIfEmpty();
 
@@ -779,6 +782,9 @@ public class TextIO {
 
       abstract Builder<UserT, DestinationT> setNoSpilling(boolean noSpilling);
 
+      abstract Builder<UserT, DestinationT> setMaxNumWritersPerBundle(
+          @Nullable Integer maxNumWritersPerBundle);
+
       abstract Builder<UserT, DestinationT> setSkipIfEmpty(boolean noSpilling);
 
       abstract Builder<UserT, DestinationT> setWritableByteChannelFactory(
@@ -1062,6 +1068,12 @@ public class TextIO {
       return toBuilder().setNoSpilling(true).build();
     }
 
+    /** Set the maximum number of writers created in a bundle before spilling 
to shuffle. */
+    public TypedWrite<UserT, DestinationT> withMaxNumWritersPerBundle(
+        @Nullable Integer maxNumWritersPerBundle) {
+      return 
toBuilder().setMaxNumWritersPerBundle(maxNumWritersPerBundle).build();
+    }
+
     /** See {@link FileIO.Write#withBadRecordErrorHandler(ErrorHandler)} for 
details on usage. */
     public TypedWrite<UserT, DestinationT> withBadRecordErrorHandler(
         ErrorHandler<BadRecord, ?> errorHandler) {
@@ -1161,6 +1173,9 @@ public class TextIO {
       if (getNoSpilling()) {
         write = write.withNoSpilling();
       }
+      if (getMaxNumWritersPerBundle() != null) {
+        write = write.withMaxNumWritersPerBundle(getMaxNumWritersPerBundle());
+      }
       if (getBadRecordErrorHandler() != null) {
         write = write.withBadRecordErrorHandler(getBadRecordErrorHandler());
       }
@@ -1187,6 +1202,7 @@ public class TextIO {
       builder
           .addIfNotNull(
               DisplayData.item("numShards", getNumShards()).withLabel("Maximum 
Output Shards"))
+          .addIfNotNull(DisplayData.item("maxNumWritersPerBundle", 
getMaxNumWritersPerBundle()))
           .addIfNotNull(
               DisplayData.item("tempDirectory", getTempDirectory())
                   .withLabel("Directory for temporary files"))
@@ -1348,6 +1364,11 @@ public class TextIO {
       return new Write(inner.withNoSpilling());
     }
 
+    /** See {@link TypedWrite#withMaxNumWritersPerBundle(Integer)}. */
+    public Write withMaxNumWritersPerBundle(@Nullable Integer 
maxNumWritersPerBundle) {
+      return new 
Write(inner.withMaxNumWritersPerBundle(maxNumWritersPerBundle));
+    }
+
     /** See {@link TypedWrite#withBatchSize(Integer)}. */
     public Write withBatchSize(@Nullable Integer batchSize) {
       return new Write(inner.withBatchSize(batchSize));
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index cb48931958c..c1b56a2b445 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -292,6 +292,13 @@ public abstract class WriteFiles<UserT, DestinationT, 
OutputT>
   /** Set the maximum number of writers created in a bundle before spilling to 
shuffle. */
   public WriteFiles<UserT, DestinationT, OutputT> withMaxNumWritersPerBundle(
       int maxNumWritersPerBundle) {
+    checkArgument(
+        getMaxNumWritersPerBundle() != -1,
+        "Cannot use withMaxNumWritersPerBundle() after withNoSpilling() has 
been set.");
+    checkArgument(
+        maxNumWritersPerBundle > 0 && maxNumWritersPerBundle <= 
DEFAULT_MAX_NUM_WRITERS_PER_BUNDLE,
+        "maxNumWritersPerBundle must be greater than 0 and less than or equal 
to %s",
+        DEFAULT_MAX_NUM_WRITERS_PER_BUNDLE);
     return 
toBuilder().setMaxNumWritersPerBundle(maxNumWritersPerBundle).build();
   }
 
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordSchemaTransformProviderTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordSchemaTransformProviderTest.java
index 5adbcbb8152..9c067a533e0 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordSchemaTransformProviderTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordSchemaTransformProviderTest.java
@@ -276,6 +276,7 @@ public class TFRecordSchemaTransformProviderTest {
             "num_shards",
             "compression",
             "no_spilling",
+            "max_num_writers_per_bundle",
             "error_handling"),
         tfrecordProvider.configurationSchema().getFields().stream()
             .map(field -> field.getName())
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java
index 695ff4474d7..eba0f793265 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java
@@ -650,6 +650,7 @@ public class TextIOWriteTest {
             .withSuffix("bar")
             .withShardNameTemplate("-SS-of-NN-")
             .withNumShards(100)
+            .withMaxNumWritersPerBundle(5)
             .withFooter("myFooter")
             .withHeader("myHeader");
 
@@ -661,6 +662,7 @@ public class TextIOWriteTest {
     assertThat(displayData, hasDisplayItem("fileFooter", "myFooter"));
     assertThat(displayData, hasDisplayItem("shardNameTemplate", "-SS-of-NN-"));
     assertThat(displayData, hasDisplayItem("numShards", 100));
+    assertThat(displayData, hasDisplayItem("maxNumWritersPerBundle", 5));
     assertThat(displayData, hasDisplayItem("writableByteChannelFactory", 
"UNCOMPRESSED"));
   }
 
diff --git 
a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroIO.java
 
b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroIO.java
index 2e4939560ad..2ddde14bcc2 100644
--- 
a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroIO.java
+++ 
b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroIO.java
@@ -1426,6 +1426,8 @@ public class AvroIO {
 
     abstract boolean getNoSpilling();
 
+    abstract @Nullable Integer getMaxNumWritersPerBundle();
+
     abstract @Nullable FilenamePolicy getFilenamePolicy();
 
     abstract @Nullable DynamicAvroDestinations<UserT, DestinationT, OutputT>
@@ -1483,6 +1485,9 @@ public class AvroIO {
 
       abstract Builder<UserT, DestinationT, OutputT> setNoSpilling(boolean 
noSpilling);
 
+      abstract Builder<UserT, DestinationT, OutputT> setMaxNumWritersPerBundle(
+          @Nullable Integer maxNumWritersPerBundle);
+
       abstract Builder<UserT, DestinationT, OutputT> setFilenamePolicy(
           FilenamePolicy filenamePolicy);
 
@@ -1690,6 +1695,12 @@ public class AvroIO {
       return toBuilder().setNoSpilling(true).build();
     }
 
+    /** See {@link WriteFiles#withMaxNumWritersPerBundle(int)}. */
+    public TypedWrite<UserT, DestinationT, OutputT> withMaxNumWritersPerBundle(
+        @Nullable Integer maxNumWritersPerBundle) {
+      return 
toBuilder().setMaxNumWritersPerBundle(maxNumWritersPerBundle).build();
+    }
+
     /** Writes to Avro file(s) compressed using specified codec. */
     public TypedWrite<UserT, DestinationT, OutputT> withCodec(CodecFactory 
codec) {
       return toBuilder().setCodec(new 
SerializableAvroCodecFactory(codec)).build();
@@ -1799,6 +1810,9 @@ public class AvroIO {
       if (getNoSpilling()) {
         write = write.withNoSpilling();
       }
+      if (getMaxNumWritersPerBundle() != null) {
+        write = write.withMaxNumWritersPerBundle(getMaxNumWritersPerBundle());
+      }
       if (getBadRecordErrorHandler() != null) {
         write = write.withBadRecordErrorHandler(getBadRecordErrorHandler());
       }
diff --git 
a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIO.java 
b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIO.java
index fc2b68c0a89..d71299cceb6 100644
--- a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIO.java
+++ b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIO.java
@@ -53,6 +53,7 @@ import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
 import org.apache.commons.csv.CSVFormat;
+import org.checkerframework.checker.nullness.qual.Nullable;
 
 /**
  * {@link PTransform}s for reading and writing CSV files.
@@ -550,6 +551,16 @@ public class CsvIO {
       return 
toBuilder().setTextIOWrite(getTextIOWrite().withNoSpilling()).build();
     }
 
+    /**
+     * Set the maximum number of writers created in a bundle before spilling 
to shuffle. See {@link
+     * WriteFiles#withMaxNumWritersPerBundle(int)}.
+     */
+    public Write<T> withMaxNumWritersPerBundle(@Nullable Integer 
maxNumWritersPerBundle) {
+      return toBuilder()
+          
.setTextIOWrite(getTextIOWrite().withMaxNumWritersPerBundle(maxNumWritersPerBundle))
+          .build();
+    }
+
     /**
      * Specifies to use a given fixed number of shards per window. See {@link
      * TextIO.Write#withNumShards}.
diff --git 
a/sdks/java/io/json/src/main/java/org/apache/beam/sdk/io/json/JsonIO.java 
b/sdks/java/io/json/src/main/java/org/apache/beam/sdk/io/json/JsonIO.java
index 3abb29a8042..1cb576e8f42 100644
--- a/sdks/java/io/json/src/main/java/org/apache/beam/sdk/io/json/JsonIO.java
+++ b/sdks/java/io/json/src/main/java/org/apache/beam/sdk/io/json/JsonIO.java
@@ -37,6 +37,7 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.Row;
+import org.checkerframework.checker.nullness.qual.Nullable;
 
 /**
  * {@link PTransform}s for reading and writing JSON files.
@@ -170,6 +171,16 @@ public class JsonIO {
       return 
toBuilder().setTextIOWrite(getTextIOWrite().withNoSpilling()).build();
     }
 
+    /**
+     * Set the maximum number of writers created in a bundle before spilling 
to shuffle. See {@link
+     * WriteFiles#withMaxNumWritersPerBundle(int)}.
+     */
+    public Write<T> withMaxNumWritersPerBundle(@Nullable Integer 
maxNumWritersPerBundle) {
+      return toBuilder()
+          
.setTextIOWrite(getTextIOWrite().withMaxNumWritersPerBundle(maxNumWritersPerBundle))
+          .build();
+    }
+
     /**
      * Specifies to use a given fixed number of shards per window. See {@link
      * TextIO.Write#withNumShards}.
diff --git a/sdks/standard_external_transforms.yaml 
b/sdks/standard_external_transforms.yaml
index f5d71830145..1c536ce319d 100644
--- a/sdks/standard_external_transforms.yaml
+++ b/sdks/standard_external_transforms.yaml
@@ -19,7 +19,7 @@
 # configuration in /sdks/standard_expansion_services.yaml.
 # Refer to gen_xlang_wrappers.py for more info.
 #
-# Last updated on: 2025-04-24
+# Last updated on: 2025-06-05
 
 - default_service: sdks:java:io:expansion-service:shadowJar
   description: 'Outputs a PCollection of Beam Rows, each containing a single 
INT64
@@ -91,6 +91,11 @@
     name: filename_suffix
     nullable: true
     type: str
+  - description: Maximum number of writers created in a bundle before spilling 
to
+      shuffle.
+    name: max_num_writers_per_bundle
+    nullable: true
+    type: int32
   - description: Whether to skip the spilling of data caused by having 
maxNumWritersPerBundle.
     name: no_spilling
     nullable: true

Reply via email to