Repository: beam Updated Branches: refs/heads/master af8f586b6 -> 018513e58
Remove Direct Runner "doneness" configuration Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4fa9a280 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4fa9a280 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4fa9a280 Branch: refs/heads/master Commit: 4fa9a2807f552ec939b1fb445f555ae420ac0613 Parents: af8f586 Author: Thomas Groh <[email protected]> Authored: Tue Mar 28 09:46:02 2017 -0700 Committer: Thomas Groh <[email protected]> Committed: Mon Apr 3 11:41:18 2017 -0700 ---------------------------------------------------------------------- .../beam/runners/direct/DirectOptions.java | 11 ---- .../beam/runners/direct/DirectRunner.java | 6 --- .../beam/runners/direct/EvaluationContext.java | 34 ++---------- .../runners/direct/EvaluationContextTest.java | 57 +------------------- 4 files changed, 4 insertions(+), 104 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/4fa9a280/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java index 32ef352..3b66cc6 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java @@ -29,17 +29,6 @@ import org.apache.beam.sdk.options.PipelineOptions; public interface DirectOptions extends PipelineOptions, ApplicationNameOptions { @Default.Boolean(true) @Description( - "If the pipeline should shut down producers which have reached the maximum " - + "representable watermark. If this is set to true, a pipeline in which all PTransforms " - + "have reached the maximum watermark will be shut down, even if there are unbounded " - + "sources that could produce additional (late) data. By default, if the pipeline " - + "contains any unbounded PCollections, it will run until explicitly shut down.") - boolean isShutdownUnboundedProducersWithMaxWatermark(); - - void setShutdownUnboundedProducersWithMaxWatermark(boolean shutdown); - - @Default.Boolean(true) - @Description( "If the pipeline should block awaiting completion of the pipeline. If set to true, " + "a call to Pipeline#run() will block until all PTransforms are complete. Otherwise, " + "the Pipeline will execute asynchronously. If set to false, use " http://git-wip-us.apache.org/repos/asf/beam/blob/4fa9a280/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java index bd210c3..43147a0 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java @@ -57,7 +57,6 @@ import org.apache.beam.sdk.transforms.View.CreatePCollectionView; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollection.IsBounded; import org.apache.beam.sdk.values.PCollectionView; import org.joda.time.Duration; import org.joda.time.Instant; @@ -421,11 +420,6 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> { * exception. Future calls to {@link #getState()} will return * {@link org.apache.beam.sdk.PipelineResult.State#FAILED}. * - * <p>NOTE: if the {@link Pipeline} contains an {@link IsBounded#UNBOUNDED unbounded} - * {@link PCollection}, and the {@link PipelineRunner} was created with - * {@link DirectOptions#isShutdownUnboundedProducersWithMaxWatermark()} set to false, - * this method will never return. - * * <p>See also {@link PipelineExecutor#waitUntilFinish(Duration)}. */ @Override http://git-wip-us.apache.org/repos/asf/beam/blob/4fa9a280/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java index 49c9ec2..54ce027 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java @@ -50,10 +50,8 @@ import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollection.IsBounded; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PValue; -import org.apache.beam.sdk.values.TaggedPValue; import org.joda.time.Instant; /** @@ -402,37 +400,11 @@ class EvaluationContext { /** * Returns true if the step will not produce additional output. - * - * <p>If the provided transform produces only {@link IsBounded#BOUNDED} - * {@link PCollection PCollections}, returns true if the watermark is at - * {@link BoundedWindow#TIMESTAMP_MAX_VALUE positive infinity}. - * - * <p>If the provided transform produces any {@link IsBounded#UNBOUNDED} - * {@link PCollection PCollections}, returns the value of - * {@link DirectOptions#isShutdownUnboundedProducersWithMaxWatermark()}. */ public boolean isDone(AppliedPTransform<?, ?, ?> transform) { - // if the PTransform's watermark isn't at the max value, it isn't done - if (watermarkManager - .getWatermarks(transform) - .getOutputWatermark() - .isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE)) { - return false; - } - // If the PTransform has any unbounded outputs, and unbounded producers should not be shut down, - // the PTransform may produce additional output. It is not done. - for (TaggedPValue output : transform.getOutputs()) { - if (output.getValue() instanceof PCollection) { - IsBounded bounded = ((PCollection<?>) output.getValue()).isBounded(); - if (bounded.equals(IsBounded.UNBOUNDED) - && !options.isShutdownUnboundedProducersWithMaxWatermark()) { - return false; - } - } - } - // The PTransform's watermark was at positive infinity and all of its outputs are known to be - // done. It is done. - return true; + // the PTransform is done only if watermark is at the max value + Instant stepWatermark = watermarkManager.getWatermarks(transform).getOutputWatermark(); + return !stepWatermark.isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE); } /** http://git-wip-us.apache.org/repos/asf/beam/blob/4fa9a280/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java index d6f2263..7a65493 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java @@ -414,8 +414,7 @@ public class EvaluationContextTest { } @Test - public void isDoneWithUnboundedPCollectionAndShutdown() { - context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(true); + public void isDoneWithUnboundedPCollection() { assertThat(context.isDone(unboundedProducer), is(false)); context.handleResult( @@ -427,33 +426,7 @@ public class EvaluationContextTest { } @Test - public void isDoneWithUnboundedPCollectionAndNotShutdown() { - context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(false); - assertThat(context.isDone(graph.getProducer(unbounded)), is(false)); - - context.handleResult( - null, - ImmutableList.<TimerData>of(), - StepTransformResult.withoutHold(graph.getProducer(unbounded)).build()); - assertThat(context.isDone(graph.getProducer(unbounded)), is(false)); - } - - @Test - public void isDoneWithOnlyBoundedPCollections() { - context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(false); - assertThat(context.isDone(createdProducer), is(false)); - - context.handleResult( - null, - ImmutableList.<TimerData>of(), - StepTransformResult.withoutHold(createdProducer).build()); - context.extractFiredTimers(); - assertThat(context.isDone(createdProducer), is(true)); - } - - @Test public void isDoneWithPartiallyDone() { - context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(true); assertThat(context.isDone(), is(false)); UncommittedBundle<Integer> rootBundle = context.createBundle(created); @@ -484,34 +457,6 @@ public class EvaluationContextTest { assertThat(context.isDone(), is(true)); } - @Test - public void isDoneWithUnboundedAndNotShutdown() { - context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(false); - assertThat(context.isDone(), is(false)); - - context.handleResult( - null, - ImmutableList.<TimerData>of(), - StepTransformResult.withoutHold(createdProducer).build()); - context.handleResult( - null, - ImmutableList.<TimerData>of(), - StepTransformResult.withoutHold(unboundedProducer).build()); - context.handleResult( - context.createBundle(created).commit(Instant.now()), - ImmutableList.<TimerData>of(), - StepTransformResult.withoutHold(downstreamProducer).build()); - context.extractFiredTimers(); - assertThat(context.isDone(), is(false)); - - context.handleResult( - context.createBundle(created).commit(Instant.now()), - ImmutableList.<TimerData>of(), - StepTransformResult.withoutHold(viewProducer).build()); - context.extractFiredTimers(); - assertThat(context.isDone(), is(false)); - } - private static class TestBoundedWindow extends BoundedWindow { private final Instant ts;
