[
https://issues.apache.org/jira/browse/BEAM-4242?focusedWorklogId=100270&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-100270
]
ASF GitHub Bot logged work on BEAM-4242:
----------------------------------------
Author: ASF GitHub Bot
Created on: 09/May/18 21:49
Start Date: 09/May/18 21:49
Worklog Time Spent: 10m
Work Description: jkff closed pull request #5296: [BEAM-4242] Fixes O(n)
complexity of Wait.on() due to combiner lifting
URL: https://github.com/apache/beam/pull/5296
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 14e55c35b33..9d7068f6058 100644
---
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -844,13 +844,15 @@ public void translate(GroupByKey transform,
TranslationContext context) {
WindowingStrategy<?, ?> windowingStrategy =
input.getWindowingStrategy();
boolean isStreaming =
context.getPipelineOptions().as(StreamingOptions.class).isStreaming();
- boolean disallowCombinerLifting =
- !windowingStrategy.getWindowFn().isNonMerging()
- || !windowingStrategy.getWindowFn().assignsToOneWindow()
- || (isStreaming && !transform.fewKeys())
- // TODO: Allow combiner lifting on the non-default
trigger, as appropriate.
- || !(windowingStrategy.getTrigger() instanceof
DefaultTrigger);
- stepContext.addInput(PropertyNames.DISALLOW_COMBINER_LIFTING,
disallowCombinerLifting);
+ boolean allowCombinerLifting =
+ windowingStrategy.getWindowFn().isNonMerging()
+ && windowingStrategy.getWindowFn().assignsToOneWindow();
+ if (isStreaming) {
+ allowCombinerLifting &= transform.fewKeys();
+ // TODO: Allow combiner lifting on the non-default trigger, as
appropriate.
+ allowCombinerLifting &= (windowingStrategy.getTrigger()
instanceof DefaultTrigger);
+ }
+ stepContext.addInput(PropertyNames.DISALLOW_COMBINER_LIFTING,
!allowCombinerLifting);
stepContext.addInput(
PropertyNames.SERIALIZED_FN,
byteArrayToJsonString(serializeWindowingStrategy(windowingStrategy)));
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index eb593f5bc46..208b976ebee 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -24,6 +24,7 @@
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
+import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
@@ -114,7 +115,7 @@
* <p><i>Note:</i> A splittable {@link DoFn} is not allowed to output from
the
* {@link FinishBundle} method.
*/
- public abstract void output(OutputT output, Instant timestamp,
BoundedWindow window);
+ public abstract void output(@Nullable OutputT output, Instant timestamp,
BoundedWindow window);
/**
* Adds the given element to the output {@code PCollection} with the given
tag at the given
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Wait.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Wait.java
index 0e06ec20d11..de8219a979e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Wait.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Wait.java
@@ -21,9 +21,13 @@
import static org.apache.beam.sdk.transforms.Requirements.requiresSideInputs;
import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
import java.util.Arrays;
import java.util.List;
+import java.util.Set;
+import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.Never;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
@@ -113,8 +117,33 @@ private OnSignal(List<PCollection<?>> signals) {
private <SignalT> PCollectionView<?> expandTyped(PCollection<SignalT>
input) {
return input
.apply(Window.<SignalT>configure().triggering(Never.ever()).discardingFiredPanes())
+ // Perform a per-window pre-combine so that our performance does not
critically depend
+ // on combiner lifting.
+ .apply(ParDo.of(new CollectWindowsFn<>()))
.apply(Sample.any(1))
.apply(View.asList());
}
}
+
+ private static class CollectWindowsFn<T> extends DoFn<T, Void> {
+ @Nullable
+ private Set<BoundedWindow> windows;
+
+ @StartBundle
+ public void startBundle() {
+ windows = Sets.newHashSetWithExpectedSize(1);
+ }
+
+ @ProcessElement
+ public void process(ProcessContext c, BoundedWindow w) {
+ windows.add(w);
+ }
+
+ @FinishBundle
+ public void finishBundle(FinishBundleContext c) {
+ for (BoundedWindow w : windows) {
+ c.output(null, w.maxTimestamp(), w);
+ }
+ }
+ }
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 100270)
Time Spent: 1h 10m (was: 1h)
> Wait.on() is O(n)
> -----------------
>
> Key: BEAM-4242
> URL: https://issues.apache.org/jira/browse/BEAM-4242
> Project: Beam
> Issue Type: Bug
> Components: sdk-java-core
> Reporter: Eugene Kirpichov
> Assignee: Eugene Kirpichov
> Priority: Major
> Fix For: 2.5.0
>
> Time Spent: 1h 10m
> Remaining Estimate: 0h
>
> Wait.on() uses a NeverTrigger and Sample.any(1) as an implementation detail.
> Unfortunately, Sample.any() relies on combiner lifting for performance;
> otherwise, all values end up grouped onto the same worker, which is not
> acceptable if the signal PCollection is large.
> Not all runners support combiner lifting, and even those that do (e.g.
> Dataflow) don't guarantee it. In one user's very large pipeline,
> combiner lifting was not performed because Dataflow supports it only for
> DefaultTrigger, not for NeverTrigger.
> This should be fixed by modifying Wait so that it does not rely on combiner
> lifting for performance, e.g. via a "manual" precombine (emit one value per bundle).
> CC: [~mkhadikov]
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)