[
https://issues.apache.org/jira/browse/BEAM-9748?focusedWorklogId=435611&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435611
]
ASF GitHub Bot logged work on BEAM-9748:
----------------------------------------
Author: ASF GitHub Bot
Created on: 20/May/20 18:59
Start Date: 20/May/20 18:59
Worklog Time Spent: 10m
Work Description: lukecwik commented on a change in pull request #11406:
URL: https://github.com/apache/beam/pull/11406#discussion_r428239285
##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java
##########
@@ -107,10 +108,57 @@ public void processElement(
/** Implementation of {@link #viaRandomKey()}. */
public static class ViaRandomKey<T> extends PTransform<PCollection<T>, PCollection<T>> {
+ private boolean isHighFanoutAndLimitedInputParallelism;
+
private ViaRandomKey() {}
+ /**
+ * Use a different strategy that materializes the input and prepares it to be consumed in a
+ * highly parallel fashion.
+ *
+ * <p>It is tailored to the case when input was produced in an extremely sequential way -
+ * typically by a ParDo that emits millions of outputs _per input element_, e.g., executing a
+ * large database query or a large simulation and emitting all of their results.
+ *
+ * <p>Internally, it materializes the input at a moderate cost before reshuffling it, making the
+ * reshuffling itself significantly cheaper in these extreme cases on some runners. Use this
+ * only if your benchmarks show an improvement.
+ */
+ public ViaRandomKey<T> withHintHighFanoutAndLimitedInputParallelism() {
+ this.isHighFanoutAndLimitedInputParallelism = true;
Review comment:
Let's return a new PTransform, as in our other PTransform builder patterns,
instead of mutating the state of the current transform.
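A minimal sketch of the immutable builder style the reviewer asks for, written without a Beam dependency so it can run on its own. `ViaRandomKeySketch`, `create()` and `isHighFanoutAndLimitedInputParallelism()` are hypothetical stand-ins, not Beam's API; the point is only that the hint method returns a fresh, fully configured instance and leaves the receiver unchanged.

```java
/**
 * Hypothetical stand-in for Reshuffle.ViaRandomKey (no Beam dependency). It illustrates the
 * immutable "with*" builder style: configuration methods return a new instance instead of
 * mutating the current one.
 */
final class ViaRandomKeySketch<T> {
  // final: an instance's configuration cannot change after construction.
  private final boolean isHighFanoutAndLimitedInputParallelism;

  private ViaRandomKeySketch(boolean isHighFanoutAndLimitedInputParallelism) {
    this.isHighFanoutAndLimitedInputParallelism = isHighFanoutAndLimitedInputParallelism;
  }

  /** Default configuration, loosely analogous to Reshuffle.viaRandomKey(). */
  static <T> ViaRandomKeySketch<T> create() {
    return new ViaRandomKeySketch<>(false);
  }

  /** Returns a copy with the hint enabled; the receiver is left untouched. */
  ViaRandomKeySketch<T> withHintHighFanoutAndLimitedInputParallelism() {
    // If the class had more fields, they would be copied into the new instance here.
    return new ViaRandomKeySketch<>(true);
  }

  boolean isHighFanoutAndLimitedInputParallelism() {
    return isHighFanoutAndLimitedInputParallelism;
  }

  public static void main(String[] args) {
    ViaRandomKeySketch<String> base = ViaRandomKeySketch.create();
    ViaRandomKeySketch<String> hinted = base.withHintHighFanoutAndLimitedInputParallelism();
    System.out.println(base.isHighFanoutAndLimitedInputParallelism());   // false
    System.out.println(hinted.isHighFanoutAndLimitedInputParallelism()); // true
  }
}
```

One benefit of this style is that a transform instance shared between call sites cannot be reconfigured by one of them behind the other's back.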
##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java
##########
@@ -107,10 +108,57 @@ public void processElement(
/** Implementation of {@link #viaRandomKey()}. */
public static class ViaRandomKey<T> extends PTransform<PCollection<T>, PCollection<T>> {
+ private boolean isHighFanoutAndLimitedInputParallelism;
+
private ViaRandomKey() {}
+ /**
+ * Use a different strategy that materializes the input and prepares it to be consumed in a
+ * highly parallel fashion.
+ *
+ * <p>It is tailored to the case when input was produced in an extremely sequential way -
+ * typically by a ParDo that emits millions of outputs _per input element_, e.g., executing a
+ * large database query or a large simulation and emitting all of their results.
+ *
+ * <p>Internally, it materializes the input at a moderate cost before reshuffling it, making the
+ * reshuffling itself significantly cheaper in these extreme cases on some runners. Use this
+ * only if your benchmarks show an improvement.
+ */
+ public ViaRandomKey<T> withHintHighFanoutAndLimitedInputParallelism() {
+ this.isHighFanoutAndLimitedInputParallelism = true;
+ return this;
+ }
+
@Override
public PCollection<T> expand(PCollection<T> input) {
+ if (isHighFanoutAndLimitedInputParallelism) {
+ // See https://issues.apache.org/jira/browse/BEAM-2803
+ // We use a combined approach to "break fusion" here:
+ // (see https://cloud.google.com/dataflow/service/dataflow-service-desc#preventing-fusion)
+ // 1) force the data to be materialized by passing it as a side input to an identity fn,
+ // then 2) reshuffle it with a random key. Initial materialization provides some parallelism
+ // and ensures that data to be shuffled can be generated in parallel, while reshuffling
+ // provides perfect parallelism.
+ // In most cases where a "fusion break" is needed, a simple reshuffle would be sufficient.
+ // The current approach is necessary only to support the particular case of JdbcIO where
Review comment:
```suggestion
// The current approach is necessary to support use cases such as JdbcIO where
```
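The two steps described in the `expand` comment can be modeled in memory, purely for illustration. The sketch below is not Beam code and cannot reproduce runner-level parallelism: `ReparallelizeSketch`, `expandOneElement`, `materialize` and `reshuffleViaRandomKey` are hypothetical names, the copy into a list only stands in for the side-input materialization, and the per-shard lists stand in for the groups a shuffle on random keys would produce. What it does show is the property the reshuffle relies on: output produced sequentially from a single input element ends up spread roughly evenly across shards, with nothing lost or duplicated.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/** Toy in-memory model (hypothetical, not Beam) of "materialize, then reshuffle via random key". */
final class ReparallelizeSketch {

  /** Hypothetical high-fanout step: one input element produces many outputs, in order. */
  static List<Integer> expandOneElement(int numOutputs) {
    return IntStream.range(0, numOutputs).boxed().collect(Collectors.toList());
  }

  /**
   * Step 1 stand-in: copy the complete output into an unmodifiable list, so all of it exists
   * before any downstream step runs. In Beam this is forced by the side-input trick instead.
   */
  static <T> List<T> materialize(List<T> produced) {
    return Collections.unmodifiableList(new ArrayList<>(produced));
  }

  /** Step 2 stand-in: give each element a random key in [0, numShards) and group by that key. */
  static <T> List<List<T>> reshuffleViaRandomKey(List<T> input, int numShards, Random random) {
    List<List<T>> shards = new ArrayList<>();
    for (int i = 0; i < numShards; i++) {
      shards.add(new ArrayList<>());
    }
    for (T element : input) {
      shards.get(random.nextInt(numShards)).add(element);
    }
    return shards;
  }

  public static void main(String[] args) {
    List<Integer> produced = expandOneElement(100_000);
    List<List<Integer>> shards = reshuffleViaRandomKey(materialize(produced), 8, new Random(42));
    for (int i = 0; i < shards.size(); i++) {
      System.out.println("shard " + i + ": " + shards.get(i).size() + " elements");
    }
  }
}
```

With 100,000 elements and 8 shards, each shard receives close to 12,500 elements; the exact counts depend on the seed.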
Issue Time Tracking
-------------------
Worklog Id: (was: 435611)
Time Spent: 4.5h (was: 4h 20m)
> Refactor Reparallelize as an alternative Reshuffle implementation
> -----------------------------------------------------------------
>
> Key: BEAM-9748
> URL: https://issues.apache.org/jira/browse/BEAM-9748
> Project: Beam
> Issue Type: Improvement
> Components: sdk-java-core
> Reporter: Ismaël Mejía
> Assignee: Ismaël Mejía
> Priority: P3
> Time Spent: 4.5h
> Remaining Estimate: 0h
>
> Some DoFn-based IOs, like JdbcIO and RedisIO, rely on a different approach to
> reparallelize their outputs: a combination of an empty PCollectionView to
> force materialization and Reshuffle.viaRandomKey to reparallelize a
> PCollection. This issue extracts this transform and exposes it as part of
> Reshuffle, to avoid repeating the code in transforms (notably IOs) that
> produce lots of sequentially generated data and benefit from this
> alternative approach to better reparallelize their output.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)