This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new e096daf [BEAM-6721] Set numShards dynamically for TextIO.write()
new 84e24ea Merge pull request #15500 from [BEAM-6721] Set numShards dynamically for TextIO.write()
e096daf is described below
commit e096daf9747a4837d8c054dcb384cf8d5c48023c
Author: Minbo Bae <[email protected]>
AuthorDate: Fri Sep 10 01:08:04 2021 -0700
[BEAM-6721] Set numShards dynamically for TextIO.write()
---
.../main/java/org/apache/beam/sdk/io/TextIO.java | 25 ++++++++++++++++------
.../java/org/apache/beam/sdk/io/WriteFiles.java | 3 ++-
2 files changed, 21 insertions(+), 7 deletions(-)
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index baae960..8f8a699 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -268,7 +268,6 @@ public class TextIO {
.setDelimiter(new char[] {'\n'})
.setWritableByteChannelFactory(FileBasedSink.CompressionType.UNCOMPRESSED)
.setWindowedWrites(false)
- .setNumShards(0)
.setNoSpilling(false)
.build();
}
@@ -623,7 +622,7 @@ public class TextIO {
abstract @Nullable String getFooter();
/** Requested number of shards. 0 for automatic. */
- abstract int getNumShards();
+ abstract @Nullable ValueProvider<Integer> getNumShards();
/** The shard template of each file written, combined with prefix and suffix. */
abstract @Nullable String getShardTemplate();
@@ -689,7 +688,8 @@ public class TextIO {
abstract Builder<UserT, DestinationT> setFormatFunction(
@Nullable SerializableFunction<UserT, String> formatFunction);
- abstract Builder<UserT, DestinationT> setNumShards(int numShards);
+ abstract Builder<UserT, DestinationT> setNumShards(
+ @Nullable ValueProvider<Integer> numShards);
abstract Builder<UserT, DestinationT> setWindowedWrites(boolean windowedWrites);
@@ -846,6 +846,14 @@ public class TextIO {
*/
public TypedWrite<UserT, DestinationT> withNumShards(int numShards) {
checkArgument(numShards >= 0);
+ return withNumShards(StaticValueProvider.of(numShards));
+ }
+
+ /**
+ * Like {@link #withNumShards(int)}. Specifying {@code null} means runner-determined sharding.
+ */
+ public TypedWrite<UserT, DestinationT> withNumShards(
+ @Nullable ValueProvider<Integer> numShards) {
return toBuilder().setNumShards(numShards).build();
}
@@ -1002,7 +1010,7 @@ public class TextIO {
getHeader(),
getFooter(),
getWritableByteChannelFactory()));
- if (getNumShards() > 0) {
+ if (getNumShards() != null) {
write = write.withNumShards(getNumShards());
}
if (getWindowedWrites()) {
@@ -1020,8 +1028,8 @@ public class TextIO {
resolveDynamicDestinations().populateDisplayData(builder);
builder
- .addIfNotDefault(
- DisplayData.item("numShards", getNumShards()).withLabel("Maximum Output Shards"), 0)
+ .addIfNotNull(
+ DisplayData.item("numShards", getNumShards()).withLabel("Maximum Output Shards"))
.addIfNotNull(
DisplayData.item("tempDirectory", getTempDirectory())
.withLabel("Directory for temporary files"))
@@ -1139,6 +1147,11 @@ public class TextIO {
return new Write(inner.withNumShards(numShards));
}
+ /** See {@link TypedWrite#withNumShards(ValueProvider)}. */
+ public Write withNumShards(@Nullable ValueProvider<Integer> numShards) {
+ return new Write(inner.withNumShards(numShards));
+ }
+
/** See {@link TypedWrite#withoutSharding()}. */
public Write withoutSharding() {
return new Write(inner.withoutSharding());
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index 9dabb40..7afac9a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -884,7 +884,8 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT>
shardCount = context.sideInput(numShardsView);
} else {
checkNotNull(getNumShardsProvider());
- shardCount = getNumShardsProvider().get();
+ shardCount =
+ checkNotNull(getNumShardsProvider().get(), "Must have non-null number of shards.");
}
checkArgument(
shardCount > 0,