nehsyc commented on a change in pull request #14164:
URL: https://github.com/apache/beam/pull/14164#discussion_r593526147
##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
##########
@@ -672,13 +740,102 @@ private WriteShardedBundlesToTempFiles(
.withSideInputs(shardingSideInputs))
.setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()),
input.getCoder()))
.apply("GroupIntoShards", GroupByKey.create())
+ .apply(
+ "KeyedByShardNum",
+ MapElements.via(
+ new SimpleFunction<
+ KV<ShardedKey<Integer>, Iterable<UserT>>, KV<Integer,
Iterable<UserT>>>() {
+ @Override
+ public KV<Integer, Iterable<UserT>> apply(
+ KV<ShardedKey<Integer>, Iterable<UserT>> input) {
+ return KV.of(input.getKey().getShardNumber(),
input.getValue());
+ }
+ }))
+ .setCoder(KvCoder.of(VarIntCoder.of(),
IterableCoder.of(input.getCoder())))
.apply(
"WriteShardsIntoTempFiles",
ParDo.of(new
WriteShardsIntoTempFilesFn()).withSideInputs(getSideInputs()))
.setCoder(fileResultCoder);
}
}
+ private class WriteAutoShardedBundlesToTempFiles
+ extends PTransform<PCollection<UserT>,
PCollection<FileResult<DestinationT>>> {
+ private final Coder<DestinationT> destinationCoder;
+ private final Coder<FileResult<DestinationT>> fileResultCoder;
+
+ private WriteAutoShardedBundlesToTempFiles(
+ Coder<DestinationT> destinationCoder, Coder<FileResult<DestinationT>>
fileResultCoder) {
+ this.destinationCoder = destinationCoder;
+ this.fileResultCoder = fileResultCoder;
+ }
+
+ @Override
+ public PCollection<FileResult<DestinationT>> expand(PCollection<UserT>
input) {
+ checkArgument(
+ getWithRunnerDeterminedShardingUnbounded(),
+ "Runner determined sharding for unbounded data is not supported by
the runner.");
+ // Auto-sharding is achieved via GroupIntoBatches.WithShardedKey which
shards, groups and at
+ // the same time batches the input records. The sharding behavior
depends on runners. The
+ // batching is per window and we also emit the batches if there are a
certain number of
+ // records buffered or they have been buffered for a certain time,
controlled by
+ // FILE_TRIGGERING_RECORD_COUNT and BUFFERING_DURATION respectively.
+ return input
+ .apply(
+ "KeyedByDestination",
+ ParDo.of(
+ new DoFn<UserT, KV<Integer, UserT>>() {
+ @ProcessElement
+ public void processElement(@Element UserT element,
ProcessContext context)
+ throws Exception {
+
getDynamicDestinations().setSideInputAccessorFromProcessContext(context);
+ DestinationT destination =
+
getDynamicDestinations().getDestination(context.element());
+ context.output(
+ KV.of(hashDestination(destination,
destinationCoder), element));
+ }
+ }))
+ .setCoder(KvCoder.of(VarIntCoder.of(), input.getCoder()))
+ .apply(
+ "ShardAndGroup",
+ GroupIntoBatches.<Integer,
UserT>ofSize(FILE_TRIGGERING_RECORD_COUNT)
+
.withMaxBufferingDuration(FILE_TRIGGERING_RECORD_BUFFERING_DURATION)
+ .withShardedKey())
+ .setCoder(
+ KvCoder.of(
+
org.apache.beam.sdk.util.ShardedKey.Coder.of(VarIntCoder.of()),
+ IterableCoder.of(input.getCoder())))
+ // Add dummy shard since it is required by
WriteShardsIntoTempFilesFn. It will be dropped
Review comment:
I'm not sure we can remove the dummy shard here: `WriteShardsIntoTempFilesFn`
is also used in the fixed-sharding case, where we do want to ensure that a
shard number is assigned properly.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]