jkff commented on a change in pull request #11406:
URL: https://github.com/apache/beam/pull/11406#discussion_r428356257



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java
##########
@@ -107,10 +108,57 @@ public void processElement(
 
   /** Implementation of {@link #viaRandomKey()}. */
   public static class ViaRandomKey<T> extends PTransform<PCollection<T>, PCollection<T>> {
+    private boolean isHighFanoutAndLimitedInputParallelism;
+
     private ViaRandomKey() {}
 
+    /**
+     * Use a different strategy that materializes the input and prepares it to be consumed in a
+     * highly parallel fashion.
+     *
+     * <p>It is tailored to the case when input was produced in an extremely sequential way -
+     * typically by a ParDo that emits millions of outputs _per input element_, e.g., executing a
+     * large database query or a large simulation and emitting all of their results.
+     *
+     * <p>Internally, it materializes the input at a moderate cost before reshuffling it, making the
+     * reshuffling itself significantly cheaper in these extreme cases on some runners. Use this
+     * only if your benchmarks show an improvement.
+     */
+    public ViaRandomKey<T> withHintHighFanoutAndLimitedInputParallelism() {
+      this.isHighFanoutAndLimitedInputParallelism = true;
+      return this;
+    }
+
     @Override
     public PCollection<T> expand(PCollection<T> input) {
+      if (isHighFanoutAndLimitedInputParallelism) {
+        // See https://issues.apache.org/jira/browse/BEAM-2803
+        // We use a combined approach to "break fusion" here:
+        // (see https://cloud.google.com/dataflow/service/dataflow-service-desc#preventing-fusion)
+        // 1) force the data to be materialized by passing it as a side input to an identity fn,
+        // then 2) reshuffle it with a random key. Initial materialization provides some parallelism
+        // and ensures that data to be shuffled can be generated in parallel, while reshuffling
+        // provides perfect parallelism.
+        // In most cases where a "fusion break" is needed, a simple reshuffle would be sufficient.
+        // The current approach is necessary only to support the particular case of JdbcIO where
+        // a single query may produce many gigabytes of query results.
+        PCollectionView<Iterable<T>> empty =
+            input
+                .apply("Consume", Filter.by(SerializableFunctions.constant(false)))
+                .apply(View.asIterable());
+        PCollection<T> materialized =
+            input.apply(
+                "Identity",
+                ParDo.of(

Review comment:
       nit: here you could use a `MapElements.via(Contextful.of(t -> t, requiresSideInputs(empty)))`

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java
##########
@@ -107,10 +108,57 @@ public void processElement(
 
   /** Implementation of {@link #viaRandomKey()}. */
   public static class ViaRandomKey<T> extends PTransform<PCollection<T>, PCollection<T>> {
+    private boolean isHighFanoutAndLimitedInputParallelism;

Review comment:
       Please use an immutable AutoValue with a builder here. PTransforms can't have mutable member variables.
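
       To illustrate the immutability point, here is a minimal hand-written sketch of the pattern. It is not the AutoValue version requested above and not the actual Beam class: `ViaRandomKeySketch`, `create()`, and the getter are hypothetical names used only for illustration. The idea is that the `with...` method returns a new instance instead of mutating `this`, so a transform that has already been applied elsewhere is never changed.

```java
// Hypothetical stand-in for Reshuffle.ViaRandomKey; plain Java, no Beam or AutoValue dependency.
final class ViaRandomKeySketch {
    // Final field: the configuration cannot change after construction.
    private final boolean highFanoutAndLimitedInputParallelism;

    private ViaRandomKeySketch(boolean highFanoutAndLimitedInputParallelism) {
        this.highFanoutAndLimitedInputParallelism = highFanoutAndLimitedInputParallelism;
    }

    // Default configuration: the hint is off.
    static ViaRandomKeySketch create() {
        return new ViaRandomKeySketch(false);
    }

    // Returns a new instance with the hint enabled; the receiver is left untouched.
    ViaRandomKeySketch withHintHighFanoutAndLimitedInputParallelism() {
        return new ViaRandomKeySketch(true);
    }

    boolean isHighFanoutAndLimitedInputParallelism() {
        return highFanoutAndLimitedInputParallelism;
    }
}
```

       With AutoValue, the constructor, field, and getter would presumably be generated from an abstract class, and the `with...` method would go through the generated builder; the observable behavior (a fresh instance per configuration change) would be the same.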



