[ 
https://issues.apache.org/jira/browse/BEAM-11934?focusedWorklogId=565585&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-565585
 ]

ASF GitHub Bot logged work on BEAM-11934:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 13/Mar/21 00:33
            Start Date: 13/Mar/21 00:33
    Worklog Time Spent: 10m 
      Work Description: nehsyc commented on a change in pull request #14164:
URL: https://github.com/apache/beam/pull/14164#discussion_r593526147



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
##########
@@ -672,13 +740,102 @@ private WriteShardedBundlesToTempFiles(
                   .withSideInputs(shardingSideInputs))
           .setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder()))
           .apply("GroupIntoShards", GroupByKey.create())
+          .apply(
+              "KeyedByShardNum",
+              MapElements.via(
+                  new SimpleFunction<
+                      KV<ShardedKey<Integer>, Iterable<UserT>>, KV<Integer, Iterable<UserT>>>() {
+                    @Override
+                    public KV<Integer, Iterable<UserT>> apply(
+                        KV<ShardedKey<Integer>, Iterable<UserT>> input) {
+                      return KV.of(input.getKey().getShardNumber(), input.getValue());
+                    }
+                  }))
+          .setCoder(KvCoder.of(VarIntCoder.of(), IterableCoder.of(input.getCoder())))
           .apply(
               "WriteShardsIntoTempFiles",
               ParDo.of(new WriteShardsIntoTempFilesFn()).withSideInputs(getSideInputs()))
           .setCoder(fileResultCoder);
     }
   }
 
+  private class WriteAutoShardedBundlesToTempFiles
+      extends PTransform<PCollection<UserT>, PCollection<FileResult<DestinationT>>> {
+    private final Coder<DestinationT> destinationCoder;
+    private final Coder<FileResult<DestinationT>> fileResultCoder;
+
+    private WriteAutoShardedBundlesToTempFiles(
+        Coder<DestinationT> destinationCoder, Coder<FileResult<DestinationT>> fileResultCoder) {
+      this.destinationCoder = destinationCoder;
+      this.fileResultCoder = fileResultCoder;
+    }
+
+    @Override
+    public PCollection<FileResult<DestinationT>> expand(PCollection<UserT> input) {
+      checkArgument(
+          getWithRunnerDeterminedShardingUnbounded(),
+          "Runner determined sharding for unbounded data is not supported by the runner.");
+      // Auto-sharding is achieved via GroupIntoBatches.WithShardedKey which shards, groups and at
+      // the same time batches the input records. The sharding behavior depends on runners. The
+      // batching is per window and we also emit the batches if there are a certain number of
+      // records buffered or they have been buffered for a certain time, controlled by
+      // FILE_TRIGGERING_RECORD_COUNT and BUFFERING_DURATION respectively.
+      return input
+          .apply(
+              "KeyedByDestination",
+              ParDo.of(
+                  new DoFn<UserT, KV<Integer, UserT>>() {
+                    @ProcessElement
+                    public void processElement(@Element UserT element, ProcessContext context)
+                        throws Exception {
+                      getDynamicDestinations().setSideInputAccessorFromProcessContext(context);
+                      DestinationT destination =
+                          getDynamicDestinations().getDestination(context.element());
+                      context.output(
+                          KV.of(hashDestination(destination, destinationCoder), element));
+                    }
+                  }))
+          .setCoder(KvCoder.of(VarIntCoder.of(), input.getCoder()))
+          .apply(
+              "ShardAndGroup",
+              GroupIntoBatches.<Integer, UserT>ofSize(FILE_TRIGGERING_RECORD_COUNT)
+                  .withMaxBufferingDuration(FILE_TRIGGERING_RECORD_BUFFERING_DURATION)
+                  .withShardedKey())
+          .setCoder(
+              KvCoder.of(
+                  org.apache.beam.sdk.util.ShardedKey.Coder.of(VarIntCoder.of()),
+                  IterableCoder.of(input.getCoder())))
+          // Add dummy shard since it is required by WriteShardsIntoTempFilesFn. It will be dropped

Review comment:
       Not sure. `WriteShardsIntoTempFilesFn` is also used in the fixed-sharding case, where we do want to ensure that a shard is assigned properly.
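
For context, the quoted diff stops right at the dummy-shard step under discussion. Below is a rough, hypothetical sketch of what a re-keying step after GroupIntoBatches.withShardedKey() could look like, attaching a placeholder shard number in the spirit of the "Add dummy shard" comment above. It is not the code from this PR; the transform name, the DUMMY_SHARD constant, the String element type and the omitted coder setup are assumptions.

import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.ShardedKey;

final class DummyShardSketch {
  // Assumed placeholder value; the constant actually used by the PR is not shown in the excerpt.
  private static final int DUMMY_SHARD = 0;

  // Re-key runner-sharded batches so that downstream code expecting a (key, shard number)
  // pair still type-checks. The key inside the runner's ShardedKey is the destination hash
  // produced by "KeyedByDestination"; the shard number attached here is only a placeholder
  // because the runner has already decided the actual sharding.
  static PCollection<KV<ShardedKey<Integer>, Iterable<String>>> attachDummyShard(
      PCollection<KV<org.apache.beam.sdk.util.ShardedKey<Integer>, Iterable<String>>> batches) {
    // A coder would still need to be set explicitly, as the diff does for the other steps.
    return batches.apply(
        "AttachDummyShard",
        MapElements.via(
            new SimpleFunction<
                KV<org.apache.beam.sdk.util.ShardedKey<Integer>, Iterable<String>>,
                KV<ShardedKey<Integer>, Iterable<String>>>() {
              @Override
              public KV<ShardedKey<Integer>, Iterable<String>> apply(
                  KV<org.apache.beam.sdk.util.ShardedKey<Integer>, Iterable<String>> input) {
                return KV.of(
                    ShardedKey.of(input.getKey().getKey(), DUMMY_SHARD), input.getValue());
              }
            }));
  }

  private DummyShardSketch() {}
}

In the fixed-sharding path, by contrast, the shard number carried by the key is computed per element and is meaningful downstream, which is why it has to be assigned properly there.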




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 565585)
    Time Spent: 2.5h  (was: 2h 20m)

> GCS streaming file sink uses runner determined sharding
> -------------------------------------------------------
>
>                 Key: BEAM-11934
>                 URL: https://issues.apache.org/jira/browse/BEAM-11934
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-files, io-java-gcp
>            Reporter: Siyuan Chen
>            Assignee: Siyuan Chen
>            Priority: P1
>          Time Spent: 2.5h
>  Remaining Estimate: 0h
>
> Integrate file sink with shardable `GroupIntoBatches` (BEAM-10475) to allow 
> runner determined dynamic sharding for streaming use cases.
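
As a rough sketch of the building block the description refers to (shardable GroupIntoBatches, BEAM-10475): a streaming pipeline can hand the sharding decision to the runner while still batching per key. The element types, batch size and buffering duration below are illustrative assumptions; the sink itself uses internal constants such as FILE_TRIGGERING_RECORD_COUNT and FILE_TRIGGERING_RECORD_BUFFERING_DURATION, as shown in the diff above.

import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.util.ShardedKey;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

final class AutoShardedBatchingSketch {
  // Hypothetical sketch: batch records keyed by an Integer (e.g. a destination hash),
  // letting the runner decide how many shards to create per key. Batches are emitted
  // when the size limit is reached or after the maximum buffering duration.
  static PCollection<KV<ShardedKey<Integer>, Iterable<String>>> shardAndBatch(
      PCollection<KV<Integer, String>> keyedRecords) {
    return keyedRecords.apply(
        "ShardAndGroup",
        GroupIntoBatches.<Integer, String>ofSize(10_000) // illustrative batch size
            .withMaxBufferingDuration(Duration.standardMinutes(1)) // illustrative flush interval
            .withShardedKey());
  }

  private AutoShardedBatchingSketch() {}
}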



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
