reuvenlax commented on a change in pull request #15731:
URL: https://github.com/apache/beam/pull/15731#discussion_r737653595



##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/GroupIntoBatchesOverride.java
##########
@@ -152,14 +180,84 @@ public void process(ProcessContext c) {
       extends PTransform<PCollection<KV<K, V>>, PCollection<KV<ShardedKey<K>, Iterable<V>>>> {
 
     private final BatchingParams<V> batchingParams;
+    private final transient DataflowRunner runner;
+    private final transient PCollection<KV<ShardedKey<K>, Iterable<V>>> originalOutput;
 
-    private BatchGroupIntoBatchesWithShardedKey(BatchingParams<V> batchingParams) {
+    private BatchGroupIntoBatchesWithShardedKey(
+        BatchingParams<V> batchingParams,
+        DataflowRunner runner,
+        PCollection<KV<ShardedKey<K>, Iterable<V>>> originalOutput) {
       this.batchingParams = batchingParams;
+      this.runner = runner;
+      this.originalOutput = originalOutput;
     }
 
     @Override
     public PCollection<KV<ShardedKey<K>, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
-      return shardKeys(input).apply(new BatchGroupIntoBatches<>(batchingParams));
+      return shardKeys(input)
+          .apply(new BatchGroupIntoBatches<>(batchingParams, runner, originalOutput));
+    }
+  }
+
+  static class StreamingGroupIntoBatchesOverrideFactory<K, V>
+      implements PTransformOverrideFactory<
+          PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>, GroupIntoBatches<K, V>> {
+
+    private final DataflowRunner runner;
+
+    StreamingGroupIntoBatchesOverrideFactory(DataflowRunner runner) {
+      this.runner = runner;
+    }
+
+    @Override
+    public PTransformReplacement<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>>
+        getReplacementTransform(
+            AppliedPTransform<
+                    PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>, GroupIntoBatches<K, V>>
+                transform) {
+      return PTransformReplacement.of(
+          PTransformReplacements.getSingletonMainInput(transform),
+          new StreamingGroupIntoBatches<>(
+              runner,
+              transform.getTransform(),
+              PTransformReplacements.getSingletonMainOutput(transform)));
+    }
+
+    @Override
+    public Map<PCollection<?>, ReplacementOutput> mapOutputs(
+        Map<TupleTag<?>, PCollection<?>> outputs, PCollection<KV<K, Iterable<V>>> newOutput) {
+      return ReplacementOutputs.singleton(outputs, newOutput);
+    }
+  }
+
+  /**
+   * Specialized implementation of {@link GroupIntoBatches} for unbounded Dataflow pipelines. The
+   * override does the same thing as the original transform but additionally records the output in
+   * order to append required step properties during the graph translation.
+   */
+  static class StreamingGroupIntoBatches<K, V>
+      extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {
+
+    private final transient DataflowRunner runner;

Review comment:
       No, this should be fine. The `runner` field is transient, so it won't be populated on the workers.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to