http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowWorkerLoggingOptions.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowWorkerLoggingOptions.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowWorkerLoggingOptions.java
new file mode 100644
index 0000000..ebd42d9
--- /dev/null
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowWorkerLoggingOptions.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.options;
+
+import com.google.common.base.Preconditions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Options that are used to control logging configuration on the Dataflow worker.
+ */
+@Description("Options that are used to control logging configuration on the 
Dataflow worker.")
+public interface DataflowWorkerLoggingOptions extends PipelineOptions {
+  /**
+   * The set of log levels that can be used on the Dataflow worker.
+   */
+  public enum Level {
+    DEBUG, ERROR, INFO, TRACE, WARN
+  }
+
+  /**
+   * This option controls the default log level of all loggers without a log 
level override.
+   */
+  @Description("Controls the default log level of all loggers without a log 
level override.")
+  @Default.Enum("INFO")
+  Level getDefaultWorkerLogLevel();
+  void setDefaultWorkerLogLevel(Level level);
+
+  /**
+   * This option controls the log levels for specifically named loggers.
+   *
+   * <p>Later options with equivalent names override earlier options.
+   *
+   * <p>See {@link WorkerLogLevelOverrides} for more information on how to 
configure logging
+   * on a per {@link Class}, {@link Package}, or name basis. If used from the 
command line,
+   * the expected format is {"Name":"Level",...}, further details on
+   * {@link WorkerLogLevelOverrides#from}.
+   */
+  @Description("This option controls the log levels for specifically named 
loggers. "
+      + "The expected format is {\"Name\":\"Level\",...}. The Dataflow worker 
uses "
+      + "java.util.logging, which supports a logging hierarchy based off of 
names that are '.' "
+      + "separated. For example, by specifying the value 
{\"a.b.c.Foo\":\"DEBUG\"}, the logger "
+      + "for the class 'a.b.c.Foo' will be configured to output logs at the 
DEBUG level. "
+      + "Similarly, by specifying the value {\"a.b.c\":\"WARN\"}, all loggers 
underneath the "
+      + "'a.b.c' package will be configured to output logs at the WARN level. 
Also, note that "
+      + "when multiple overrides are specified, the exact name followed by the 
closest parent "
+      + "takes precedence.")
+  WorkerLogLevelOverrides getWorkerLogLevelOverrides();
+  void setWorkerLogLevelOverrides(WorkerLogLevelOverrides value);
+
+  /**
+   * Defines a log level override for a specific class, package, or name.
+   *
+   * <p>{@code java.util.logging} is used on the Dataflow worker harness and 
supports
+   * a logging hierarchy based off of names that are "." separated. It is a 
common
+   * pattern to have the logger for a given class share the same name as the 
class itself.
+   * Given the classes {@code a.b.c.Foo}, {@code a.b.c.Xyz}, and {@code 
a.b.Bar}, with
+   * loggers named {@code "a.b.c.Foo"}, {@code "a.b.c.Xyz"}, and {@code 
"a.b.Bar"} respectively,
+   * we can override the log levels:
+   * <ul>
+   *    <li>for {@code Foo} by specifying the name {@code "a.b.c.Foo"} or the 
{@link Class}
+   *    representing {@code a.b.c.Foo}.
+   *    <li>for {@code Foo}, {@code Xyz}, and {@code Bar} by specifying the 
name {@code "a.b"} or
+   *    the {@link Package} representing {@code a.b}.
+   *    <li>for {@code Foo} and {@code Bar} by specifying both of their names 
or classes.
+   * </ul>
+   * Note that by specifying multiple overrides, the exact name followed by 
the closest parent
+   * takes precedence.
+   */
+  public static class WorkerLogLevelOverrides extends HashMap<String, Level> {
+    /**
+     * Overrides the default log level for the passed in class.
+     *
+     * <p>This is equivalent to calling
+     * {@link #addOverrideForName(String, DataflowWorkerLoggingOptions.Level)}
+     * and passing in the {@link Class#getName() class name}.
+     */
+    public WorkerLogLevelOverrides addOverrideForClass(Class<?> klass, Level 
level) {
+      Preconditions.checkNotNull(klass, "Expected class to be not null.");
+      addOverrideForName(klass.getName(), level);
+      return this;
+    }
+
+    /**
+     * Overrides the default log level for the passed in package.
+     *
+     * <p>This is equivalent to calling
+     * {@link #addOverrideForName(String, DataflowWorkerLoggingOptions.Level)}
+     * and passing in the {@link Package#getName() package name}.
+     */
+    public WorkerLogLevelOverrides addOverrideForPackage(Package pkg, Level 
level) {
+      Preconditions.checkNotNull(pkg, "Expected package to be not null.");
+      addOverrideForName(pkg.getName(), level);
+      return this;
+    }
+
+    /**
+     * Overrides the default log level for the passed in name.
+     *
+     * <p>Note that because of the hierarchical nature of logger names, this 
will
+     * override the log level of all loggers that have the passed in name or
+     * a parent logger that has the passed in name.
+     */
+    public WorkerLogLevelOverrides addOverrideForName(String name, Level 
level) {
+      Preconditions.checkNotNull(name, "Expected name to be not null.");
+      Preconditions.checkNotNull(level,
+          "Expected level to be one of %s.", Arrays.toString(Level.values()));
+      put(name, level);
+      return this;
+    }
+
+    /**
+     * Expects a map keyed by logger {@code Name}s with values representing {@code Level}s.
+     * The {@code Name} generally represents the fully qualified Java
+     * {@link Class#getName() class name}, or fully qualified Java
+     * {@link Package#getName() package name}, or custom logger name. The {@code Level}
+     * represents the log level and must be one of {@link Level}.
+     *
+     * @throws IllegalArgumentException if a value is not the name of a {@link Level} constant;
+     *     the exception raised by {@code Level.valueOf} is attached as the cause.
+     */
+    @JsonCreator
+    public static WorkerLogLevelOverrides from(Map<String, String> values) {
+      Preconditions.checkNotNull(values, "Expected values to be not null.");
+      WorkerLogLevelOverrides overrides = new WorkerLogLevelOverrides();
+      for (Map.Entry<String, String> entry : values.entrySet()) {
+        try {
+          overrides.addOverrideForName(entry.getKey(), Level.valueOf(entry.getValue()));
+        } catch (IllegalArgumentException e) {
+          // Rethrow with a message listing the valid levels, chaining the original failure.
+          throw new IllegalArgumentException(String.format(
+              "Unsupported log level '%s' requested for %s. Must be one of %s.",
+              entry.getValue(), entry.getKey(), Arrays.toString(Level.values())), e);
+        }
+      }
+      return overrides;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/BlockingDataflowPipelineRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/BlockingDataflowPipelineRunner.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/BlockingDataflowPipelineRunner.java
new file mode 100644
index 0000000..9096c2b
--- /dev/null
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/BlockingDataflowPipelineRunner.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.runners;
+
+import org.apache.beam.sdk.annotations.Experimental;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.PipelineResult.State;
+import com.google.cloud.dataflow.sdk.options.BlockingDataflowPipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsValidator;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.util.MonitoringUtil;
+import com.google.cloud.dataflow.sdk.values.PInput;
+import com.google.cloud.dataflow.sdk.values.POutput;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.Nullable;
+
+/**
+ * A {@link PipelineRunner} that's like {@link DataflowPipelineRunner}
+ * but that waits for the launched job to finish.
+ *
+ * <p>Prints out job status updates and console messages while it waits.
+ *
+ * <p>Returns the final job state, or throws an exception if the job
+ * fails or cannot be monitored.
+ *
+ * <p><h3>Permissions</h3>
+ * When reading from a Dataflow source or writing to a Dataflow sink using
+ * {@code BlockingDataflowPipelineRunner}, the Google cloud services account and the Google compute
+ * engine service account of the GCP project running the Dataflow Job will need access to the
+ * corresponding source/sink.
+ *
+ * <p>Please see <a href="https://cloud.google.com/dataflow/security-and-permissions">Google Cloud
+ * Dataflow Security and Permissions</a> for more details.
+ */
+public class BlockingDataflowPipelineRunner extends
+    PipelineRunner<DataflowPipelineJob> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(BlockingDataflowPipelineRunner.class);
+
+  // Defaults to an infinite wait period.
+  // TODO: make this configurable after removal of option map.
+  private static final long BUILTIN_JOB_TIMEOUT_SEC = -1L;
+
+  private final DataflowPipelineRunner dataflowPipelineRunner;
+  private final BlockingDataflowPipelineOptions options;
+
+  protected BlockingDataflowPipelineRunner(
+      DataflowPipelineRunner internalRunner,
+      BlockingDataflowPipelineOptions options) {
+    this.dataflowPipelineRunner = internalRunner;
+    this.options = options;
+  }
+
+  /**
+   * Constructs a runner from the provided options.
+   */
+  public static BlockingDataflowPipelineRunner fromOptions(
+      PipelineOptions options) {
+    BlockingDataflowPipelineOptions dataflowOptions =
+        
PipelineOptionsValidator.validate(BlockingDataflowPipelineOptions.class, 
options);
+    DataflowPipelineRunner dataflowPipelineRunner =
+        DataflowPipelineRunner.fromOptions(dataflowOptions);
+
+    return new BlockingDataflowPipelineRunner(dataflowPipelineRunner, 
dataflowOptions);
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>Submits the pipeline through the wrapped {@link DataflowPipelineRunner}, then blocks until
+   * the job reaches a terminal state. The job is returned only when it finishes as {@code DONE}.
+   *
+   * @throws DataflowJobExecutionException if there is an exception during job execution.
+   * @throws DataflowServiceException if there is an exception retrieving information about the job.
+   * @throws DataflowJobUpdatedException if the job finishes in the {@code UPDATED} state.
+   * @throws DataflowJobCancelledException if the job finishes in the {@code CANCELLED} state.
+   * @throws IllegalStateException if waiting ends with a state that is not terminal.
+   */
+  @Override
+  public DataflowPipelineJob run(Pipeline p) {
+    final DataflowPipelineJob job = dataflowPipelineRunner.run(p);
+
+    // We ignore the potential race condition here (Ctrl-C after job submission but before the
+    // shutdown hook is registered). Even if we tried to do something smarter (e.g.,
+    // SettableFuture) the run method (which produces the job) could fail or be Ctrl-C'd before it
+    // had returned a job. The display of the command to cancel the job is best-effort anyway --
+    // RPCs could fail, etc. If the user wants to verify the job was cancelled they should look at
+    // the job status.
+    Thread shutdownHook = new Thread() {
+      @Override
+      public void run() {
+        LOG.warn("Job is already running in Google Cloud Platform, Ctrl-C will not cancel it.\n"
+            + "To cancel the job in the cloud, run:\n> {}",
+            MonitoringUtil.getGcloudCancelCommand(options, job.getJobId()));
+      }
+    };
+
+    try {
+      Runtime.getRuntime().addShutdownHook(shutdownHook);
+
+      @Nullable
+      State result;
+      try {
+        result = job.waitToFinish(
+            BUILTIN_JOB_TIMEOUT_SEC, TimeUnit.SECONDS,
+            new MonitoringUtil.PrintHandler(options.getJobMessageOutput()));
+      } catch (IOException | InterruptedException ex) {
+        // Restore the interrupt flag so callers further up the stack can still observe it.
+        if (ex instanceof InterruptedException) {
+          Thread.currentThread().interrupt();
+        }
+        LOG.debug("Exception caught while retrieving status for job {}", job.getJobId(), ex);
+        throw new DataflowServiceException(
+            job, "Exception caught while retrieving status for job " + job.getJobId(), ex);
+      }
+
+      // A null result is reported as a timeout.
+      if (result == null) {
+        throw new DataflowServiceException(
+            job, "Timed out while retrieving status for job " + job.getJobId());
+      }
+
+      LOG.info("Job finished with status {}", result);
+      if (!result.isTerminal()) {
+        throw new IllegalStateException("Expected terminal state for job " + job.getJobId()
+            + ", got " + result);
+      }
+
+      if (result == State.DONE) {
+        return job;
+      } else if (result == State.UPDATED) {
+        DataflowPipelineJob newJob = job.getReplacedByJob();
+        // The trailing space keeps the two sentences of the message from running together.
+        LOG.info("Job {} has been updated and is running as the new job with id {}. "
+            + "To access the updated job on the Dataflow monitoring console, please navigate to {}",
+            job.getJobId(),
+            newJob.getJobId(),
+            MonitoringUtil.getJobMonitoringPageURL(newJob.getProjectId(), newJob.getJobId()));
+        throw new DataflowJobUpdatedException(
+            job,
+            String.format("Job %s updated; new job is %s.", job.getJobId(), newJob.getJobId()),
+            newJob);
+      } else if (result == State.CANCELLED) {
+        String message = String.format("Job %s cancelled by user", job.getJobId());
+        LOG.info(message);
+        throw new DataflowJobCancelledException(job, message);
+      } else {
+        throw new DataflowJobExecutionException(job, "Job " + job.getJobId()
+            + " failed with status " + result);
+      }
+    } finally {
+      // The hook only prints a hint while we are waiting; drop it once waiting is over.
+      Runtime.getRuntime().removeShutdownHook(shutdownHook);
+    }
+  }
+
+  @Override
+  public <OutputT extends POutput, InputT extends PInput> OutputT apply(
+      PTransform<InputT, OutputT> transform, InputT input) {
+    return dataflowPipelineRunner.apply(transform, input);
+  }
+
+  /**
+   * Sets callbacks to invoke during execution. See {@link 
DataflowPipelineRunnerHooks}.
+   */
+  @Experimental
+  public void setHooks(DataflowPipelineRunnerHooks hooks) {
+    this.dataflowPipelineRunner.setHooks(hooks);
+  }
+
+  @Override
+  public String toString() {
+    return "BlockingDataflowPipelineRunner#" + options.getJobName();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowJobAlreadyExistsException.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowJobAlreadyExistsException.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowJobAlreadyExistsException.java
new file mode 100644
index 0000000..4a4f100
--- /dev/null
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowJobAlreadyExistsException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.runners;
+
+/**
+ * An exception that is thrown if the unique job name constraint of the Dataflow
+ * service is broken because an existing job with the same job name is currently active.
+ * The {@link DataflowPipelineJob} contained within this exception contains information
+ * about the pre-existing job.
+ */
+public class DataflowJobAlreadyExistsException extends DataflowJobException {
+  /**
+   * Create a new {@code DataflowJobAlreadyExistsException} with the specified {@link
+   * DataflowPipelineJob} and message, and no cause.
+   */
+  public DataflowJobAlreadyExistsException(
+      DataflowPipelineJob job, String message) {
+    super(job, message, null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowJobAlreadyUpdatedException.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowJobAlreadyUpdatedException.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowJobAlreadyUpdatedException.java
new file mode 100644
index 0000000..1f52c6a
--- /dev/null
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowJobAlreadyUpdatedException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.runners;
+
+/**
+ * An exception that is thrown if the existing job has already been updated within the Dataflow
+ * service and is no longer able to be updated. The {@link DataflowPipelineJob} contained within
+ * this exception contains information about the pre-existing updated job.
+ */
+public class DataflowJobAlreadyUpdatedException extends DataflowJobException {
+  /**
+   * Create a new {@code DataflowJobAlreadyUpdatedException} with the specified {@link
+   * DataflowPipelineJob} and message, and no cause.
+   */
+  public DataflowJobAlreadyUpdatedException(
+      DataflowPipelineJob job, String message) {
+    super(job, message, null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowJobCancelledException.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowJobCancelledException.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowJobCancelledException.java
new file mode 100644
index 0000000..495ca5a
--- /dev/null
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowJobCancelledException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.runners;
+
+/**
+ * Signals that a job run by a {@link BlockingDataflowPipelineRunner} was cancelled during
+ * execution.
+ */
+public class DataflowJobCancelledException extends DataflowJobException {
+  /**
+   * Create a new {@code DataflowJobCancelledException} with the specified {@link
+   * DataflowPipelineJob} and message, and no cause.
+   */
+  public DataflowJobCancelledException(DataflowPipelineJob job, String 
message) {
+    super(job, message, null);
+  }
+
+  /**
+   * Create a new {@code DataflowJobCancelledException} with the specified {@link
+   * DataflowPipelineJob}, message, and cause.
+   */
+  public DataflowJobCancelledException(DataflowPipelineJob job, String 
message, Throwable cause) {
+    super(job, message, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowJobException.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowJobException.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowJobException.java
new file mode 100644
index 0000000..a22d13c
--- /dev/null
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowJobException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.runners;
+
+import java.util.Objects;
+
+import javax.annotation.Nullable;
+
+/**
+ * A {@link RuntimeException} that contains information about a {@link DataflowPipelineJob}.
+ */
+public abstract class DataflowJobException extends RuntimeException {
+  private final DataflowPipelineJob job;
+
+  /**
+   * Creates the exception for the given job. The constructor is package-private.
+   *
+   * @param job the job this exception is about; must not be {@code null}
+   * @param message the detail message passed to {@link RuntimeException}
+   * @param cause the underlying cause, or {@code null} if there is none
+   * @throws NullPointerException if {@code job} is {@code null}
+   */
+  DataflowJobException(DataflowPipelineJob job, String message, @Nullable 
Throwable cause) {
+    super(message, cause);
+    this.job = Objects.requireNonNull(job);
+  }
+
+  /**
+   * Returns the job that this exception refers to; never {@code null}.
+   */
+  public DataflowPipelineJob getJob() {
+    return job;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowJobExecutionException.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowJobExecutionException.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowJobExecutionException.java
new file mode 100644
index 0000000..3dbb007
--- /dev/null
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowJobExecutionException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.runners;
+
+import javax.annotation.Nullable;
+
+/**
+ * Signals that a job run by a {@link BlockingDataflowPipelineRunner} fails during execution, and
+ * provides access to the failed job.
+ */
+public class DataflowJobExecutionException extends DataflowJobException {
+  /** Creates the exception for the given job and message, with no cause. */
+  DataflowJobExecutionException(DataflowPipelineJob job, String message) {
+    this(job, message, null);
+  }
+
+  /** Creates the exception for the given job, message, and optional cause. */
+  DataflowJobExecutionException(
+      DataflowPipelineJob job, String message, @Nullable Throwable cause) {
+    super(job, message, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowJobUpdatedException.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowJobUpdatedException.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowJobUpdatedException.java
new file mode 100644
index 0000000..5792941
--- /dev/null
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowJobUpdatedException.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.runners;
+
+/**
+ * Signals that a job run by a {@link BlockingDataflowPipelineRunner} was updated during execution.
+ */
+public class DataflowJobUpdatedException extends DataflowJobException {
+  // Assigned exactly once in the constructor, so it is declared final.
+  private final DataflowPipelineJob replacedByJob;
+
+  /**
+   * Create a new {@code DataflowJobUpdatedException} with the specified original {@link
+   * DataflowPipelineJob}, message, and replacement {@link DataflowPipelineJob}.
+   */
+  public DataflowJobUpdatedException(
+      DataflowPipelineJob job, String message, DataflowPipelineJob replacedByJob) {
+    this(job, message, replacedByJob, null);
+  }
+
+  /**
+   * Create a new {@code DataflowJobUpdatedException} with the specified original {@link
+   * DataflowPipelineJob}, message, replacement {@link DataflowPipelineJob}, and cause.
+   */
+  public DataflowJobUpdatedException(
+      DataflowPipelineJob job, String message, DataflowPipelineJob replacedByJob, Throwable cause) {
+    super(job, message, cause);
+    this.replacedByJob = replacedByJob;
+  }
+
+  /**
+   * The new job that replaces the job terminated with this exception.
+   */
+  public DataflowPipelineJob getReplacedByJob() {
+    return replacedByJob;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipeline.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipeline.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipeline.java
new file mode 100644
index 0000000..7b97c7d
--- /dev/null
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipeline.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.runners;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+
+/**
+ * A {@link DataflowPipeline} is a {@link Pipeline} that returns a
+ * {@link DataflowPipelineJob} when it is
+ * {@link com.google.cloud.dataflow.sdk.Pipeline#run()}.
+ *
+ * <p>This is not intended for use by users of Cloud Dataflow.
+ * Instead, use {@link Pipeline#create(PipelineOptions)} to initialize a
+ * {@link Pipeline}.
+ */
+public class DataflowPipeline extends Pipeline {
+
+  /**
+   * Creates and returns a new {@link DataflowPipeline} instance for tests.
+   */
+  public static DataflowPipeline create(DataflowPipelineOptions options) {
+    return new DataflowPipeline(options);
+  }
+
+  private DataflowPipeline(DataflowPipelineOptions options) {
+    super(DataflowPipelineRunner.fromOptions(options), options);
+  }
+
+  @Override
+  public DataflowPipelineJob run() {
+    return (DataflowPipelineJob) super.run();
+  }
+
+  @Override
+  public DataflowPipelineRunner getRunner() {
+    return (DataflowPipelineRunner) super.getRunner();
+  }
+
+  @Override
+  public String toString() {
+    return "DataflowPipeline#" + 
getOptions().as(DataflowPipelineOptions.class).getJobName();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipelineJob.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipelineJob.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipelineJob.java
new file mode 100644
index 0000000..632be6d
--- /dev/null
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipelineJob.java
@@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.runners;
+
+import static com.google.cloud.dataflow.sdk.util.TimeUtil.fromCloudTime;
+
+import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.BackOffUtils;
+import com.google.api.client.util.NanoClock;
+import com.google.api.client.util.Sleeper;
+import com.google.api.services.dataflow.Dataflow;
+import com.google.api.services.dataflow.model.Job;
+import com.google.api.services.dataflow.model.JobMessage;
+import com.google.api.services.dataflow.model.JobMetrics;
+import com.google.api.services.dataflow.model.MetricUpdate;
+import com.google.cloud.dataflow.sdk.PipelineResult;
+import 
com.google.cloud.dataflow.sdk.runners.dataflow.DataflowAggregatorTransforms;
+import 
com.google.cloud.dataflow.sdk.runners.dataflow.DataflowMetricUpdateExtractor;
+import com.google.cloud.dataflow.sdk.transforms.Aggregator;
+import 
com.google.cloud.dataflow.sdk.util.AttemptAndTimeBoundedExponentialBackOff;
+import com.google.cloud.dataflow.sdk.util.AttemptBoundedExponentialBackOff;
+import com.google.cloud.dataflow.sdk.util.MapAggregatorValues;
+import com.google.cloud.dataflow.sdk.util.MonitoringUtil;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Throwables;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.SocketTimeoutException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.Nullable;
+
+/**
+ * A DataflowPipelineJob represents a job submitted to Dataflow using
+ * {@link DataflowPipelineRunner}.
+ */
+public class DataflowPipelineJob implements PipelineResult {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataflowPipelineJob.class);
+
+  /**
+   * The id for the job.
+   */
+  private final String jobId;
+
+  /**
+   * Google cloud project to associate this pipeline with.
+   */
+  private final String projectId;
+
+  /**
+   * Client for the Dataflow service. This can be used to query the service
+   * for information about the job.
+   */
+  private final Dataflow dataflowClient;
+
+  /**
+   * The state the job terminated in or {@code null} if the job has not
+   * terminated.
+   */
+  @Nullable
+  private State terminalState = null;
+
+  /**
+   * The job that replaced this one or {@code null} if the job has not been
+   * replaced.
+   */
+  @Nullable
+  private DataflowPipelineJob replacedByJob = null;
+
+  /**
+   * Aggregator bookkeeping supplied at construction; consulted when
+   * aggregator values are extracted from metric updates.
+   */
+  private final DataflowAggregatorTransforms aggregatorTransforms;
+
+  /**
+   * The Metric Updates retrieved after the job was in a terminal state.
+   */
+  private List<MetricUpdate> terminalMetricUpdates;
+
+  /**
+   * The polling interval, in milliseconds, for job messages information.
+   */
+  static final long MESSAGES_POLLING_INTERVAL = TimeUnit.SECONDS.toMillis(2);
+
+  /**
+   * The polling interval, in milliseconds, for job status information.
+   */
+  static final long STATUS_POLLING_INTERVAL = TimeUnit.SECONDS.toMillis(2);
+
+  /**
+   * The number of polling attempts for job messages information.
+   */
+  static final int MESSAGES_POLLING_ATTEMPTS = 10;
+
+  /**
+   * The number of polling attempts for job status information.
+   */
+  static final int STATUS_POLLING_ATTEMPTS = 5;
+
+  /**
+   * Creates a handle for a job identified by project and job id.
+   *
+   * @param projectId the project id
+   * @param jobId the job id
+   * @param dataflowClient the client for the Dataflow Service
+   * @param aggregatorTransforms the aggregator bookkeeping used when
+   *     aggregator values are looked up for this job
+   */
+  public DataflowPipelineJob(String projectId, String jobId,
+      Dataflow dataflowClient,
+      DataflowAggregatorTransforms aggregatorTransforms) {
+    this.aggregatorTransforms = aggregatorTransforms;
+    this.dataflowClient = dataflowClient;
+    this.jobId = jobId;
+    this.projectId = projectId;
+  }
+
+  /**
+   * Returns the id of this job, as given at construction time.
+   */
+  public String getJobId() {
+    return this.jobId;
+  }
+
+  /**
+   * Returns the id of the project this job exists in.
+   */
+  public String getProjectId() {
+    return this.projectId;
+  }
+
+  /**
+   * Returns the {@link DataflowPipelineJob} recorded as the replacement of
+   * this one, if applicable.
+   *
+   * @throws IllegalStateException if called before the job has terminated
+   *     or if the job terminated but was not updated
+   */
+  public DataflowPipelineJob getReplacedByJob() {
+    if (terminalState != null && replacedByJob != null) {
+      return replacedByJob;
+    }
+    // The "not terminated" condition takes precedence over "not replaced".
+    String problem =
+        terminalState == null
+            ? "getReplacedByJob() called before job terminated"
+            : "getReplacedByJob() called for job that was not replaced";
+    throw new IllegalStateException(problem);
+  }
+
+  /**
+   * Returns the Cloud Dataflow API client this job was constructed with.
+   */
+  public Dataflow getDataflowClient() {
+    return this.dataflowClient;
+  }
+
+  /**
+   * Waits for the job to finish and returns the final status, sleeping with
+   * {@link Sleeper#DEFAULT} and measuring time with
+   * {@link NanoClock#SYSTEM}.
+   *
+   * @param timeToWait The time to wait in units timeUnit for the job to
+   *     finish. Provide a value less than 1 ms for an infinite wait.
+   * @param timeUnit The unit of time for timeToWait.
+   * @param messageHandler If non null this handler will be invoked for each
+   *     batch of messages received.
+   * @return The final state of the job, or null if polling ended without a
+   *     terminal state being observed (for example on timeout).
+   * @throws IOException If there is a persistent problem getting job
+   *     information.
+   * @throws InterruptedException if the wait is interrupted
+   */
+  @Nullable
+  public State waitToFinish(
+      long timeToWait,
+      TimeUnit timeUnit,
+      MonitoringUtil.JobMessagesHandler messageHandler)
+          throws IOException, InterruptedException {
+    State finalState =
+        waitToFinish(
+            timeToWait,
+            timeUnit,
+            messageHandler,
+            Sleeper.DEFAULT,
+            NanoClock.SYSTEM);
+    return finalState;
+  }
+
+  /**
+   * Polls the job until it reaches a terminal state or the backoff is
+   * exhausted, optionally forwarding job messages to a handler.
+   *
+   * @param timeToWait The time to wait in units timeUnit for the job to
+   *     finish. Provide a value less than 1 ms for an infinite wait.
+   * @param timeUnit The unit of time for timeToWait.
+   * @param messageHandler If non null this handler will be invoked for each
+   *     batch of messages received.
+   * @param sleeper A sleeper to use to sleep between attempts.
+   * @param nanoClock A nanoClock used to time the total time taken.
+   * @return The final state of the job, or null if polling ended without a
+   *     terminal state being observed (for example on timeout).
+   * @throws IOException If there is a persistent problem getting job
+   *     information.
+   * @throws InterruptedException if the wait is interrupted
+   */
+  @Nullable
+  @VisibleForTesting
+  State waitToFinish(
+      long timeToWait,
+      TimeUnit timeUnit,
+      MonitoringUtil.JobMessagesHandler messageHandler,
+      Sleeper sleeper,
+      NanoClock nanoClock)
+          throws IOException, InterruptedException {
+    MonitoringUtil monitor = new MonitoringUtil(projectId, dataflowClient);
+
+    // A positive wait selects the time-aware backoff; anything else falls
+    // back to the purely attempt-bounded one.
+    long maxWaitMillis = timeUnit.toMillis(timeToWait);
+    BackOff backoff;
+    if (maxWaitMillis > 0) {
+      backoff =
+          new AttemptAndTimeBoundedExponentialBackOff(
+              MESSAGES_POLLING_ATTEMPTS,
+              MESSAGES_POLLING_INTERVAL,
+              maxWaitMillis,
+              AttemptAndTimeBoundedExponentialBackOff.ResetPolicy.ATTEMPTS,
+              nanoClock);
+    } else {
+      backoff =
+          new AttemptBoundedExponentialBackOff(
+              MESSAGES_POLLING_ATTEMPTS, MESSAGES_POLLING_INTERVAL);
+    }
+
+    // Timestamp (millis) of the newest message already handed to the
+    // handler; passed back to the monitor on the next fetch.
+    long newestMessageMillis = 0;
+    State state;
+    do {
+      // Read the state before listing messages, so that once the job is
+      // seen as finished the messages are still fetched one more time.
+      state = getStateWithRetries(1, sleeper);
+      boolean pollFailed = (state == State.UNKNOWN);
+
+      if (!pollFailed && messageHandler != null) {
+        try {
+          List<JobMessage> batch =
+              monitor.getJobMessages(jobId, newestMessageMillis);
+          if (!batch.isEmpty()) {
+            JobMessage newest = batch.get(batch.size() - 1);
+            newestMessageMillis =
+                fromCloudTime(newest.getTime()).getMillis();
+            messageHandler.process(batch);
+          }
+        } catch (GoogleJsonResponseException | SocketTimeoutException e) {
+          pollFailed = true;
+          LOG.warn(
+              "There were problems getting current job messages: {}.",
+              e.getMessage());
+          LOG.debug("Exception information:", e);
+        }
+      }
+
+      if (pollFailed) {
+        // Skip the reset so the backoff keeps counting failed attempts.
+        continue;
+      }
+      backoff.reset();
+      if (state.isTerminal()) {
+        return state;
+      }
+    } while (BackOffUtils.next(sleeper, backoff));
+
+    LOG.warn("No terminal state was returned.  State value {}", state);
+    return null;  // Timed out.
+  }
+
+  /**
+   * Asks the Dataflow service to cancel this job by sending an update whose
+   * requested state is {@code JOB_STATE_CANCELLED}.
+   *
+   * @throws IOException if there is a problem executing the cancel request.
+   */
+  public void cancel() throws IOException {
+    Job cancelRequest = new Job();
+    cancelRequest.setRequestedState("JOB_STATE_CANCELLED");
+    cancelRequest.setId(jobId);
+    cancelRequest.setProjectId(projectId);
+
+    dataflowClient
+        .projects()
+        .jobs()
+        .update(projectId, jobId, cancelRequest)
+        .execute();
+  }
+
+  /**
+   * Returns the remembered terminal state if there is one; otherwise asks
+   * the service, retrying up to {@link #STATUS_POLLING_ATTEMPTS} times.
+   */
+  @Override
+  public State getState() {
+    if (terminalState == null) {
+      return getStateWithRetries(STATUS_POLLING_ATTEMPTS, Sleeper.DEFAULT);
+    }
+    return terminalState;
+  }
+
+  /**
+   * Looks up the current job state, short-circuiting to the remembered
+   * terminal state when one exists. Retrying is delegated to
+   * {@link #getJobWithRetries}.
+   *
+   * @param attempts The amount of attempts to make.
+   * @param sleeper Object used to do the sleeps between attempts.
+   * @return The state of the job or State.UNKNOWN in case of failure.
+   */
+  @VisibleForTesting
+  State getStateWithRetries(int attempts, Sleeper sleeper) {
+    if (terminalState == null) {
+      try {
+        String currentState =
+            getJobWithRetries(attempts, sleeper).getCurrentState();
+        return MonitoringUtil.toState(currentState);
+      } catch (IOException exn) {
+        // getJobWithRetries only lets the final IOException of an exhausted
+        // retry loop escape as a checked exception; anything else arrives
+        // wrapped in an unchecked exception and keeps propagating.
+        return State.UNKNOWN;
+      }
+    }
+    return terminalState;
+  }
+
+  /**
+   * Attempts to get the underlying {@link Job}. Uses exponential backoff on
+   * failure up to the maximum number of passed in attempts.
+   *
+   * <p>When the fetched job is in a terminal state, that state is remembered
+   * in {@code terminalState}. A replacement job handle is recorded only if
+   * the service reports a replacement job id, so that
+   * {@link #getReplacedByJob()} can tell replaced jobs apart from jobs that
+   * merely terminated.
+   *
+   * @param attempts The amount of attempts to make.
+   * @param sleeper Object used to do the sleeps between attempts.
+   * @return The underlying {@link Job} object.
+   * @throws IOException When the maximum number of retries is exhausted, the
+   *     last exception is thrown.
+   */
+  @VisibleForTesting
+  Job getJobWithRetries(int attempts, Sleeper sleeper) throws IOException {
+    AttemptBoundedExponentialBackOff backoff =
+        new AttemptBoundedExponentialBackOff(
+            attempts, STATUS_POLLING_INTERVAL);
+
+    // Retry loop ends in return or throw
+    while (true) {
+      try {
+        Job job = dataflowClient
+            .projects()
+            .jobs()
+            .get(projectId, jobId)
+            .execute();
+        State currentState = MonitoringUtil.toState(job.getCurrentState());
+        if (currentState.isTerminal()) {
+          terminalState = currentState;
+          // Without a replacement id there is no replacement job; leaving
+          // replacedByJob null lets getReplacedByJob() report that instead
+          // of handing out a handle with a null job id.
+          String replacementJobId = job.getReplacedByJobId();
+          if (replacementJobId != null && !replacementJobId.isEmpty()) {
+            replacedByJob = new DataflowPipelineJob(
+                getProjectId(),
+                replacementJobId,
+                dataflowClient,
+                aggregatorTransforms);
+          }
+        }
+        return job;
+      } catch (IOException exn) {
+        LOG.warn(
+            "There were problems getting current job status: {}.",
+            exn.getMessage());
+        LOG.debug("Exception information:", exn);
+
+        // Out of attempts: surface the last failure to the caller.
+        if (!nextBackOff(sleeper, backoff)) {
+          throw exn;
+        }
+      }
+    }
+  }
+
+  /**
+   * Same as {@link BackOffUtils#next}, except that checked exceptions are
+   * rethrown via {@link Throwables#propagate}. On interruption the thread's
+   * interrupt flag is set again before rethrowing.
+   */
+  private boolean nextBackOff(Sleeper sleeper, BackOff backoff) {
+    try {
+      return BackOffUtils.next(sleeper, backoff);
+    } catch (InterruptedException interrupted) {
+      Thread.currentThread().interrupt();
+      throw Throwables.propagate(interrupted);
+    } catch (IOException ioFailure) {
+      throw Throwables.propagate(ioFailure);
+    }
+  }
+
+  @Override
+  public <OutputT> AggregatorValues<OutputT> getAggregatorValues(Aggregator<?, 
OutputT> aggregator)
+      throws AggregatorRetrievalException {
+    try {
+      return new MapAggregatorValues<>(fromMetricUpdates(aggregator));
+    } catch (IOException e) {
+      throw new AggregatorRetrievalException(
+          "IOException when retrieving Aggregator values for Aggregator " + 
aggregator, e);
+    }
+  }
+
+  /**
+   * Extracts the values of {@code aggregator} from the job's metric updates.
+   * Metric updates fetched while the job was already terminal are cached and
+   * reused on later calls.
+   *
+   * @throws IllegalArgumentException if the aggregator is not known to
+   *     {@code aggregatorTransforms}
+   * @throws IOException if the metrics request fails
+   */
+  private <OutputT> Map<String, OutputT> fromMetricUpdates(
+      Aggregator<?, OutputT> aggregator) throws IOException {
+    if (!aggregatorTransforms.contains(aggregator)) {
+      throw new IllegalArgumentException(
+          "Aggregator " + aggregator + " is not used in this pipeline");
+    }
+
+    List<MetricUpdate> updates = terminalMetricUpdates;
+    if (updates == null) {
+      // Determine terminality before fetching, so that metrics are only
+      // cached when they were requested after the job had terminated.
+      boolean terminal = getState().isTerminal();
+      JobMetrics jobMetrics =
+          dataflowClient
+              .projects()
+              .jobs()
+              .getMetrics(projectId, jobId)
+              .execute();
+      updates = jobMetrics.getMetrics();
+      if (terminal && jobMetrics.getMetrics() != null) {
+        terminalMetricUpdates = updates;
+      }
+    }
+
+    return DataflowMetricUpdateExtractor.fromMetricUpdates(
+        aggregator, aggregatorTransforms, updates);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipelineRegistrar.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipelineRegistrar.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipelineRegistrar.java
new file mode 100644
index 0000000..8aaa7cc
--- /dev/null
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipelineRegistrar.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.runners;
+
+import com.google.auto.service.AutoService;
+import com.google.cloud.dataflow.sdk.options.BlockingDataflowPipelineOptions;
+import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar;
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Contains the {@link PipelineOptionsRegistrar} and {@link 
PipelineRunnerRegistrar} for
+ * the {@link DataflowPipeline}.
+ */
+public class DataflowPipelineRegistrar {
+  private DataflowPipelineRegistrar() { }
+
+  /**
+   * Register the {@link DataflowPipelineOptions} and {@link 
BlockingDataflowPipelineOptions}.
+   */
+  @AutoService(PipelineOptionsRegistrar.class)
+  public static class Options implements PipelineOptionsRegistrar {
+    @Override
+    public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
+      return ImmutableList.<Class<? extends PipelineOptions>>of(
+          DataflowPipelineOptions.class,
+          BlockingDataflowPipelineOptions.class);
+    }
+  }
+
+  /**
+   * Register the {@link DataflowPipelineRunner} and {@link 
BlockingDataflowPipelineRunner}.
+   */
+  @AutoService(PipelineRunnerRegistrar.class)
+  public static class Runner implements PipelineRunnerRegistrar {
+    @Override
+    public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
+      return ImmutableList.<Class<? extends PipelineRunner<?>>>of(
+          DataflowPipelineRunner.class,
+          BlockingDataflowPipelineRunner.class);
+    }
+  }
+}

Reply via email to