nehsyc commented on a change in pull request #13208:
URL: https://github.com/apache/beam/pull/13208#discussion_r516839644



##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/GroupIntoBatchesOverride.java
##########
@@ -103,43 +156,76 @@ public void process(ProcessContext c) {
     }
 
     @Override
-    public PTransformReplacement<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>>
+    public PTransformReplacement<PCollection<KV<K, V>>, PCollection<KV<ShardedKey<K>, Iterable<V>>>>
         getReplacementTransform(
             AppliedPTransform<
-                    PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>, GroupIntoBatches<K, V>>
+                    PCollection<KV<K, V>>,
+                    PCollection<KV<ShardedKey<K>, Iterable<V>>>,
+                    GroupIntoBatches<K, V>.WithShardedKey>
                 transform) {
       return PTransformReplacement.of(
           PTransformReplacements.getSingletonMainInput(transform),
-          new StreamingGroupIntoBatches(runner, transform.getTransform()));
+          new StreamingGroupIntoBatchesWithShardedKey<>(runner, transform.getTransform()));
     }
 
     @Override
     public Map<PCollection<?>, ReplacementOutput> mapOutputs(
-        Map<TupleTag<?>, PCollection<?>> outputs, PCollection<KV<K, Iterable<V>>> newOutput) {
+        Map<TupleTag<?>, PCollection<?>> outputs,
+        PCollection<KV<ShardedKey<K>, Iterable<V>>> newOutput) {
       return ReplacementOutputs.singleton(outputs, newOutput);
     }
   }
 
   /**
-   * Specialized implementation of {@link GroupIntoBatches} for unbounded Dataflow pipelines. The
-   * override does the same thing as the original transform but additionally record the input to add
-   * corresponding properties during the graph translation.
+   * Specialized implementation of {@link GroupIntoBatches.WithShardedKey} for unbounded Dataflow
+   * pipelines. The override does the same thing as the original transform but additionally records
+   * the input of {@code GroupIntoBatchesDoFn} in order to append relevant step properties during
+   * the graph translation.
    */
-  static class StreamingGroupIntoBatches<K, V>
-      extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {
+  static class StreamingGroupIntoBatchesWithShardedKey<K, V>
+      extends PTransform<PCollection<KV<K, V>>, PCollection<KV<ShardedKey<K>, Iterable<V>>>> {
 
     private final transient DataflowRunner runner;
-    private final GroupIntoBatches<K, V> original;
+    private final GroupIntoBatches<K, V>.WithShardedKey original;
 
-    public StreamingGroupIntoBatches(DataflowRunner runner, GroupIntoBatches<K, V> original) {
+    public StreamingGroupIntoBatchesWithShardedKey(
+        DataflowRunner runner, GroupIntoBatches<K, V>.WithShardedKey original) {
       this.runner = runner;
       this.original = original;
     }
 
     @Override
-    public PCollection<KV<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
-      runner.maybeRecordPCollectionWithAutoSharding(input);
-      return input.apply(original);
+    public PCollection<KV<ShardedKey<K>, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
+      PCollection<KV<ShardedKey<K>, V>> intermediate_input = ShardKeys(input);
+
+      runner.maybeRecordPCollectionWithAutoSharding(intermediate_input);
+
+      if (original.getMaxBufferingDuration() != null) {

Review comment:
       I tried to make the changes, but it seems that making 
`GroupIntoBatchesDoFn` visible to runners is not enough: we now need to 
differentiate the DoFn in the `GroupIntoBatches` transform from the one in the 
`GroupIntoBatches.withShardedKey` transform. `DataflowPipelineTranslator` seems 
to recognize only primitive transforms, e.g., `ParDoSingle` and 
`ParDo.MultiOutput`.
   
   
https://github.com/apache/beam/blob/f67cb9ad6a46fb5b6af9aababc25305a8d5968c1/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java#L997
   
   Possible workarounds:
   - Match on the DoFn name (`withShardedKey`) or on the PCollection type name 
(`ShardedKey`).
   - Record the output PCollection of the GroupIntoBatches (GIB) transform and 
use it to recognize the GIB DoFn. I am not 100% sure this will work, but I can 
try.
   - Or...?
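
As a rough illustration of the second workaround, here is a minimal plain-Java sketch, not Beam code. `RecordedOutputSketch`, `Node`, `recordOutput`, and `isRecordedOutput` are all hypothetical names (a `Node` merely stands in for a PCollection), and the real override and translator hooks may look quite different.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

// Hypothetical sketch: remember which output collections came from the
// sharded-key transform, then consult that record during translation.
class RecordedOutputSketch {
  /** Hypothetical stand-in for a PCollection; only object identity matters. */
  static final class Node {
    final String name;

    Node(String name) {
      this.name = name;
    }
  }

  // Identity-based set, so two distinct nodes are never confused even if
  // they happen to carry the same name.
  private final Set<Node> recordedOutputs =
      Collections.newSetFromMap(new IdentityHashMap<Node, Boolean>());

  /** Would be called by the override when the replacement transform expands. */
  void recordOutput(Node output) {
    recordedOutputs.add(output);
  }

  /** Would be called by the translator to decide whether to add step properties. */
  boolean isRecordedOutput(Node stepOutput) {
    return recordedOutputs.contains(stepOutput);
  }

  public static void main(String[] args) {
    RecordedOutputSketch sketch = new RecordedOutputSketch();
    Node sharded = new Node("GroupIntoBatches.WithShardedKey.out");
    Node plain = new Node("GroupIntoBatches.out");
    sketch.recordOutput(sharded);
    System.out.println(sharded.name + " recorded: " + sketch.isRecordedOutput(sharded));
    System.out.println(plain.name + " recorded: " + sketch.isRecordedOutput(plain));
  }
}
```

Compared with matching on a DoFn or type name, an identity-based record does not depend on naming conventions, but it only works if the translator sees the same output object that the override recorded.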

##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/GroupIntoBatchesOverride.java
##########
@@ -210,20 +211,23 @@ public StreamingGroupIntoBatchesWithShardedKey(
     }
   }
 
-  private static <K, V> PCollection<KV<ShardedKey<K>, V>> ShardKeys(PCollection<KV<K, V>> input) {
+  private static <K, V> PCollection<KV<ShardedKey<K>, V>> shardKeys(PCollection<KV<K, V>> input) {
     KvCoder<K, V> inputCoder = (KvCoder<K, V>) input.getCoder();
     org.apache.beam.sdk.coders.Coder<K> keyCoder =
         (org.apache.beam.sdk.coders.Coder<K>) inputCoder.getCoderArguments().get(0);
     org.apache.beam.sdk.coders.Coder<V> valueCoder =
         (org.apache.beam.sdk.coders.Coder<V>) inputCoder.getCoderArguments().get(1);
     return input
         .apply(
-            "ShardKeys",
+            "Shard Keys",
             MapElements.via(
                 new SimpleFunction<KV<K, V>, KV<ShardedKey<K>, V>>() {
                   @Override
                   public KV<ShardedKey<K>, V> apply(KV<K, V> input) {
-                    return KV.of(ShardedKey.of(input.getKey(), new byte[0]), input.getValue());
+                    long tid = Thread.currentThread().getId();

Review comment:
       Hmm, I am a bit confused. The statically initialized random is at the 
`GroupIntoBatches` class level. Is that correct? Would it be initialized only 
once, when the transform is created in the user pipeline, and then shared 
across workers? In the unit test, I found that the tid was different almost 
every time `SimpleFunction.apply` was called, while the static tid stayed the 
same the whole time.
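
The behavior described above can be reproduced outside Beam. The sketch below is plain Java with hypothetical names (`StaticVersusThreadId`, `STATIC_ID`, `observe`, `countDistinct`); it only assumes standard JVM semantics, namely that a static field is initialized once per class load while `Thread.currentThread().getId()` is evaluated on every call. Since static fields are not part of Java serialization, each worker JVM would presumably initialize its own copy rather than share the value created in the user pipeline.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;

class StaticVersusThreadId {
  // Initialized once when this class is loaded in the current JVM,
  // regardless of how many threads call observe() afterwards.
  static final long STATIC_ID = ThreadLocalRandom.current().nextLong();

  /** Returns {static id, id of the calling thread}. */
  static long[] observe() {
    return new long[] {STATIC_ID, Thread.currentThread().getId()};
  }

  /** Calls observe() on nThreads threads; returns {distinct static ids, distinct thread ids}. */
  static int[] countDistinct(int nThreads) {
    Set<Long> staticIds = ConcurrentHashMap.newKeySet();
    Set<Long> threadIds = ConcurrentHashMap.newKeySet();
    // Keeps every thread alive until all have observed, so no id can be reused.
    CountDownLatch allObserved = new CountDownLatch(nThreads);
    Thread[] threads = new Thread[nThreads];
    for (int i = 0; i < nThreads; i++) {
      threads[i] = new Thread(() -> {
        long[] seen = observe();
        staticIds.add(seen[0]);
        threadIds.add(seen[1]);
        allObserved.countDown();
        try {
          allObserved.await();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
      threads[i].start();
    }
    try {
      for (Thread t : threads) {
        t.join();
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return new int[] {staticIds.size(), threadIds.size()};
  }

  public static void main(String[] args) {
    int[] counts = countDistinct(4);
    System.out.println("distinct static ids: " + counts[0]);
    System.out.println("distinct thread ids: " + counts[1]);
  }
}
```

Within one JVM, the static value is the same for every caller, whereas the thread id varies with whichever thread happens to run the function.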




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]