Repository: beam
Updated Branches:
  refs/heads/master af8f586b6 -> 018513e58


Remove Direct Runner "doneness" configuration


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4fa9a280
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4fa9a280
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4fa9a280

Branch: refs/heads/master
Commit: 4fa9a2807f552ec939b1fb445f555ae420ac0613
Parents: af8f586
Author: Thomas Groh <[email protected]>
Authored: Tue Mar 28 09:46:02 2017 -0700
Committer: Thomas Groh <[email protected]>
Committed: Mon Apr 3 11:41:18 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/direct/DirectOptions.java      | 11 ----
 .../beam/runners/direct/DirectRunner.java       |  6 ---
 .../beam/runners/direct/EvaluationContext.java  | 34 ++----------
 .../runners/direct/EvaluationContextTest.java   | 57 +-------------------
 4 files changed, 4 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/4fa9a280/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
index 32ef352..3b66cc6 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
@@ -29,17 +29,6 @@ import org.apache.beam.sdk.options.PipelineOptions;
 public interface DirectOptions extends PipelineOptions, ApplicationNameOptions 
{
   @Default.Boolean(true)
   @Description(
-      "If the pipeline should shut down producers which have reached the 
maximum "
-          + "representable watermark. If this is set to true, a pipeline in 
which all PTransforms "
-          + "have reached the maximum watermark will be shut down, even if 
there are unbounded "
-          + "sources that could produce additional (late) data. By default, if 
the pipeline "
-          + "contains any unbounded PCollections, it will run until explicitly 
shut down.")
-  boolean isShutdownUnboundedProducersWithMaxWatermark();
-
-  void setShutdownUnboundedProducersWithMaxWatermark(boolean shutdown);
-
-  @Default.Boolean(true)
-  @Description(
       "If the pipeline should block awaiting completion of the pipeline. If 
set to true, "
           + "a call to Pipeline#run() will block until all PTransforms are 
complete. Otherwise, "
           + "the Pipeline will execute asynchronously. If set to false, use "

http://git-wip-us.apache.org/repos/asf/beam/blob/4fa9a280/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index bd210c3..43147a0 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -57,7 +57,6 @@ import 
org.apache.beam.sdk.transforms.View.CreatePCollectionView;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollection.IsBounded;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
@@ -421,11 +420,6 @@ public class DirectRunner extends 
PipelineRunner<DirectPipelineResult> {
      * exception. Future calls to {@link #getState()} will return
      * {@link org.apache.beam.sdk.PipelineResult.State#FAILED}.
      *
-     * <p>NOTE: if the {@link Pipeline} contains an {@link IsBounded#UNBOUNDED 
unbounded}
-     * {@link PCollection}, and the {@link PipelineRunner} was created with
-     * {@link DirectOptions#isShutdownUnboundedProducersWithMaxWatermark()} 
set to false,
-     * this method will never return.
-     *
      * <p>See also {@link PipelineExecutor#waitUntilFinish(Duration)}.
      */
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/4fa9a280/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
index 49c9ec2..54ce027 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
@@ -50,10 +50,8 @@ import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollection.IsBounded;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TaggedPValue;
 import org.joda.time.Instant;
 
 /**
@@ -402,37 +400,11 @@ class EvaluationContext {
 
   /**
    * Returns true if the step will not produce additional output.
-   *
-   * <p>If the provided transform produces only {@link IsBounded#BOUNDED}
-   * {@link PCollection PCollections}, returns true if the watermark is at
-   * {@link BoundedWindow#TIMESTAMP_MAX_VALUE positive infinity}.
-   *
-   * <p>If the provided transform produces any {@link IsBounded#UNBOUNDED}
-   * {@link PCollection PCollections}, returns the value of
-   * {@link DirectOptions#isShutdownUnboundedProducersWithMaxWatermark()}.
    */
   public boolean isDone(AppliedPTransform<?, ?, ?> transform) {
-    // if the PTransform's watermark isn't at the max value, it isn't done
-    if (watermarkManager
-        .getWatermarks(transform)
-        .getOutputWatermark()
-        .isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
-      return false;
-    }
-    // If the PTransform has any unbounded outputs, and unbounded producers 
should not be shut down,
-    // the PTransform may produce additional output. It is not done.
-    for (TaggedPValue output : transform.getOutputs()) {
-      if (output.getValue() instanceof PCollection) {
-        IsBounded bounded = ((PCollection<?>) output.getValue()).isBounded();
-        if (bounded.equals(IsBounded.UNBOUNDED)
-            && !options.isShutdownUnboundedProducersWithMaxWatermark()) {
-          return false;
-        }
-      }
-    }
-    // The PTransform's watermark was at positive infinity and all of its 
outputs are known to be
-    // done. It is done.
-    return true;
+    // the PTransform is done only if watermark is at the max value
+    Instant stepWatermark = 
watermarkManager.getWatermarks(transform).getOutputWatermark();
+    return !stepWatermark.isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/4fa9a280/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
index d6f2263..7a65493 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
@@ -414,8 +414,7 @@ public class EvaluationContextTest {
   }
 
   @Test
-  public void isDoneWithUnboundedPCollectionAndShutdown() {
-    
context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(true);
+  public void isDoneWithUnboundedPCollection() {
     assertThat(context.isDone(unboundedProducer), is(false));
 
     context.handleResult(
@@ -427,33 +426,7 @@ public class EvaluationContextTest {
   }
 
   @Test
-  public void isDoneWithUnboundedPCollectionAndNotShutdown() {
-    
context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(false);
-    assertThat(context.isDone(graph.getProducer(unbounded)), is(false));
-
-    context.handleResult(
-        null,
-        ImmutableList.<TimerData>of(),
-        StepTransformResult.withoutHold(graph.getProducer(unbounded)).build());
-    assertThat(context.isDone(graph.getProducer(unbounded)), is(false));
-  }
-
-  @Test
-  public void isDoneWithOnlyBoundedPCollections() {
-    
context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(false);
-    assertThat(context.isDone(createdProducer), is(false));
-
-    context.handleResult(
-        null,
-        ImmutableList.<TimerData>of(),
-        StepTransformResult.withoutHold(createdProducer).build());
-    context.extractFiredTimers();
-    assertThat(context.isDone(createdProducer), is(true));
-  }
-
-  @Test
   public void isDoneWithPartiallyDone() {
-    
context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(true);
     assertThat(context.isDone(), is(false));
 
     UncommittedBundle<Integer> rootBundle = context.createBundle(created);
@@ -484,34 +457,6 @@ public class EvaluationContextTest {
     assertThat(context.isDone(), is(true));
   }
 
-  @Test
-  public void isDoneWithUnboundedAndNotShutdown() {
-    
context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(false);
-    assertThat(context.isDone(), is(false));
-
-    context.handleResult(
-        null,
-        ImmutableList.<TimerData>of(),
-        StepTransformResult.withoutHold(createdProducer).build());
-    context.handleResult(
-        null,
-        ImmutableList.<TimerData>of(),
-        StepTransformResult.withoutHold(unboundedProducer).build());
-    context.handleResult(
-        context.createBundle(created).commit(Instant.now()),
-        ImmutableList.<TimerData>of(),
-        StepTransformResult.withoutHold(downstreamProducer).build());
-    context.extractFiredTimers();
-    assertThat(context.isDone(), is(false));
-
-    context.handleResult(
-        context.createBundle(created).commit(Instant.now()),
-        ImmutableList.<TimerData>of(),
-        StepTransformResult.withoutHold(viewProducer).build());
-    context.extractFiredTimers();
-    assertThat(context.isDone(), is(false));
-  }
-
   private static class TestBoundedWindow extends BoundedWindow {
     private final Instant ts;
 

Reply via email to