Repository: incubator-beam
Updated Branches:
  refs/heads/master 8390a2212 -> 1283308e2


Remove BlockingDataflowRunner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4c0fab0b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4c0fab0b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4c0fab0b

Branch: refs/heads/master
Commit: 4c0fab0b3a36d184c6d1fe060d60dd9b6678daf1
Parents: 8390a22
Author: Pei He <pe...@google.com>
Authored: Fri Jul 29 15:40:07 2016 -0700
Committer: Davor Bonaci <da...@google.com>
Committed: Tue Nov 1 12:52:09 2016 -0700

----------------------------------------------------------------------
 .../beam/examples/common/ExampleUtils.java      |   3 +-
 .../dataflow/BlockingDataflowRunner.java        | 170 -----------
 .../dataflow/DataflowJobCancelledException.java |  39 ---
 .../dataflow/DataflowJobExecutionException.java |  35 ---
 .../dataflow/DataflowJobUpdatedException.java   |  51 ----
 .../runners/dataflow/DataflowPipelineJob.java   |  43 ++-
 .../dataflow/DataflowPipelineRegistrar.java     |  11 +-
 .../BlockingDataflowPipelineOptions.java        |  28 --
 .../testing/TestDataflowPipelineOptions.java    |   6 +-
 .../dataflow/testing/TestDataflowRunner.java    |   7 +-
 .../dataflow/BlockingDataflowRunnerTest.java    | 300 -------------------
 .../dataflow/DataflowPipelineJobTest.java       |  30 +-
 .../dataflow/DataflowPipelineRegistrarTest.java |   7 +-
 .../apache/beam/sdk/transforms/Aggregator.java  |   9 +-
 14 files changed, 80 insertions(+), 659 deletions(-)
----------------------------------------------------------------------
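
Since this change removes the blocking runner rather than replacing it, a pipeline that previously relied on BlockingDataflowRunner now blocks by waiting on the job it submits. The following sketch is illustrative only (the class name BlockingSubmitExample and the empty pipeline body are placeholders, not part of this commit); it assumes the DataflowRunner, DataflowPipelineOptions, and DataflowPipelineJob APIs that appear elsewhere in this diff.

import org.apache.beam.runners.dataflow.DataflowPipelineJob;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult.State;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.joda.time.Duration;

public class BlockingSubmitExample {
  public static void main(String[] args) throws Exception {
    DataflowPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
    options.setRunner(DataflowRunner.class);  // the non-blocking runner that remains

    Pipeline pipeline = Pipeline.create(options);
    // ... construct the pipeline here ...

    // run() returns as soon as the job is submitted; waitUntilFinish() blocks until a
    // terminal state. A negative duration is treated as an infinite wait, matching the
    // BUILTIN_JOB_TIMEOUT_SEC behavior of the removed runner.
    DataflowPipelineJob job = (DataflowPipelineJob) pipeline.run();
    State state = job.waitUntilFinish(Duration.standardSeconds(-1L));
    if (state != State.DONE) {
      throw new RuntimeException("Job " + job.getJobId() + " ended in state " + state);
    }
  }
}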


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4c0fab0b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
index 1209a67..6962571 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
@@ -272,8 +272,7 @@ public class ExampleUtils {
   }
 
   /**
-   * If {@literal DataflowRunner} or {@literal BlockingDataflowRunner} is used,
-   * waits for the pipeline to finish and cancels it (and the injector) before the program exists.
+   * Waits for the pipeline to finish and cancels it before the program exists.
    */
   public void waitToFinish(PipelineResult result) {
     pipelinesToCancel.add(result);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4c0fab0b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BlockingDataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BlockingDataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BlockingDataflowRunner.java
deleted file mode 100644
index 5285ade..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BlockingDataflowRunner.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow;
-
-import javax.annotation.Nullable;
-import org.apache.beam.runners.dataflow.options.BlockingDataflowPipelineOptions;
-import org.apache.beam.runners.dataflow.util.MonitoringUtil;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.PipelineResult.State;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsValidator;
-import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
-import org.joda.time.Duration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A {@link PipelineRunner} that's like {@link DataflowRunner}
- * but that waits for the launched job to finish.
- *
- * <p>Logs job status updates and console messages while it waits.
- *
- * <p>Returns the final job state, or throws an exception if the job
- * fails or cannot be monitored.
- *
- * <p><h3>Permissions</h3>
- * When reading from a Dataflow source or writing to a Dataflow sink using
- * {@code BlockingDataflowRunner}, the Google cloud services account and the Google compute
- * engine service account of the GCP project running the Dataflow Job will need access to the
- * corresponding source/sink.
- *
- * <p>Please see <a href="https://cloud.google.com/dataflow/security-and-permissions">Google Cloud
- * Dataflow Security and Permissions</a> for more details.
- */
-public class BlockingDataflowRunner extends
-    PipelineRunner<DataflowPipelineJob> {
-  private static final Logger LOG = LoggerFactory.getLogger(BlockingDataflowRunner.class);
-
-  // Defaults to an infinite wait period.
-  // TODO: make this configurable after removal of option map.
-  private static final long BUILTIN_JOB_TIMEOUT_SEC = -1L;
-
-  private final DataflowRunner dataflowRunner;
-  private final BlockingDataflowPipelineOptions options;
-
-  protected BlockingDataflowRunner(
-      DataflowRunner internalRunner,
-      BlockingDataflowPipelineOptions options) {
-    this.dataflowRunner = internalRunner;
-    this.options = options;
-  }
-
-  /**
-   * Constructs a runner from the provided options.
-   */
-  public static BlockingDataflowRunner fromOptions(
-      PipelineOptions options) {
-    BlockingDataflowPipelineOptions dataflowOptions =
-        PipelineOptionsValidator.validate(BlockingDataflowPipelineOptions.class, options);
-    DataflowRunner dataflowRunner =
-        DataflowRunner.fromOptions(dataflowOptions);
-
-    return new BlockingDataflowRunner(dataflowRunner, dataflowOptions);
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @throws DataflowJobExecutionException if there is an exception during job execution.
-   * @throws DataflowServiceException if there is an exception retrieving information about the job.
-   */
-  @Override
-  public DataflowPipelineJob run(Pipeline p) {
-    final DataflowPipelineJob job = dataflowRunner.run(p);
-
-    // We ignore the potential race condition here (Ctrl-C after job submission but before the
-    // shutdown hook is registered). Even if we tried to do something smarter (eg., SettableFuture)
-    // the run method (which produces the job) could fail or be Ctrl-C'd before it had returned a
-    // job. The display of the command to cancel the job is best-effort anyways -- RPC's could fail,
-    // etc. If the user wants to verify the job was cancelled they should look at the job status.
-    Thread shutdownHook = new Thread() {
-      @Override
-      public void run() {
-        LOG.warn("Job is already running in Google Cloud Platform, Ctrl-C will 
not cancel it.\n"
-            + "To cancel the job in the cloud, run:\n> {}",
-            MonitoringUtil.getGcloudCancelCommand(options, job.getJobId()));
-      }
-    };
-
-    try {
-      Runtime.getRuntime().addShutdownHook(shutdownHook);
-
-      @Nullable
-      State result = job.waitUntilFinish(Duration.standardSeconds(BUILTIN_JOB_TIMEOUT_SEC));
-
-      if (result == null) {
-        throw new DataflowServiceException(
-            job, "Timed out while retrieving status for job " + 
job.getJobId());
-      }
-
-      LOG.info("Job finished with status {}", result);
-      if (!result.isTerminal()) {
-        throw new IllegalStateException("Expected terminal state for job " + 
job.getJobId()
-            + ", got " + result);
-      }
-
-      if (result == State.DONE) {
-        return job;
-      } else if (result == State.UPDATED) {
-        DataflowPipelineJob newJob = job.getReplacedByJob();
-        LOG.info("Job {} has been updated and is running as the new job with 
id {}."
-            + "To access the updated job on the Dataflow monitoring console, 
please navigate to {}",
-            job.getJobId(),
-            newJob.getJobId(),
-            MonitoringUtil.getJobMonitoringPageURL(newJob.getProjectId(), 
newJob.getJobId()));
-        throw new DataflowJobUpdatedException(
-            job,
-            String.format("Job %s updated; new job is %s.", job.getJobId(), 
newJob.getJobId()),
-            newJob);
-      } else if (result == State.CANCELLED) {
-        String message = String.format("Job %s cancelled by user", 
job.getJobId());
-        LOG.info(message);
-        throw new DataflowJobCancelledException(job, message);
-      } else {
-        throw new DataflowJobExecutionException(job, "Job " + job.getJobId()
-            + " failed with status " + result);
-      }
-    } finally {
-      Runtime.getRuntime().removeShutdownHook(shutdownHook);
-    }
-  }
-
-  @Override
-  public <OutputT extends POutput, InputT extends PInput> OutputT apply(
-      PTransform<InputT, OutputT> transform, InputT input) {
-    return dataflowRunner.apply(transform, input);
-  }
-
-  /**
-   * Sets callbacks to invoke during execution. See {@link DataflowRunnerHooks}.
-   */
-  @Experimental
-  public void setHooks(DataflowRunnerHooks hooks) {
-    this.dataflowRunner.setHooks(hooks);
-  }
-
-  @Override
-  public String toString() {
-    return "BlockingDataflowRunner#" + options.getJobName();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4c0fab0b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowJobCancelledException.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowJobCancelledException.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowJobCancelledException.java
deleted file mode 100644
index e2edb6a..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowJobCancelledException.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow;
-
-/**
- * Signals that a job run by a {@link BlockingDataflowRunner} was updated during execution.
- */
-public class DataflowJobCancelledException extends DataflowJobException {
-  /**
-   * Create a new {@code DataflowJobAlreadyUpdatedException} with the specified {@link
-   * DataflowPipelineJob} and message.
-   */
-  public DataflowJobCancelledException(DataflowPipelineJob job, String message) {
-    super(job, message, null);
-  }
-
-  /**
-   * Create a new {@code DataflowJobAlreadyUpdatedException} with the specified {@link
-   * DataflowPipelineJob}, message, and cause.
-   */
-  public DataflowJobCancelledException(DataflowPipelineJob job, String message, Throwable cause) {
-    super(job, message, cause);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4c0fab0b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowJobExecutionException.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowJobExecutionException.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowJobExecutionException.java
deleted file mode 100644
index ccf8057..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowJobExecutionException.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow;
-
-import javax.annotation.Nullable;
-
-/**
- * Signals that a job run by a {@link BlockingDataflowRunner} fails during execution, and
- * provides access to the failed job.
- */
-public class DataflowJobExecutionException extends DataflowJobException {
-  DataflowJobExecutionException(DataflowPipelineJob job, String message) {
-    this(job, message, null);
-  }
-
-  DataflowJobExecutionException(
-      DataflowPipelineJob job, String message, @Nullable Throwable cause) {
-    super(job, message, cause);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4c0fab0b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowJobUpdatedException.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowJobUpdatedException.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowJobUpdatedException.java
deleted file mode 100644
index 39d1d47..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowJobUpdatedException.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow;
-
-/**
- * Signals that a job run by a {@link BlockingDataflowRunner} was updated during execution.
- */
-public class DataflowJobUpdatedException extends DataflowJobException {
-  private DataflowPipelineJob replacedByJob;
-
-  /**
-   * Create a new {@code DataflowJobUpdatedException} with the specified original {@link
-   * DataflowPipelineJob}, message, and replacement {@link DataflowPipelineJob}.
-   */
-  public DataflowJobUpdatedException(
-      DataflowPipelineJob job, String message, DataflowPipelineJob replacedByJob) {
-    this(job, message, replacedByJob, null);
-  }
-
-  /**
-   * Create a new {@code DataflowJobUpdatedException} with the specified original {@link
-   * DataflowPipelineJob}, message, replacement {@link DataflowPipelineJob}, and cause.
-   */
-  public DataflowJobUpdatedException(
-      DataflowPipelineJob job, String message, DataflowPipelineJob replacedByJob, Throwable cause) {
-    super(job, message, cause);
-    this.replacedByJob = replacedByJob;
-  }
-
-  /**
-   * The new job that replaces the job terminated with this exception.
-   */
-  public DataflowPipelineJob getReplacedByJob() {
-    return replacedByJob;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4c0fab0b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
index c3be192..27006a4 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
@@ -206,7 +206,26 @@ public class DataflowPipelineJob implements PipelineResult {
   public State waitUntilFinish(
       Duration duration,
       MonitoringUtil.JobMessagesHandler messageHandler) throws IOException, InterruptedException {
-    return waitUntilFinish(duration, messageHandler, Sleeper.DEFAULT, NanoClock.SYSTEM);
+    // We ignore the potential race condition here (Ctrl-C after job submission but before the
+    // shutdown hook is registered). Even if we tried to do something smarter (eg., SettableFuture)
+    // the run method (which produces the job) could fail or be Ctrl-C'd before it had returned a
+    // job. The display of the command to cancel the job is best-effort anyways -- RPC's could fail,
+    // etc. If the user wants to verify the job was cancelled they should look at the job status.
+    Thread shutdownHook = new Thread() {
+      @Override
+      public void run() {
+        LOG.warn("Job is already running in Google Cloud Platform, Ctrl-C will 
not cancel it.\n"
+            + "To cancel the job in the cloud, run:\n> {}",
+            MonitoringUtil.getGcloudCancelCommand(dataflowOptions, 
getJobId()));
+      }
+    };
+
+    try {
+      Runtime.getRuntime().addShutdownHook(shutdownHook);
+      return waitUntilFinish(duration, messageHandler, Sleeper.DEFAULT, NanoClock.SYSTEM);
+    } finally {
+      Runtime.getRuntime().removeShutdownHook(shutdownHook);
+    }
   }
 
   /**
@@ -230,8 +249,7 @@ public class DataflowPipelineJob implements PipelineResult {
       Duration duration,
       MonitoringUtil.JobMessagesHandler messageHandler,
       Sleeper sleeper,
-      NanoClock nanoClock)
-          throws IOException, InterruptedException {
+      NanoClock nanoClock) throws IOException, InterruptedException {
     MonitoringUtil monitor = new MonitoringUtil(projectId, dataflowOptions.getDataflowClient());
 
     long lastTimestamp = 0;
@@ -275,6 +293,23 @@ public class DataflowPipelineJob implements PipelineResult {
       if (!hasError) {
         // We can stop if the job is done.
         if (state.isTerminal()) {
+          switch (state) {
+            case DONE:
+            case CANCELLED:
+              LOG.info("Job {} finished with status {}.", getJobId(), state);
+              break;
+            case UPDATED:
+              LOG.info("Job {} has been updated and is running as the new job 
with id {}. "
+                  + "To access the updated job on the Dataflow monitoring 
console, "
+                  + "please navigate to {}",
+                  getJobId(),
+                  getReplacedByJob().getJobId(),
+                  MonitoringUtil.getJobMonitoringPageURL(
+                      getReplacedByJob().getProjectId(), 
getReplacedByJob().getJobId()));
+              break;
+            default:
+              LOG.info("Job {} failed with status {}.", getJobId(), state);
+          }
           return state;
         }
 
@@ -297,7 +332,7 @@ public class DataflowPipelineJob implements PipelineResult {
         }
       }
     } while(BackOffUtils.next(sleeper, backoff));
-    LOG.warn("No terminal state was returned.  State value {}", state);
+    LOG.warn("No terminal state was returned. State value {}", state);
     return null;  // Timed out.
   }
 

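The hunk above folds the shutdown-hook handling that used to live in BlockingDataflowRunner.run() into waitUntilFinish(). In isolation the pattern looks like the sketch below; the class and method names are illustrative, not Beam APIs, and the hook only prints a hint because a JVM shutdown cannot cancel the remote job.

import java.util.concurrent.Callable;

public final class ShutdownHookSketch {
  public static <T> T callWithCancelHint(
      final Callable<T> blockingWait, final String cancelCommandHint) throws Exception {
    Thread shutdownHook = new Thread() {
      @Override
      public void run() {
        System.err.println("Job keeps running remotely; Ctrl-C does not cancel it.\n"
            + "To cancel it, run:\n> " + cancelCommandHint);
      }
    };
    Runtime.getRuntime().addShutdownHook(shutdownHook);
    try {
      return blockingWait.call();  // e.g. job.waitUntilFinish(...)
    } finally {
      // Unregister the hook so a normal exit does not print the warning.
      Runtime.getRuntime().removeShutdownHook(shutdownHook);
    }
  }
}
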
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4c0fab0b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrar.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrar.java
index 5090a8a..5bd3bcd 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrar.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrar.java
@@ -19,7 +19,6 @@ package org.apache.beam.runners.dataflow;
 
 import com.google.auto.service.AutoService;
 import com.google.common.collect.ImmutableList;
-import org.apache.beam.runners.dataflow.options.BlockingDataflowPipelineOptions;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
@@ -34,28 +33,26 @@ public class DataflowPipelineRegistrar {
   private DataflowPipelineRegistrar() { }
 
   /**
-   * Register the {@link DataflowPipelineOptions} and {@link BlockingDataflowPipelineOptions}.
+   * Register the {@link DataflowPipelineOptions}.
    */
   @AutoService(PipelineOptionsRegistrar.class)
   public static class Options implements PipelineOptionsRegistrar {
     @Override
     public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
       return ImmutableList.<Class<? extends PipelineOptions>>of(
-          DataflowPipelineOptions.class,
-          BlockingDataflowPipelineOptions.class);
+          DataflowPipelineOptions.class);
     }
   }
 
   /**
-   * Register the {@link DataflowRunner} and {@link BlockingDataflowRunner}.
+   * Register the {@link DataflowRunner}.
    */
   @AutoService(PipelineRunnerRegistrar.class)
   public static class Runner implements PipelineRunnerRegistrar {
     @Override
     public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
       return ImmutableList.<Class<? extends PipelineRunner<?>>>of(
-          DataflowRunner.class,
-          BlockingDataflowRunner.class);
+          DataflowRunner.class);
     }
   }
 }
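
With BlockingDataflowRunner gone, the registrar now advertises only DataflowRunner. For context, registrars published with @AutoService are discovered through java.util.ServiceLoader, which reads the META-INF/services entries that AutoService generates. The lookup below is an illustrative sketch (the class name RegistrarLookup is a placeholder), mirroring the ServiceLoader usage in DataflowPipelineRegistrarTest further down in this diff.

import java.util.ServiceLoader;
import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;

public class RegistrarLookup {
  public static void main(String[] args) {
    // Each registrar on the classpath reports the runner classes it makes available.
    for (PipelineRunnerRegistrar registrar : ServiceLoader.load(PipelineRunnerRegistrar.class)) {
      System.out.println(registrar.getClass().getName() + " -> " + registrar.getPipelineRunners());
    }
  }
}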

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4c0fab0b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/BlockingDataflowPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/BlockingDataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/BlockingDataflowPipelineOptions.java
deleted file mode 100644
index 5d8d1a1..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/BlockingDataflowPipelineOptions.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.options;
-
-import org.apache.beam.runners.dataflow.BlockingDataflowRunner;
-import org.apache.beam.sdk.options.Description;
-
-/**
- * Options that are used to configure the {@link BlockingDataflowRunner}.
- */
-@Description("Configure options on the BlockingDataflowRunner.")
-public interface BlockingDataflowPipelineOptions extends DataflowPipelineOptions {
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4c0fab0b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineOptions.java
index e66ffc9..12f7b39 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineOptions.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineOptions.java
@@ -17,14 +17,12 @@
  */
 package org.apache.beam.runners.dataflow.testing;
 
-import org.apache.beam.runners.dataflow.options.BlockingDataflowPipelineOptions;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestPipelineOptions;
 
 /**
  * A set of options used to configure the {@link TestPipeline}.
  */
-public interface TestDataflowPipelineOptions extends TestPipelineOptions,
-       BlockingDataflowPipelineOptions {
-
+public interface TestDataflowPipelineOptions extends TestPipelineOptions, DataflowPipelineOptions {
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4c0fab0b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
index a152505..0f141d2 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
@@ -33,7 +33,6 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import javax.annotation.Nullable;
-import org.apache.beam.runners.dataflow.DataflowJobExecutionException;
 import org.apache.beam.runners.dataflow.DataflowPipelineJob;
 import org.apache.beam.runners.dataflow.DataflowRunner;
 import org.apache.beam.runners.dataflow.util.MonitoringUtil;
@@ -97,11 +96,7 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> {
 
     TestPipelineOptions testPipelineOptions = pipeline.getOptions().as(TestPipelineOptions.class);
     final DataflowPipelineJob job;
-    try {
-      job = runner.run(pipeline);
-    } catch (DataflowJobExecutionException ex) {
-      throw new IllegalStateException("The dataflow failed.");
-    }
+    job = runner.run(pipeline);
 
     LOG.info("Running Dataflow job {} with {} expected assertions.",
         job.getJobId(), expectedNumberOfAssertions);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4c0fab0b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BlockingDataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BlockingDataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BlockingDataflowRunnerTest.java
deleted file mode 100644
index 4572a64..0000000
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BlockingDataflowRunnerTest.java
+++ /dev/null
@@ -1,300 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow;
-
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.isA;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
-import org.apache.beam.runners.dataflow.testing.TestDataflowPipelineOptions;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.PipelineResult.State;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.ExpectedLogs;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.util.NoopPathValidator;
-import org.apache.beam.sdk.util.TestCredential;
-import org.hamcrest.Description;
-import org.hamcrest.Factory;
-import org.hamcrest.Matcher;
-import org.hamcrest.TypeSafeMatcher;
-import org.joda.time.Duration;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for BlockingDataflowRunner.
- */
-@RunWith(JUnit4.class)
-public class BlockingDataflowRunnerTest {
-
-  @Rule
-  public ExpectedLogs expectedLogs = ExpectedLogs.none(BlockingDataflowRunner.class);
-
-  @Rule
-  public ExpectedException expectedThrown = ExpectedException.none();
-
-  /**
-   * A {@link Matcher} for a {@link DataflowJobException} that applies an underlying {@link Matcher}
-   * to the {@link DataflowPipelineJob} returned by {@link DataflowJobException#getJob()}.
-   */
-  private static class DataflowJobExceptionMatcher<T extends DataflowJobException>
-      extends TypeSafeMatcher<T> {
-
-    private final Matcher<DataflowPipelineJob> matcher;
-
-    public DataflowJobExceptionMatcher(Matcher<DataflowPipelineJob> matcher) {
-        this.matcher = matcher;
-    }
-
-    @Override
-    public boolean matchesSafely(T ex) {
-      return matcher.matches(ex.getJob());
-    }
-
-    @Override
-    protected void describeMismatchSafely(T item, Description description) {
-        description.appendText("job ");
-        matcher.describeMismatch(item.getMessage(), description);
-    }
-
-    @Override
-    public void describeTo(Description description) {
-      description.appendText("exception with job matching ");
-      description.appendDescriptionOf(matcher);
-    }
-
-    @Factory
-    public static <T extends DataflowJobException> Matcher<T> expectJob(
-        Matcher<DataflowPipelineJob> matcher) {
-      return new DataflowJobExceptionMatcher<T>(matcher);
-    }
-  }
-
-  /**
-   * A {@link Matcher} for a {@link DataflowPipelineJob} that applies an underlying {@link Matcher}
-   * to the return value of {@link DataflowPipelineJob#getJobId()}.
-   */
-  private static class JobIdMatcher<T extends DataflowPipelineJob> extends TypeSafeMatcher<T> {
-
-    private final Matcher<String> matcher;
-
-    public JobIdMatcher(Matcher<String> matcher) {
-        this.matcher = matcher;
-    }
-
-    @Override
-    public boolean matchesSafely(T job) {
-      return matcher.matches(job.getJobId());
-    }
-
-    @Override
-    protected void describeMismatchSafely(T item, Description description) {
-        description.appendText("jobId ");
-        matcher.describeMismatch(item.getJobId(), description);
-    }
-
-    @Override
-    public void describeTo(Description description) {
-      description.appendText("job with jobId ");
-      description.appendDescriptionOf(matcher);
-    }
-
-    @Factory
-    public static <T extends DataflowPipelineJob> Matcher<T> expectJobId(final String jobId) {
-      return new JobIdMatcher<T>(equalTo(jobId));
-    }
-  }
-
-  /**
-   * A {@link Matcher} for a {@link DataflowJobUpdatedException} that applies an underlying
-   * {@link Matcher} to the {@link DataflowPipelineJob} returned by
-   * {@link DataflowJobUpdatedException#getReplacedByJob()}.
-   */
-  private static class ReplacedByJobMatcher<T extends DataflowJobUpdatedException>
-      extends TypeSafeMatcher<T> {
-
-    private final Matcher<DataflowPipelineJob> matcher;
-
-    public ReplacedByJobMatcher(Matcher<DataflowPipelineJob> matcher) {
-        this.matcher = matcher;
-    }
-
-    @Override
-    public boolean matchesSafely(T ex) {
-      return matcher.matches(ex.getReplacedByJob());
-    }
-
-    @Override
-    protected void describeMismatchSafely(T item, Description description) {
-        description.appendText("job ");
-        matcher.describeMismatch(item.getMessage(), description);
-    }
-
-    @Override
-    public void describeTo(Description description) {
-      description.appendText("exception with replacedByJob() ");
-      description.appendDescriptionOf(matcher);
-    }
-
-    @Factory
-    public static <T extends DataflowJobUpdatedException> Matcher<T> expectReplacedBy(
-        Matcher<DataflowPipelineJob> matcher) {
-      return new ReplacedByJobMatcher<T>(matcher);
-    }
-  }
-
-  /**
-   * Creates a mocked {@link DataflowPipelineJob} with the given {@code projectId} and {@code jobId}
-   * that will immediately terminate in the provided {@code terminalState}.
-   *
-   * <p>The return value may be further mocked.
-   */
-  private DataflowPipelineJob createMockJob(
-      String projectId, String jobId, State terminalState) throws Exception {
-    DataflowPipelineJob mockJob = mock(DataflowPipelineJob.class);
-    when(mockJob.getProjectId()).thenReturn(projectId);
-    when(mockJob.getJobId()).thenReturn(jobId);
-    when(mockJob.waitUntilFinish(any(Duration.class)))
-        .thenReturn(terminalState);
-    return mockJob;
-  }
-
-  /**
-   * Returns a {@link BlockingDataflowRunner} that will return the provided a job to return.
-   * Some {@link PipelineOptions} will be extracted from the job, such as the project ID.
-   */
-  private BlockingDataflowRunner createMockRunner(DataflowPipelineJob job)
-      throws Exception {
-    DataflowRunner mockRunner = mock(DataflowRunner.class);
-    TestDataflowPipelineOptions options =
-        PipelineOptionsFactory.as(TestDataflowPipelineOptions.class);
-    options.setRunner(BlockingDataflowRunner.class);
-    options.setProject(job.getProjectId());
-
-    when(mockRunner.run(isA(Pipeline.class))).thenReturn(job);
-
-    return new BlockingDataflowRunner(mockRunner, options);
-  }
-
-  /**
-   * Tests that the {@link BlockingDataflowRunner} returns normally when a job terminates in
-   * the {@link State#DONE DONE} state.
-   */
-  @Test
-  public void testJobDoneComplete() throws Exception {
-    createMockRunner(createMockJob("testJobDone-projectId", 
"testJobDone-jobId", State.DONE))
-        .run(TestPipeline.create());
-    expectedLogs.verifyInfo("Job finished with status DONE");
-  }
-
-  /**
-   * Tests that the {@link BlockingDataflowRunner} throws the appropriate exception
-   * when a job terminates in the {@link State#FAILED FAILED} state.
-   */
-  @Test
-  public void testFailedJobThrowsException() throws Exception {
-    expectedThrown.expect(DataflowJobExecutionException.class);
-    expectedThrown.expect(DataflowJobExceptionMatcher.expectJob(
-        JobIdMatcher.expectJobId("testFailedJob-jobId")));
-    createMockRunner(createMockJob("testFailedJob-projectId", 
"testFailedJob-jobId", State.FAILED))
-        .run(TestPipeline.create());
-  }
-
-  /**
-   * Tests that the {@link BlockingDataflowRunner} throws the appropriate exception
-   * when a job terminates in the {@link State#CANCELLED CANCELLED} state.
-   */
-  @Test
-  public void testCancelledJobThrowsException() throws Exception {
-    expectedThrown.expect(DataflowJobCancelledException.class);
-    expectedThrown.expect(DataflowJobExceptionMatcher.expectJob(
-        JobIdMatcher.expectJobId("testCancelledJob-jobId")));
-    createMockRunner(
-            createMockJob("testCancelledJob-projectId", 
"testCancelledJob-jobId", State.CANCELLED))
-        .run(TestPipeline.create());
-  }
-
-  /**
-   * Tests that the {@link BlockingDataflowRunner} throws the appropriate exception
-   * when a job terminates in the {@link State#UPDATED UPDATED} state.
-   */
-  @Test
-  public void testUpdatedJobThrowsException() throws Exception {
-    expectedThrown.expect(DataflowJobUpdatedException.class);
-    expectedThrown.expect(DataflowJobExceptionMatcher.expectJob(
-        JobIdMatcher.expectJobId("testUpdatedJob-jobId")));
-    expectedThrown.expect(ReplacedByJobMatcher.expectReplacedBy(
-        JobIdMatcher.expectJobId("testUpdatedJob-replacedByJobId")));
-    DataflowPipelineJob job =
-        createMockJob("testUpdatedJob-projectId", "testUpdatedJob-jobId", 
State.UPDATED);
-    DataflowPipelineJob replacedByJob =
-        createMockJob("testUpdatedJob-projectId", 
"testUpdatedJob-replacedByJobId", State.DONE);
-    when(job.getReplacedByJob()).thenReturn(replacedByJob);
-    createMockRunner(job).run(TestPipeline.create());
-  }
-
-  /**
-   * Tests that the {@link BlockingDataflowRunner} throws the appropriate exception
-   * when a job terminates in the {@link State#UNKNOWN UNKNOWN} state, indicating that the
-   * Dataflow service returned a state that the SDK is unfamiliar with (possibly because it
-   * is an old SDK relative the service).
-   */
-  @Test
-  public void testUnknownJobThrowsException() throws Exception {
-    expectedThrown.expect(IllegalStateException.class);
-    createMockRunner(
-            createMockJob("testUnknownJob-projectId", "testUnknownJob-jobId", 
State.UNKNOWN))
-        .run(TestPipeline.create());
-  }
-
-  /**
-   * Tests that the {@link BlockingDataflowRunner} throws the appropriate exception
-   * when a job returns a {@code null} state, indicating that it failed to contact the service,
-   * including all of its built-in resilience logic.
-   */
-  @Test
-  public void testNullJobThrowsException() throws Exception {
-    expectedThrown.expect(DataflowServiceException.class);
-    expectedThrown.expect(DataflowJobExceptionMatcher.expectJob(
-        JobIdMatcher.expectJobId("testNullJob-jobId")));
-    createMockRunner(createMockJob("testNullJob-projectId", 
"testNullJob-jobId", null))
-        .run(TestPipeline.create());
-  }
-
-  @Test
-  public void testToString() {
-    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
-    options.setJobName("TestJobName");
-    options.setProject("test-project");
-    options.setTempLocation("gs://test/temp/location");
-    options.setGcpCredential(new TestCredential());
-    options.setPathValidatorClass(NoopPathValidator.class);
-    options.setRunner(BlockingDataflowRunner.class);
-    assertEquals("BlockingDataflowRunner#testjobname",
-        BlockingDataflowRunner.fromOptions(options).toString());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4c0fab0b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
index 2af95e2..0527b7c 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
@@ -61,6 +61,7 @@ import org.apache.beam.sdk.AggregatorRetrievalException;
 import org.apache.beam.sdk.AggregatorValues;
 import org.apache.beam.sdk.PipelineResult.State;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.ExpectedLogs;
 import org.apache.beam.sdk.testing.FastNanoClockAndSleeper;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
@@ -86,6 +87,7 @@ import org.mockito.MockitoAnnotations;
 public class DataflowPipelineJobTest {
   private static final String PROJECT_ID = "someProject";
   private static final String JOB_ID = "1234";
+  private static final String REPLACEMENT_JOB_ID = "replacementJobId";
 
   @Mock
   private Dataflow mockWorkflowClient;
@@ -99,6 +101,9 @@ public class DataflowPipelineJobTest {
   @Rule
   public ExpectedException thrown = ExpectedException.none();
 
+  @Rule
+  public ExpectedLogs expectedLogs = ExpectedLogs.none(DataflowPipelineJob.class);
+
   private TestDataflowPipelineOptions options;
 
   @Before
@@ -168,6 +173,9 @@ public class DataflowPipelineJobTest {
 
     Job statusResponse = new Job();
     statusResponse.setCurrentState("JOB_STATE_" + state.name());
+    if (state == State.UPDATED) {
+      statusResponse.setReplacedByJobId(REPLACEMENT_JOB_ID);
+    }
 
     when(mockJobs.get(eq(PROJECT_ID), eq(JOB_ID))).thenReturn(statusRequest);
     when(statusRequest.execute()).thenReturn(statusResponse);
@@ -187,6 +195,7 @@ public class DataflowPipelineJobTest {
   @Test
   public void testWaitToFinishDone() throws Exception {
     assertEquals(State.DONE, mockWaitToFinishInState(State.DONE));
+    expectedLogs.verifyInfo(String.format("Job %s finished with status DONE.", 
JOB_ID));
   }
 
   /**
@@ -196,24 +205,39 @@ public class DataflowPipelineJobTest {
   @Test
   public void testWaitToFinishFailed() throws Exception {
     assertEquals(State.FAILED, mockWaitToFinishInState(State.FAILED));
+    expectedLogs.verifyInfo(String.format("Job %s failed with status FAILED.", 
JOB_ID));
   }
 
   /**
-   * Tests that the {@link DataflowPipelineJob} understands that the {@link State#FAILED FAILED}
-   * state is terminal.
+   * Tests that the {@link DataflowPipelineJob} understands that the
+   * {@link State#CANCELLED CANCELLED} state is terminal.
    */
   @Test
   public void testWaitToFinishCancelled() throws Exception {
     assertEquals(State.CANCELLED, mockWaitToFinishInState(State.CANCELLED));
+    expectedLogs.verifyInfo(String.format("Job %s finished with status 
CANCELLED", JOB_ID));
   }
 
   /**
-   * Tests that the {@link DataflowPipelineJob} understands that the {@link State#FAILED FAILED}
+   * Tests that the {@link DataflowPipelineJob} understands that the {@link State#UPDATED UPDATED}
    * state is terminal.
    */
   @Test
   public void testWaitToFinishUpdated() throws Exception {
     assertEquals(State.UPDATED, mockWaitToFinishInState(State.UPDATED));
+    expectedLogs.verifyInfo(String.format(
+        "Job %s has been updated and is running as the new job with id %s.",
+        JOB_ID, REPLACEMENT_JOB_ID));
+  }
+
+  /**
+   * Tests that the {@link DataflowPipelineJob} understands that the {@link State#UNKNOWN UNKNOWN}
+   * state is terminal.
+   */
+  @Test
+  public void testWaitToFinishUnknown() throws Exception {
+    assertEquals(null, mockWaitToFinishInState(State.UNKNOWN));
+    expectedLogs.verifyWarn("No terminal state was returned. State value 
UNKNOWN");
   }
 
   @Test
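
The new assertions above rely on the ExpectedLogs JUnit rule. A minimal sketch of that pattern follows, assuming ExpectedLogs captures output for the named class's logger (as it is used in this test) and that the usual SLF4J-to-JUL bridge from Beam's test harness is on the classpath; SomeComponent here is a hypothetical stand-in for DataflowPipelineJob.

import org.apache.beam.sdk.testing.ExpectedLogs;
import org.junit.Rule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ExpectedLogsSketchTest {
  @Rule
  public ExpectedLogs expectedLogs = ExpectedLogs.none(SomeComponent.class);

  @Test
  public void infoLogIsVerified() {
    new SomeComponent().doWork();
    // Passes if a fragment of the expected message was logged at INFO level.
    expectedLogs.verifyInfo("finished with status");
  }

  // Hypothetical component whose logging we want to observe.
  static class SomeComponent {
    private static final Logger LOG = LoggerFactory.getLogger(SomeComponent.class);
    void doWork() {
      LOG.info("Job xyz finished with status DONE.");
    }
  }
}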

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4c0fab0b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrarTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrarTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrarTest.java
index f084757..31f9281 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrarTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrarTest.java
@@ -23,7 +23,6 @@ import static org.junit.Assert.fail;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import java.util.ServiceLoader;
-import org.apache.beam.runners.dataflow.options.BlockingDataflowPipelineOptions;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
 import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
@@ -36,15 +35,13 @@ import org.junit.runners.JUnit4;
 public class DataflowPipelineRegistrarTest {
   @Test
   public void testCorrectOptionsAreReturned() {
-    assertEquals(ImmutableList.of(DataflowPipelineOptions.class,
-                                  BlockingDataflowPipelineOptions.class),
+    assertEquals(ImmutableList.of(DataflowPipelineOptions.class),
         new DataflowPipelineRegistrar.Options().getPipelineOptions());
   }
 
   @Test
   public void testCorrectRunnersAreReturned() {
-    assertEquals(ImmutableList.of(DataflowRunner.class,
-                                  BlockingDataflowRunner.class),
+    assertEquals(ImmutableList.of(DataflowRunner.class),
         new DataflowPipelineRegistrar.Runner().getPipelineRunners());
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4c0fab0b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java
index 13bf322..427ecfc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java
@@ -29,11 +29,10 @@ import org.apache.beam.sdk.util.ExecutionContext;
  * typically from the {@link DoFn} constructor. Elements can be added to the
  * {@code Aggregator} by calling {@link Aggregator#addValue}.
  *
- * <p>Aggregators are visible in the monitoring UI, when the pipeline is run
- * using DataflowRunner or BlockingDataflowRunner, along with
- * their current value. Aggregators may not become visible until the system
- * begins executing the ParDo transform that created them and/or their initial
- * value is changed.
+ * <p>Aggregators are visible in the monitoring UI, when the pipeline is run using
+ * {@link DataflowRunner} along with their current value.
+ * Aggregators may not become visible until the system begins executing the ParDo transform
+ * that created them and/or their initial value is changed.
  *
  * <p>Example:
  * <pre> {@code
