Repository: incubator-beam Updated Branches: refs/heads/master a59ddab21 -> f5a5eb34e
[BEAM-433] Change the ExampleUtils constructor to take PipelineOptions Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/46140307 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/46140307 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/46140307 Branch: refs/heads/master Commit: 4614030729bbaf2458f6c98dc41f9cde5451624c Parents: a59ddab Author: Pei He <[email protected]> Authored: Mon Jul 11 14:22:15 2016 -0700 Committer: Dan Halperin <[email protected]> Committed: Mon Jul 11 18:34:41 2016 -0700 ---------------------------------------------------------------------- .../beam/examples/common/ExampleUtils.java | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/46140307/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java index 6b71b0f..ad00a14 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java +++ b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java @@ -24,6 +24,9 @@ import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.runners.dataflow.util.MonitoringUtil; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.options.BigQueryOptions; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PubsubOptions; +import org.apache.beam.sdk.options.StreamingOptions; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff; import 
org.apache.beam.sdk.util.Transport; @@ -65,7 +68,7 @@ public class ExampleUtils { private static final int SC_NOT_FOUND = 404; - private final DataflowPipelineOptions options; + private final PipelineOptions options; private Bigquery bigQueryClient = null; private Pubsub pubsubClient = null; private Dataflow dataflowClient = null; @@ -75,7 +78,7 @@ public class ExampleUtils { /** * Do resources and runner options setup. */ - public ExampleUtils(DataflowPipelineOptions options) { + public ExampleUtils(PipelineOptions options) { this.options = options; setupRunner(); } @@ -230,7 +233,7 @@ public class ExampleUtils { private void setupPubsubTopic(String topic) throws IOException { if (pubsubClient == null) { - pubsubClient = Transport.newPubsubClient(options).build(); + pubsubClient = Transport.newPubsubClient(options.as(PubsubOptions.class)).build(); } if (executeNullIfNotFound(pubsubClient.projects().topics().get(topic)) == null) { pubsubClient.projects().topics().create(topic, new Topic().setName(topic)).execute(); @@ -239,7 +242,7 @@ public class ExampleUtils { private void setupPubsubSubscription(String topic, String subscription) throws IOException { if (pubsubClient == null) { - pubsubClient = Transport.newPubsubClient(options).build(); + pubsubClient = Transport.newPubsubClient(options.as(PubsubOptions.class)).build(); } if (executeNullIfNotFound(pubsubClient.projects().subscriptions().get(subscription)) == null) { Subscription subInfo = new Subscription() @@ -256,7 +259,7 @@ public class ExampleUtils { */ private void deletePubsubTopic(String topic) throws IOException { if (pubsubClient == null) { - pubsubClient = Transport.newPubsubClient(options).build(); + pubsubClient = Transport.newPubsubClient(options.as(PubsubOptions.class)).build(); } if (executeNullIfNotFound(pubsubClient.projects().topics().get(topic)) != null) { pubsubClient.projects().topics().delete(topic).execute(); @@ -270,7 +273,7 @@ public class ExampleUtils { */ private void 
deletePubsubSubscription(String subscription) throws IOException { if (pubsubClient == null) { - pubsubClient = Transport.newPubsubClient(options).build(); + pubsubClient = Transport.newPubsubClient(options.as(PubsubOptions.class)).build(); } if (executeNullIfNotFound(pubsubClient.projects().subscriptions().get(subscription)) != null) { pubsubClient.projects().subscriptions().delete(subscription).execute(); @@ -283,7 +286,8 @@ public class ExampleUtils { */ private void setupRunner() { Class<? extends PipelineRunner<?>> runner = options.getRunner(); - if (options.isStreaming() && runner.equals(BlockingDataflowRunner.class)) { + if (options.as(StreamingOptions.class).isStreaming() + && runner.equals(BlockingDataflowRunner.class)) { // In order to cancel the pipelines automatically, // {@literal DataflowRunner} is forced to be used. options.setRunner(DataflowRunner.class); @@ -316,7 +320,7 @@ public class ExampleUtils { private void addShutdownHook(final Collection<DataflowPipelineJob> jobs) { if (dataflowClient == null) { - dataflowClient = options.getDataflowClient(); + dataflowClient = options.as(DataflowPipelineOptions.class).getDataflowClient(); } Runtime.getRuntime().addShutdownHook(new Thread() {
