This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new e24911c5b75 enable setting max_writer_per_bundle for avroIO and other
IO (#35092)
e24911c5b75 is described below
commit e24911c5b75dff57817a8b602645b7bc4348bb92
Author: Tanu Sharma <[email protected]>
AuthorDate: Thu Jun 12 20:39:20 2025 +0530
enable setting max_writer_per_bundle for avroIO and other IO (#35092)
* enable setting max_writer_per_bundle for avroIO and other IO
* enable setting max_writer_per_bundle for avroIO and other IO
* enable for TFRecordIO and corrections
* Updated standard_external_transforms.yaml
---
.../main/java/org/apache/beam/sdk/io/FileIO.java | 20 ++++++++++++++++++++
.../java/org/apache/beam/sdk/io/TFRecordIO.java | 13 +++++++++++++
.../TFRecordWriteSchemaTransformConfiguration.java | 7 +++++++
.../io/TFRecordWriteSchemaTransformProvider.java | 4 ++++
.../main/java/org/apache/beam/sdk/io/TextIO.java | 21 +++++++++++++++++++++
.../java/org/apache/beam/sdk/io/WriteFiles.java | 7 +++++++
.../sdk/io/TFRecordSchemaTransformProviderTest.java | 1 +
.../org/apache/beam/sdk/io/TextIOWriteTest.java | 2 ++
.../apache/beam/sdk/extensions/avro/io/AvroIO.java | 14 ++++++++++++++
.../main/java/org/apache/beam/sdk/io/csv/CsvIO.java | 11 +++++++++++
.../java/org/apache/beam/sdk/io/json/JsonIO.java | 11 +++++++++++
sdks/standard_external_transforms.yaml | 7 ++++++-
12 files changed, 117 insertions(+), 1 deletion(-)
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
index d51a3c188af..d5c235b696c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
@@ -1043,6 +1043,8 @@ public class FileIO {
abstract boolean getNoSpilling();
+ abstract @Nullable Integer getMaxNumWritersPerBundle();
+
abstract @Nullable ErrorHandler<BadRecord, ?> getBadRecordErrorHandler();
abstract Builder<DestinationT, UserT> toBuilder();
@@ -1093,6 +1095,9 @@ public class FileIO {
abstract Builder<DestinationT, UserT> setNoSpilling(boolean noSpilling);
+ abstract Builder<DestinationT, UserT> setMaxNumWritersPerBundle(
+ @Nullable Integer maxNumWritersPerBundle);
+
abstract Builder<DestinationT, UserT> setBadRecordErrorHandler(
@Nullable ErrorHandler<BadRecord, ?> badRecordErrorHandler);
@@ -1326,6 +1331,15 @@ public class FileIO {
return toBuilder().setNoSpilling(true).build();
}
+ /**
+ * Set the maximum number of writers created in a bundle before spilling
to shuffle. See {@link
+ * WriteFiles#withMaxNumWritersPerBundle()}.
+ */
+ public Write<DestinationT, UserT> withMaxNumWritersPerBundle(
+ @Nullable Integer maxNumWritersPerBundle) {
+ return
toBuilder().setMaxNumWritersPerBundle(maxNumWritersPerBundle).build();
+ }
+
/**
* Configures a new {@link Write} with an ErrorHandler. For configuring an
ErrorHandler, see
* {@link ErrorHandler}. Whenever a record is formatted, or a lookup for a
dynamic destination
@@ -1424,6 +1438,9 @@ public class FileIO {
resolvedSpec.setIgnoreWindowing(getIgnoreWindowing());
resolvedSpec.setAutoSharding(getAutoSharding());
resolvedSpec.setNoSpilling(getNoSpilling());
+ if (getMaxNumWritersPerBundle() != null) {
+ resolvedSpec.setMaxNumWritersPerBundle(getMaxNumWritersPerBundle());
+ }
Write<DestinationT, UserT> resolved = resolvedSpec.build();
WriteFiles<UserT, DestinationT, ?> writeFiles =
@@ -1445,6 +1462,9 @@ public class FileIO {
if (getNoSpilling()) {
writeFiles = writeFiles.withNoSpilling();
}
+ if (getMaxNumWritersPerBundle() != null) {
+ writeFiles =
writeFiles.withMaxNumWritersPerBundle(getMaxNumWritersPerBundle());
+ }
if (getBadRecordErrorHandler() != null) {
writeFiles =
writeFiles.withBadRecordErrorHandler(getBadRecordErrorHandler());
}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
index dc76d901657..52982e2fe16 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
@@ -274,6 +274,9 @@ public class TFRecordIO {
/** Whether to skip the spilling of data caused by having
maxNumWritersPerBundle. */
abstract boolean getNoSpilling();
+ /** Maximum number of writers created in a bundle before spilling to
shuffle. */
+ abstract @Nullable Integer getMaxNumWritersPerBundle();
+
abstract Builder toBuilder();
@AutoValue.Builder
@@ -290,6 +293,8 @@ public class TFRecordIO {
abstract Builder setNoSpilling(boolean noSpilling);
+ abstract Builder setMaxNumWritersPerBundle(@Nullable Integer
maxNumWritersPerBundle);
+
abstract Write build();
}
@@ -388,6 +393,11 @@ public class TFRecordIO {
return toBuilder().setNoSpilling(true).build();
}
+ /** See {@link WriteFiles#withMaxNumWritersPerBundle()}. */
+ public Write withMaxNumWritersPerBundle(@Nullable Integer
maxNumWritersPerBundle) {
+ return
toBuilder().setMaxNumWritersPerBundle(maxNumWritersPerBundle).build();
+ }
+
@Override
public PDone expand(PCollection<byte[]> input) {
checkState(
@@ -403,6 +413,9 @@ public class TFRecordIO {
if (getNoSpilling()) {
write = write.withNoSpilling();
}
+ if (getMaxNumWritersPerBundle() != null) {
+ write = write.withMaxNumWritersPerBundle(getMaxNumWritersPerBundle());
+ }
input.apply("Write", write);
return PDone.in(input.getPipeline());
}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordWriteSchemaTransformConfiguration.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordWriteSchemaTransformConfiguration.java
index e123b5c0847..8167d4a399e 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordWriteSchemaTransformConfiguration.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordWriteSchemaTransformConfiguration.java
@@ -80,6 +80,11 @@ public abstract class
TFRecordWriteSchemaTransformConfiguration {
@Nullable
public abstract Boolean getNoSpilling();
+ @SchemaFieldDescription(
+ "Maximum number of writers created in a bundle before spilling to
shuffle.")
+ @Nullable
+ public abstract Integer getMaxNumWritersPerBundle();
+
@SchemaFieldDescription("This option specifies whether and where to output
unwritable rows.")
@Nullable
public abstract ErrorHandling getErrorHandling();
@@ -99,6 +104,8 @@ public abstract class
TFRecordWriteSchemaTransformConfiguration {
public abstract Builder setNoSpilling(Boolean value);
+ public abstract Builder setMaxNumWritersPerBundle(@Nullable Integer
maxNumWritersPerBundle);
+
public abstract Builder setErrorHandling(ErrorHandling errorHandling);
/** Builds the {@link TFRecordWriteSchemaTransformConfiguration}
configuration. */
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordWriteSchemaTransformProvider.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordWriteSchemaTransformProvider.java
index b45d8584be5..bc9b7bbeac6 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordWriteSchemaTransformProvider.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordWriteSchemaTransformProvider.java
@@ -132,6 +132,10 @@ public class TFRecordWriteSchemaTransformProvider
if (Boolean.TRUE.equals(configuration.getNoSpilling())) {
writeTransform = writeTransform.withNoSpilling();
}
+ if (configuration.getMaxNumWritersPerBundle() != null) {
+ writeTransform =
+
writeTransform.withMaxNumWritersPerBundle(configuration.getMaxNumWritersPerBundle());
+ }
// Obtain input schema and verify only one field and its bytes
Schema inputSchema = input.get(INPUT).getSchema();
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index f9844f3a73a..f26f23b4656 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -718,6 +718,9 @@ public class TextIO {
/** Whether to skip the spilling of data caused by having
maxNumWritersPerBundle. */
abstract boolean getNoSpilling();
+ /** Maximum number of writers created in a bundle before spilling to
shuffle. */
+ abstract @Nullable Integer getMaxNumWritersPerBundle();
+
/** Whether to skip writing any output files if the PCollection is empty.
*/
abstract boolean getSkipIfEmpty();
@@ -779,6 +782,9 @@ public class TextIO {
abstract Builder<UserT, DestinationT> setNoSpilling(boolean noSpilling);
+ abstract Builder<UserT, DestinationT> setMaxNumWritersPerBundle(
+ @Nullable Integer maxNumWritersPerBundle);
+
abstract Builder<UserT, DestinationT> setSkipIfEmpty(boolean noSpilling);
abstract Builder<UserT, DestinationT> setWritableByteChannelFactory(
@@ -1062,6 +1068,12 @@ public class TextIO {
return toBuilder().setNoSpilling(true).build();
}
+ /** Set the maximum number of writers created in a bundle before spilling
to shuffle. */
+ public TypedWrite<UserT, DestinationT> withMaxNumWritersPerBundle(
+ @Nullable Integer maxNumWritersPerBundle) {
+ return
toBuilder().setMaxNumWritersPerBundle(maxNumWritersPerBundle).build();
+ }
+
/** See {@link FileIO.Write#withBadRecordErrorHandler(ErrorHandler)} for
details on usage. */
public TypedWrite<UserT, DestinationT> withBadRecordErrorHandler(
ErrorHandler<BadRecord, ?> errorHandler) {
@@ -1161,6 +1173,9 @@ public class TextIO {
if (getNoSpilling()) {
write = write.withNoSpilling();
}
+ if (getMaxNumWritersPerBundle() != null) {
+ write = write.withMaxNumWritersPerBundle(getMaxNumWritersPerBundle());
+ }
if (getBadRecordErrorHandler() != null) {
write = write.withBadRecordErrorHandler(getBadRecordErrorHandler());
}
@@ -1187,6 +1202,7 @@ public class TextIO {
builder
.addIfNotNull(
DisplayData.item("numShards", getNumShards()).withLabel("Maximum
Output Shards"))
+ .addIfNotNull(DisplayData.item("maxNumWritersPerBundle",
getMaxNumWritersPerBundle()))
.addIfNotNull(
DisplayData.item("tempDirectory", getTempDirectory())
.withLabel("Directory for temporary files"))
@@ -1348,6 +1364,11 @@ public class TextIO {
return new Write(inner.withNoSpilling());
}
+ /** See {@link TypedWrite#withMaxNumWritersPerBundle(Integer)}. */
+ public Write withMaxNumWritersPerBundle(@Nullable Integer
maxNumWritersPerBundle) {
+ return new
Write(inner.withMaxNumWritersPerBundle(maxNumWritersPerBundle));
+ }
+
/** See {@link TypedWrite#withBatchSize(Integer)}. */
public Write withBatchSize(@Nullable Integer batchSize) {
return new Write(inner.withBatchSize(batchSize));
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index cb48931958c..c1b56a2b445 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -292,6 +292,13 @@ public abstract class WriteFiles<UserT, DestinationT,
OutputT>
/** Set the maximum number of writers created in a bundle before spilling to
shuffle. */
public WriteFiles<UserT, DestinationT, OutputT> withMaxNumWritersPerBundle(
int maxNumWritersPerBundle) {
+ checkArgument(
+ getMaxNumWritersPerBundle() != -1,
+ "Cannot use withMaxNumWritersPerBundle() after withNoSpilling() has
been set.");
+ checkArgument(
+ maxNumWritersPerBundle > 0 && maxNumWritersPerBundle <=
DEFAULT_MAX_NUM_WRITERS_PER_BUNDLE,
+ "maxNumWritersPerBundle must be greater than 0 and less than or equal
to %s",
+ DEFAULT_MAX_NUM_WRITERS_PER_BUNDLE);
return
toBuilder().setMaxNumWritersPerBundle(maxNumWritersPerBundle).build();
}
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordSchemaTransformProviderTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordSchemaTransformProviderTest.java
index 5adbcbb8152..9c067a533e0 100644
---
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordSchemaTransformProviderTest.java
+++
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordSchemaTransformProviderTest.java
@@ -276,6 +276,7 @@ public class TFRecordSchemaTransformProviderTest {
"num_shards",
"compression",
"no_spilling",
+ "max_num_writers_per_bundle",
"error_handling"),
tfrecordProvider.configurationSchema().getFields().stream()
.map(field -> field.getName())
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java
index 695ff4474d7..eba0f793265 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java
@@ -650,6 +650,7 @@ public class TextIOWriteTest {
.withSuffix("bar")
.withShardNameTemplate("-SS-of-NN-")
.withNumShards(100)
+ .withMaxNumWritersPerBundle(5)
.withFooter("myFooter")
.withHeader("myHeader");
@@ -661,6 +662,7 @@ public class TextIOWriteTest {
assertThat(displayData, hasDisplayItem("fileFooter", "myFooter"));
assertThat(displayData, hasDisplayItem("shardNameTemplate", "-SS-of-NN-"));
assertThat(displayData, hasDisplayItem("numShards", 100));
+ assertThat(displayData, hasDisplayItem("maxNumWritersPerBundle", 5));
assertThat(displayData, hasDisplayItem("writableByteChannelFactory",
"UNCOMPRESSED"));
}
diff --git
a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroIO.java
b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroIO.java
index 2e4939560ad..2ddde14bcc2 100644
---
a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroIO.java
+++
b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroIO.java
@@ -1426,6 +1426,8 @@ public class AvroIO {
abstract boolean getNoSpilling();
+ abstract @Nullable Integer getMaxNumWritersPerBundle();
+
abstract @Nullable FilenamePolicy getFilenamePolicy();
abstract @Nullable DynamicAvroDestinations<UserT, DestinationT, OutputT>
@@ -1483,6 +1485,9 @@ public class AvroIO {
abstract Builder<UserT, DestinationT, OutputT> setNoSpilling(boolean
noSpilling);
+ abstract Builder<UserT, DestinationT, OutputT> setMaxNumWritersPerBundle(
+ @Nullable Integer maxNumWritersPerBundle);
+
abstract Builder<UserT, DestinationT, OutputT> setFilenamePolicy(
FilenamePolicy filenamePolicy);
@@ -1690,6 +1695,12 @@ public class AvroIO {
return toBuilder().setNoSpilling(true).build();
}
+ /** See {@link WriteFiles#withMaxNumWritersPerBundle()}. */
+ public TypedWrite<UserT, DestinationT, OutputT> withMaxNumWritersPerBundle(
+ @Nullable Integer maxNumWritersPerBundle) {
+ return
toBuilder().setMaxNumWritersPerBundle(maxNumWritersPerBundle).build();
+ }
+
/** Writes to Avro file(s) compressed using specified codec. */
public TypedWrite<UserT, DestinationT, OutputT> withCodec(CodecFactory
codec) {
return toBuilder().setCodec(new
SerializableAvroCodecFactory(codec)).build();
@@ -1799,6 +1810,9 @@ public class AvroIO {
if (getNoSpilling()) {
write = write.withNoSpilling();
}
+ if (getMaxNumWritersPerBundle() != null) {
+ write = write.withMaxNumWritersPerBundle(getMaxNumWritersPerBundle());
+ }
if (getBadRecordErrorHandler() != null) {
write = write.withBadRecordErrorHandler(getBadRecordErrorHandler());
}
diff --git
a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIO.java
b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIO.java
index fc2b68c0a89..d71299cceb6 100644
--- a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIO.java
+++ b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIO.java
@@ -53,6 +53,7 @@ import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.apache.commons.csv.CSVFormat;
+import org.checkerframework.checker.nullness.qual.Nullable;
/**
* {@link PTransform}s for reading and writing CSV files.
@@ -550,6 +551,16 @@ public class CsvIO {
return
toBuilder().setTextIOWrite(getTextIOWrite().withNoSpilling()).build();
}
+ /**
+ * Set the maximum number of writers created in a bundle before spilling
to shuffle. See {@link
+ * WriteFiles#withMaxNumWritersPerBundle()}.
+ */
+ public Write<T> withMaxNumWritersPerBundle(@Nullable Integer
maxNumWritersPerBundle) {
+ return toBuilder()
+
.setTextIOWrite(getTextIOWrite().withMaxNumWritersPerBundle(maxNumWritersPerBundle))
+ .build();
+ }
+
/**
* Specifies to use a given fixed number of shards per window. See {@link
* TextIO.Write#withNumShards}.
diff --git
a/sdks/java/io/json/src/main/java/org/apache/beam/sdk/io/json/JsonIO.java
b/sdks/java/io/json/src/main/java/org/apache/beam/sdk/io/json/JsonIO.java
index 3abb29a8042..1cb576e8f42 100644
--- a/sdks/java/io/json/src/main/java/org/apache/beam/sdk/io/json/JsonIO.java
+++ b/sdks/java/io/json/src/main/java/org/apache/beam/sdk/io/json/JsonIO.java
@@ -37,6 +37,7 @@ import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
+import org.checkerframework.checker.nullness.qual.Nullable;
/**
* {@link PTransform}s for reading and writing JSON files.
@@ -170,6 +171,16 @@ public class JsonIO {
return
toBuilder().setTextIOWrite(getTextIOWrite().withNoSpilling()).build();
}
+ /**
+ * Set the maximum number of writers created in a bundle before spilling
to shuffle. See {@link
+ * WriteFiles#withMaxNumWritersPerBundle()}.
+ */
+ public Write<T> withMaxNumWritersPerBundle(@Nullable Integer
maxNumWritersPerBundle) {
+ return toBuilder()
+
.setTextIOWrite(getTextIOWrite().withMaxNumWritersPerBundle(maxNumWritersPerBundle))
+ .build();
+ }
+
/**
* Specifies to use a given fixed number of shards per window. See {@link
* TextIO.Write#withNumShards}.
diff --git a/sdks/standard_external_transforms.yaml
b/sdks/standard_external_transforms.yaml
index f5d71830145..1c536ce319d 100644
--- a/sdks/standard_external_transforms.yaml
+++ b/sdks/standard_external_transforms.yaml
@@ -19,7 +19,7 @@
# configuration in /sdks/standard_expansion_services.yaml.
# Refer to gen_xlang_wrappers.py for more info.
#
-# Last updated on: 2025-04-24
+# Last updated on: 2025-06-05
- default_service: sdks:java:io:expansion-service:shadowJar
description: 'Outputs a PCollection of Beam Rows, each containing a single
INT64
@@ -91,6 +91,11 @@
name: filename_suffix
nullable: true
type: str
+ - description: Maximum number of writers created in a bundle before spilling
to
+ shuffle.
+ name: max_num_writers_per_bundle
+ nullable: true
+ type: int32
- description: Whether to skip the spilling of data caused by having
maxNumWritersPerBundle.
name: no_spilling
nullable: true