Repository: incubator-beam Updated Branches: refs/heads/master 290c0b772 -> 921c55c94
Rename DataflowExampleUtils and DataflowExampleOptions Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a6f488f1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a6f488f1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a6f488f1 Branch: refs/heads/master Commit: a6f488f1d3f12411002f3d0b20c74fc9b2f909df Parents: 290c0b7 Author: Pei He <[email protected]> Authored: Thu Jul 7 13:45:24 2016 -0700 Committer: Dan Halperin <[email protected]> Committed: Thu Jul 7 22:19:03 2016 -0700 ---------------------------------------------------------------------- .../apache/beam/examples/WindowedWordCount.java | 12 +- .../examples/common/DataflowExampleOptions.java | 37 -- .../examples/common/DataflowExampleUtils.java | 404 ------------------- .../beam/examples/common/ExampleOptions.java | 37 ++ .../beam/examples/common/ExampleUtils.java | 404 +++++++++++++++++++ .../beam/examples/complete/AutoComplete.java | 4 +- .../examples/complete/StreamingWordExtract.java | 4 +- .../examples/complete/TrafficMaxLaneFlow.java | 9 +- .../beam/examples/complete/TrafficRoutes.java | 9 +- .../beam/examples/cookbook/TriggerExample.java | 9 +- .../beam/examples/complete/game/GameStats.java | 4 +- .../examples/complete/game/LeaderBoard.java | 8 +- 12 files changed, 467 insertions(+), 474 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a6f488f1/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java index d9dc26d..b32128a 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java +++ 
b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java @@ -17,9 +17,9 @@ */ package org.apache.beam.examples; -import org.apache.beam.examples.common.DataflowExampleOptions; -import org.apache.beam.examples.common.DataflowExampleUtils; import org.apache.beam.examples.common.ExampleBigQueryTableOptions; +import org.apache.beam.examples.common.ExampleOptions; +import org.apache.beam.examples.common.ExampleUtils; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.io.BigQueryIO; @@ -41,8 +41,6 @@ import com.google.api.services.bigquery.model.TableSchema; import org.joda.time.Duration; import org.joda.time.Instant; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; @@ -102,7 +100,6 @@ import java.util.List; * and then exits. */ public class WindowedWordCount { - private static final Logger LOG = LoggerFactory.getLogger(WindowedWordCount.class); static final int WINDOW_SIZE = 1; // Default window duration in minutes /** @@ -179,7 +176,7 @@ public class WindowedWordCount { * specification of the input file. */ public static interface Options extends WordCount.WordCountOptions, - DataflowExampleOptions, ExampleBigQueryTableOptions { + ExampleOptions, ExampleBigQueryTableOptions { @Description("Fixed window duration, in minutes") @Default.Integer(WINDOW_SIZE) Integer getWindowSize(); @@ -195,8 +192,7 @@ public class WindowedWordCount { options.setBigQuerySchema(getSchema()); // DataflowExampleUtils creates the necessary input sources to simplify execution of this // Pipeline. 
- DataflowExampleUtils exampleDataflowUtils = new DataflowExampleUtils(options, - options.isUnbounded()); + ExampleUtils exampleDataflowUtils = new ExampleUtils(options, options.isUnbounded()); Pipeline pipeline = Pipeline.create(options); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a6f488f1/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleOptions.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleOptions.java b/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleOptions.java deleted file mode 100644 index 2e8ef3d..0000000 --- a/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleOptions.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.examples.common; - -import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; -import org.apache.beam.sdk.options.Default; -import org.apache.beam.sdk.options.Description; - -/** - * Options that can be used to configure the Dataflow examples. 
- */ -public interface DataflowExampleOptions extends DataflowPipelineOptions { - @Description("Whether to keep jobs running on the Dataflow service after local process exit") - @Default.Boolean(false) - boolean getKeepJobsRunning(); - void setKeepJobsRunning(boolean keepJobsRunning); - - @Description("Number of workers to use when executing the injector pipeline") - @Default.Integer(1) - int getInjectorNumWorkers(); - void setInjectorNumWorkers(int numWorkers); -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a6f488f1/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleUtils.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleUtils.java b/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleUtils.java deleted file mode 100644 index a90968a..0000000 --- a/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleUtils.java +++ /dev/null @@ -1,404 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.examples.common; - -import org.apache.beam.runners.dataflow.BlockingDataflowRunner; -import org.apache.beam.runners.dataflow.DataflowPipelineJob; -import org.apache.beam.runners.dataflow.DataflowRunner; -import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; -import org.apache.beam.runners.dataflow.util.MonitoringUtil; -import org.apache.beam.sdk.PipelineResult; -import org.apache.beam.sdk.options.BigQueryOptions; -import org.apache.beam.sdk.runners.PipelineRunner; -import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff; -import org.apache.beam.sdk.util.Transport; - -import com.google.api.client.googleapis.json.GoogleJsonResponseException; -import com.google.api.client.googleapis.services.AbstractGoogleClientRequest; -import com.google.api.client.util.BackOff; -import com.google.api.client.util.BackOffUtils; -import com.google.api.client.util.Sleeper; -import com.google.api.services.bigquery.Bigquery; -import com.google.api.services.bigquery.Bigquery.Datasets; -import com.google.api.services.bigquery.Bigquery.Tables; -import com.google.api.services.bigquery.model.Dataset; -import com.google.api.services.bigquery.model.DatasetReference; -import com.google.api.services.bigquery.model.Table; -import com.google.api.services.bigquery.model.TableReference; -import com.google.api.services.bigquery.model.TableSchema; -import com.google.api.services.dataflow.Dataflow; -import com.google.api.services.pubsub.Pubsub; -import com.google.api.services.pubsub.model.Subscription; -import com.google.api.services.pubsub.model.Topic; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; -import com.google.common.util.concurrent.Uninterruptibles; - -import java.io.IOException; -import java.util.Collection; -import java.util.List; -import java.util.Set; -import java.util.concurrent.TimeUnit; - -/** - * The utility class that sets up and tears down external resources, starts the Google Cloud Pub/Sub - * 
injector, and cancels the streaming and the injector pipelines once the program terminates. - * - * <p>It is used to run Dataflow examples, such as TrafficMaxLaneFlow and TrafficRoutes. - */ -public class DataflowExampleUtils { - - private static final int SC_NOT_FOUND = 404; - - private final DataflowPipelineOptions options; - private Bigquery bigQueryClient = null; - private Pubsub pubsubClient = null; - private Dataflow dataflowClient = null; - private Set<DataflowPipelineJob> jobsToCancel = Sets.newHashSet(); - private List<String> pendingMessages = Lists.newArrayList(); - - public DataflowExampleUtils(DataflowPipelineOptions options) { - this.options = options; - } - - /** - * Do resources and runner options setup. - */ - public DataflowExampleUtils(DataflowPipelineOptions options, boolean isUnbounded) - throws IOException { - this.options = options; - setupResourcesAndRunner(isUnbounded); - } - - /** - * Sets up external resources that are required by the example, - * such as Pub/Sub topics and BigQuery tables. - * - * @throws IOException if there is a problem setting up the resources - */ - public void setup() throws IOException { - Sleeper sleeper = Sleeper.DEFAULT; - BackOff backOff = new AttemptBoundedExponentialBackOff(3, 200); - Throwable lastException = null; - try { - do { - try { - setupPubsub(); - setupBigQueryTable(); - return; - } catch (GoogleJsonResponseException e) { - lastException = e; - } - } while (BackOffUtils.next(sleeper, backOff)); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - // Ignore InterruptedException - } - throw new RuntimeException(lastException); - } - - /** - * Set up external resources, and configure the runner appropriately. - */ - public void setupResourcesAndRunner(boolean isUnbounded) throws IOException { - if (isUnbounded) { - options.setStreaming(true); - } - setup(); - setupRunner(); - } - - /** - * Sets up the Google Cloud Pub/Sub topic. 
- * - * <p>If the topic doesn't exist, a new topic with the given name will be created. - * - * @throws IOException if there is a problem setting up the Pub/Sub topic - */ - public void setupPubsub() throws IOException { - ExamplePubsubTopicAndSubscriptionOptions pubsubOptions = - options.as(ExamplePubsubTopicAndSubscriptionOptions.class); - if (!pubsubOptions.getPubsubTopic().isEmpty()) { - pendingMessages.add("**********************Set Up Pubsub************************"); - setupPubsubTopic(pubsubOptions.getPubsubTopic()); - pendingMessages.add("The Pub/Sub topic has been set up for this example: " - + pubsubOptions.getPubsubTopic()); - - if (!pubsubOptions.getPubsubSubscription().isEmpty()) { - setupPubsubSubscription( - pubsubOptions.getPubsubTopic(), pubsubOptions.getPubsubSubscription()); - pendingMessages.add("The Pub/Sub subscription has been set up for this example: " - + pubsubOptions.getPubsubSubscription()); - } - } - } - - /** - * Sets up the BigQuery table with the given schema. - * - * <p>If the table already exists, the schema has to match the given one. Otherwise, the example - * will throw a RuntimeException. If the table doesn't exist, a new table with the given schema - * will be created. 
- * - * @throws IOException if there is a problem setting up the BigQuery table - */ - public void setupBigQueryTable() throws IOException { - ExampleBigQueryTableOptions bigQueryTableOptions = - options.as(ExampleBigQueryTableOptions.class); - if (bigQueryTableOptions.getBigQueryDataset() != null - && bigQueryTableOptions.getBigQueryTable() != null - && bigQueryTableOptions.getBigQuerySchema() != null) { - pendingMessages.add("******************Set Up Big Query Table*******************"); - setupBigQueryTable(bigQueryTableOptions.getProject(), - bigQueryTableOptions.getBigQueryDataset(), - bigQueryTableOptions.getBigQueryTable(), - bigQueryTableOptions.getBigQuerySchema()); - pendingMessages.add("The BigQuery table has been set up for this example: " - + bigQueryTableOptions.getProject() - + ":" + bigQueryTableOptions.getBigQueryDataset() - + "." + bigQueryTableOptions.getBigQueryTable()); - } - } - - /** - * Tears down external resources that can be deleted upon the example's completion. 
- */ - private void tearDown() { - pendingMessages.add("*************************Tear Down*************************"); - ExamplePubsubTopicAndSubscriptionOptions pubsubOptions = - options.as(ExamplePubsubTopicAndSubscriptionOptions.class); - if (!pubsubOptions.getPubsubTopic().isEmpty()) { - try { - deletePubsubTopic(pubsubOptions.getPubsubTopic()); - pendingMessages.add("The Pub/Sub topic has been deleted: " - + pubsubOptions.getPubsubTopic()); - } catch (IOException e) { - pendingMessages.add("Failed to delete the Pub/Sub topic : " - + pubsubOptions.getPubsubTopic()); - } - if (!pubsubOptions.getPubsubSubscription().isEmpty()) { - try { - deletePubsubSubscription(pubsubOptions.getPubsubSubscription()); - pendingMessages.add("The Pub/Sub subscription has been deleted: " - + pubsubOptions.getPubsubSubscription()); - } catch (IOException e) { - pendingMessages.add("Failed to delete the Pub/Sub subscription : " - + pubsubOptions.getPubsubSubscription()); - } - } - } - - ExampleBigQueryTableOptions bigQueryTableOptions = - options.as(ExampleBigQueryTableOptions.class); - if (bigQueryTableOptions.getBigQueryDataset() != null - && bigQueryTableOptions.getBigQueryTable() != null - && bigQueryTableOptions.getBigQuerySchema() != null) { - pendingMessages.add("The BigQuery table might contain the example's output, " - + "and it is not deleted automatically: " - + bigQueryTableOptions.getProject() - + ":" + bigQueryTableOptions.getBigQueryDataset() - + "." + bigQueryTableOptions.getBigQueryTable()); - pendingMessages.add("Please go to the Developers Console to delete it manually." 
- + " Otherwise, you may be charged for its usage."); - } - } - - private void setupBigQueryTable(String projectId, String datasetId, String tableId, - TableSchema schema) throws IOException { - if (bigQueryClient == null) { - bigQueryClient = Transport.newBigQueryClient(options.as(BigQueryOptions.class)).build(); - } - - Datasets datasetService = bigQueryClient.datasets(); - if (executeNullIfNotFound(datasetService.get(projectId, datasetId)) == null) { - Dataset newDataset = new Dataset().setDatasetReference( - new DatasetReference().setProjectId(projectId).setDatasetId(datasetId)); - datasetService.insert(projectId, newDataset).execute(); - } - - Tables tableService = bigQueryClient.tables(); - Table table = executeNullIfNotFound(tableService.get(projectId, datasetId, tableId)); - if (table == null) { - Table newTable = new Table().setSchema(schema).setTableReference( - new TableReference().setProjectId(projectId).setDatasetId(datasetId).setTableId(tableId)); - tableService.insert(projectId, datasetId, newTable).execute(); - } else if (!table.getSchema().equals(schema)) { - throw new RuntimeException( - "Table exists and schemas do not match, expecting: " + schema.toPrettyString() - + ", actual: " + table.getSchema().toPrettyString()); - } - } - - private void setupPubsubTopic(String topic) throws IOException { - if (pubsubClient == null) { - pubsubClient = Transport.newPubsubClient(options).build(); - } - if (executeNullIfNotFound(pubsubClient.projects().topics().get(topic)) == null) { - pubsubClient.projects().topics().create(topic, new Topic().setName(topic)).execute(); - } - } - - private void setupPubsubSubscription(String topic, String subscription) throws IOException { - if (pubsubClient == null) { - pubsubClient = Transport.newPubsubClient(options).build(); - } - if (executeNullIfNotFound(pubsubClient.projects().subscriptions().get(subscription)) == null) { - Subscription subInfo = new Subscription() - .setAckDeadlineSeconds(60) - .setTopic(topic); - 
pubsubClient.projects().subscriptions().create(subscription, subInfo).execute(); - } - } - - /** - * Deletes the Google Cloud Pub/Sub topic. - * - * @throws IOException if there is a problem deleting the Pub/Sub topic - */ - private void deletePubsubTopic(String topic) throws IOException { - if (pubsubClient == null) { - pubsubClient = Transport.newPubsubClient(options).build(); - } - if (executeNullIfNotFound(pubsubClient.projects().topics().get(topic)) != null) { - pubsubClient.projects().topics().delete(topic).execute(); - } - } - - /** - * Deletes the Google Cloud Pub/Sub subscription. - * - * @throws IOException if there is a problem deleting the Pub/Sub subscription - */ - private void deletePubsubSubscription(String subscription) throws IOException { - if (pubsubClient == null) { - pubsubClient = Transport.newPubsubClient(options).build(); - } - if (executeNullIfNotFound(pubsubClient.projects().subscriptions().get(subscription)) != null) { - pubsubClient.projects().subscriptions().delete(subscription).execute(); - } - } - - /** - * Do some runner setup: check that the DirectRunner is not used in conjunction with - * streaming, and if streaming is specified, use the DataflowRunner. - */ - public void setupRunner() { - Class<? extends PipelineRunner<?>> runner = options.getRunner(); - if (options.isStreaming() - && (runner.equals(DataflowRunner.class) - || runner.equals(BlockingDataflowRunner.class))) { - // In order to cancel the pipelines automatically, - // {@literal DataflowRunner} is forced to be used. - options.setRunner(DataflowRunner.class); - } - } - - /** - * If {@literal DataflowRunner} or {@literal BlockingDataflowRunner} is used, - * waits for the pipeline to finish and cancels it (and the injector) before the program exists. 
- */ - public void waitToFinish(PipelineResult result) { - if (result instanceof DataflowPipelineJob) { - final DataflowPipelineJob job = (DataflowPipelineJob) result; - jobsToCancel.add(job); - if (!options.as(DataflowExampleOptions.class).getKeepJobsRunning()) { - addShutdownHook(jobsToCancel); - } - try { - job.waitToFinish(-1, TimeUnit.SECONDS, new MonitoringUtil.PrintHandler(System.out)); - } catch (Exception e) { - throw new RuntimeException("Failed to wait for job to finish: " + job.getJobId()); - } - } else { - // Do nothing if the given PipelineResult doesn't support waitToFinish(), - // such as EvaluationResults returned by DirectRunner. - tearDown(); - printPendingMessages(); - } - } - - private void addShutdownHook(final Collection<DataflowPipelineJob> jobs) { - if (dataflowClient == null) { - dataflowClient = options.getDataflowClient(); - } - - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override - public void run() { - tearDown(); - printPendingMessages(); - for (DataflowPipelineJob job : jobs) { - System.out.println("Canceling example pipeline: " + job.getJobId()); - try { - job.cancel(); - } catch (IOException e) { - System.out.println("Failed to cancel the job," - + " please go to the Developers Console to cancel it manually"); - System.out.println( - MonitoringUtil.getJobMonitoringPageURL(job.getProjectId(), job.getJobId())); - } - } - - for (DataflowPipelineJob job : jobs) { - boolean cancellationVerified = false; - for (int retryAttempts = 6; retryAttempts > 0; retryAttempts--) { - if (job.getState().isTerminal()) { - cancellationVerified = true; - System.out.println("Canceled example pipeline: " + job.getJobId()); - break; - } else { - System.out.println( - "The example pipeline is still running. 
Verifying the cancellation."); - } - Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS); - } - if (!cancellationVerified) { - System.out.println("Failed to verify the cancellation for job: " + job.getJobId()); - System.out.println("Please go to the Developers Console to verify manually:"); - System.out.println( - MonitoringUtil.getJobMonitoringPageURL(job.getProjectId(), job.getJobId())); - } - } - } - }); - } - - private void printPendingMessages() { - System.out.println(); - System.out.println("***********************************************************"); - System.out.println("***********************************************************"); - for (String message : pendingMessages) { - System.out.println(message); - } - System.out.println("***********************************************************"); - System.out.println("***********************************************************"); - } - - private static <T> T executeNullIfNotFound( - AbstractGoogleClientRequest<T> request) throws IOException { - try { - return request.execute(); - } catch (GoogleJsonResponseException e) { - if (e.getStatusCode() == SC_NOT_FOUND) { - return null; - } else { - throw e; - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a6f488f1/examples/java/src/main/java/org/apache/beam/examples/common/ExampleOptions.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleOptions.java b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleOptions.java new file mode 100644 index 0000000..bba7b21 --- /dev/null +++ b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleOptions.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.examples.common; + +import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.Description; + +/** + * Options that can be used to configure the Beam examples. + */ +public interface ExampleOptions extends DataflowPipelineOptions { + @Description("Whether to keep jobs running on the Dataflow service after local process exit") + @Default.Boolean(false) + boolean getKeepJobsRunning(); + void setKeepJobsRunning(boolean keepJobsRunning); + + @Description("Number of workers to use when executing the injector pipeline") + @Default.Integer(1) + int getInjectorNumWorkers(); + void setInjectorNumWorkers(int numWorkers); +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a6f488f1/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java new file mode 100644 index 0000000..e30b1e4 --- /dev/null +++ b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java @@ -0,0 +1,404 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.examples.common; + +import org.apache.beam.runners.dataflow.BlockingDataflowRunner; +import org.apache.beam.runners.dataflow.DataflowPipelineJob; +import org.apache.beam.runners.dataflow.DataflowRunner; +import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; +import org.apache.beam.runners.dataflow.util.MonitoringUtil; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.options.BigQueryOptions; +import org.apache.beam.sdk.runners.PipelineRunner; +import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff; +import org.apache.beam.sdk.util.Transport; + +import com.google.api.client.googleapis.json.GoogleJsonResponseException; +import com.google.api.client.googleapis.services.AbstractGoogleClientRequest; +import com.google.api.client.util.BackOff; +import com.google.api.client.util.BackOffUtils; +import com.google.api.client.util.Sleeper; +import com.google.api.services.bigquery.Bigquery; +import com.google.api.services.bigquery.Bigquery.Datasets; +import com.google.api.services.bigquery.Bigquery.Tables; +import com.google.api.services.bigquery.model.Dataset; +import com.google.api.services.bigquery.model.DatasetReference; +import com.google.api.services.bigquery.model.Table; +import 
com.google.api.services.bigquery.model.TableReference; +import com.google.api.services.bigquery.model.TableSchema; +import com.google.api.services.dataflow.Dataflow; +import com.google.api.services.pubsub.Pubsub; +import com.google.api.services.pubsub.model.Subscription; +import com.google.api.services.pubsub.model.Topic; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.Uninterruptibles; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +/** + * The utility class that sets up and tears down external resources, + * and cancels the streaming pipelines once the program terminates. + * + * <p>It is used to run Beam examples, such as TrafficMaxLaneFlow and TrafficRoutes. + */ +public class ExampleUtils { + + private static final int SC_NOT_FOUND = 404; + + private final DataflowPipelineOptions options; + private Bigquery bigQueryClient = null; + private Pubsub pubsubClient = null; + private Dataflow dataflowClient = null; + private Set<DataflowPipelineJob> jobsToCancel = Sets.newHashSet(); + private List<String> pendingMessages = Lists.newArrayList(); + + public ExampleUtils(DataflowPipelineOptions options) { + this.options = options; + } + + /** + * Do resources and runner options setup. + */ + public ExampleUtils(DataflowPipelineOptions options, boolean isUnbounded) + throws IOException { + this.options = options; + setupResourcesAndRunner(isUnbounded); + } + + /** + * Sets up external resources that are required by the example, + * such as Pub/Sub topics and BigQuery tables. 
+ * + * @throws IOException if there is a problem setting up the resources + */ + public void setup() throws IOException { + Sleeper sleeper = Sleeper.DEFAULT; + BackOff backOff = new AttemptBoundedExponentialBackOff(3, 200); + Throwable lastException = null; + try { + do { + try { + setupPubsub(); + setupBigQueryTable(); + return; + } catch (GoogleJsonResponseException e) { + lastException = e; + } + } while (BackOffUtils.next(sleeper, backOff)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + // Ignore InterruptedException + } + throw new RuntimeException(lastException); + } + + /** + * Set up external resources, and configure the runner appropriately. + */ + public void setupResourcesAndRunner(boolean isUnbounded) throws IOException { + if (isUnbounded) { + options.setStreaming(true); + } + setup(); + setupRunner(); + } + + /** + * Sets up the Google Cloud Pub/Sub topic. + * + * <p>If the topic doesn't exist, a new topic with the given name will be created. + * + * @throws IOException if there is a problem setting up the Pub/Sub topic + */ + public void setupPubsub() throws IOException { + ExamplePubsubTopicAndSubscriptionOptions pubsubOptions = + options.as(ExamplePubsubTopicAndSubscriptionOptions.class); + if (!pubsubOptions.getPubsubTopic().isEmpty()) { + pendingMessages.add("**********************Set Up Pubsub************************"); + setupPubsubTopic(pubsubOptions.getPubsubTopic()); + pendingMessages.add("The Pub/Sub topic has been set up for this example: " + + pubsubOptions.getPubsubTopic()); + + if (!pubsubOptions.getPubsubSubscription().isEmpty()) { + setupPubsubSubscription( + pubsubOptions.getPubsubTopic(), pubsubOptions.getPubsubSubscription()); + pendingMessages.add("The Pub/Sub subscription has been set up for this example: " + + pubsubOptions.getPubsubSubscription()); + } + } + } + + /** + * Sets up the BigQuery table with the given schema. 
+ * + * <p>If the table already exists, the schema has to match the given one. Otherwise, the example + * will throw a RuntimeException. If the table doesn't exist, a new table with the given schema + * will be created. + * + * @throws IOException if there is a problem setting up the BigQuery table + */ + public void setupBigQueryTable() throws IOException { + ExampleBigQueryTableOptions bigQueryTableOptions = + options.as(ExampleBigQueryTableOptions.class); + if (bigQueryTableOptions.getBigQueryDataset() != null + && bigQueryTableOptions.getBigQueryTable() != null + && bigQueryTableOptions.getBigQuerySchema() != null) { + pendingMessages.add("******************Set Up Big Query Table*******************"); + setupBigQueryTable(bigQueryTableOptions.getProject(), + bigQueryTableOptions.getBigQueryDataset(), + bigQueryTableOptions.getBigQueryTable(), + bigQueryTableOptions.getBigQuerySchema()); + pendingMessages.add("The BigQuery table has been set up for this example: " + + bigQueryTableOptions.getProject() + + ":" + bigQueryTableOptions.getBigQueryDataset() + + "." + bigQueryTableOptions.getBigQueryTable()); + } + } + + /** + * Tears down external resources that can be deleted upon the example's completion. 
+ */ + private void tearDown() { + pendingMessages.add("*************************Tear Down*************************"); + ExamplePubsubTopicAndSubscriptionOptions pubsubOptions = + options.as(ExamplePubsubTopicAndSubscriptionOptions.class); + if (!pubsubOptions.getPubsubTopic().isEmpty()) { + try { + deletePubsubTopic(pubsubOptions.getPubsubTopic()); + pendingMessages.add("The Pub/Sub topic has been deleted: " + + pubsubOptions.getPubsubTopic()); + } catch (IOException e) { + pendingMessages.add("Failed to delete the Pub/Sub topic : " + + pubsubOptions.getPubsubTopic()); + } + if (!pubsubOptions.getPubsubSubscription().isEmpty()) { + try { + deletePubsubSubscription(pubsubOptions.getPubsubSubscription()); + pendingMessages.add("The Pub/Sub subscription has been deleted: " + + pubsubOptions.getPubsubSubscription()); + } catch (IOException e) { + pendingMessages.add("Failed to delete the Pub/Sub subscription : " + + pubsubOptions.getPubsubSubscription()); + } + } + } + + ExampleBigQueryTableOptions bigQueryTableOptions = + options.as(ExampleBigQueryTableOptions.class); + if (bigQueryTableOptions.getBigQueryDataset() != null + && bigQueryTableOptions.getBigQueryTable() != null + && bigQueryTableOptions.getBigQuerySchema() != null) { + pendingMessages.add("The BigQuery table might contain the example's output, " + + "and it is not deleted automatically: " + + bigQueryTableOptions.getProject() + + ":" + bigQueryTableOptions.getBigQueryDataset() + + "." + bigQueryTableOptions.getBigQueryTable()); + pendingMessages.add("Please go to the Developers Console to delete it manually." 
+ + " Otherwise, you may be charged for its usage."); + } + } + + private void setupBigQueryTable(String projectId, String datasetId, String tableId, + TableSchema schema) throws IOException { + if (bigQueryClient == null) { + bigQueryClient = Transport.newBigQueryClient(options.as(BigQueryOptions.class)).build(); + } + + Datasets datasetService = bigQueryClient.datasets(); + if (executeNullIfNotFound(datasetService.get(projectId, datasetId)) == null) { + Dataset newDataset = new Dataset().setDatasetReference( + new DatasetReference().setProjectId(projectId).setDatasetId(datasetId)); + datasetService.insert(projectId, newDataset).execute(); + } + + Tables tableService = bigQueryClient.tables(); + Table table = executeNullIfNotFound(tableService.get(projectId, datasetId, tableId)); + if (table == null) { + Table newTable = new Table().setSchema(schema).setTableReference( + new TableReference().setProjectId(projectId).setDatasetId(datasetId).setTableId(tableId)); + tableService.insert(projectId, datasetId, newTable).execute(); + } else if (!table.getSchema().equals(schema)) { + throw new RuntimeException( + "Table exists and schemas do not match, expecting: " + schema.toPrettyString() + + ", actual: " + table.getSchema().toPrettyString()); + } + } + + private void setupPubsubTopic(String topic) throws IOException { + if (pubsubClient == null) { + pubsubClient = Transport.newPubsubClient(options).build(); + } + if (executeNullIfNotFound(pubsubClient.projects().topics().get(topic)) == null) { + pubsubClient.projects().topics().create(topic, new Topic().setName(topic)).execute(); + } + } + + private void setupPubsubSubscription(String topic, String subscription) throws IOException { + if (pubsubClient == null) { + pubsubClient = Transport.newPubsubClient(options).build(); + } + if (executeNullIfNotFound(pubsubClient.projects().subscriptions().get(subscription)) == null) { + Subscription subInfo = new Subscription() + .setAckDeadlineSeconds(60) + .setTopic(topic); + 
pubsubClient.projects().subscriptions().create(subscription, subInfo).execute(); + } + } + + /** + * Deletes the Google Cloud Pub/Sub topic. + * + * @throws IOException if there is a problem deleting the Pub/Sub topic + */ + private void deletePubsubTopic(String topic) throws IOException { + if (pubsubClient == null) { + pubsubClient = Transport.newPubsubClient(options).build(); + } + if (executeNullIfNotFound(pubsubClient.projects().topics().get(topic)) != null) { + pubsubClient.projects().topics().delete(topic).execute(); + } + } + + /** + * Deletes the Google Cloud Pub/Sub subscription. + * + * @throws IOException if there is a problem deleting the Pub/Sub subscription + */ + private void deletePubsubSubscription(String subscription) throws IOException { + if (pubsubClient == null) { + pubsubClient = Transport.newPubsubClient(options).build(); + } + if (executeNullIfNotFound(pubsubClient.projects().subscriptions().get(subscription)) != null) { + pubsubClient.projects().subscriptions().delete(subscription).execute(); + } + } + + /** + * Do some runner setup: if streaming is specified with the DataflowRunner or the + * BlockingDataflowRunner, force the DataflowRunner so pipelines can be canceled automatically. + */ + public void setupRunner() { + Class<? extends PipelineRunner<?>> runner = options.getRunner(); + if (options.isStreaming() + && (runner.equals(DataflowRunner.class) + || runner.equals(BlockingDataflowRunner.class))) { + // In order to cancel the pipelines automatically, + // {@literal DataflowRunner} is forced to be used. + options.setRunner(DataflowRunner.class); + } + } + + /** + * If {@literal DataflowRunner} or {@literal BlockingDataflowRunner} is used, + * waits for the pipeline to finish and cancels it (and the injector) before the program exits. 
+ */ + public void waitToFinish(PipelineResult result) { + if (result instanceof DataflowPipelineJob) { + final DataflowPipelineJob job = (DataflowPipelineJob) result; + jobsToCancel.add(job); + if (!options.as(ExampleOptions.class).getKeepJobsRunning()) { + addShutdownHook(jobsToCancel); + } + try { + job.waitToFinish(-1, TimeUnit.SECONDS, new MonitoringUtil.PrintHandler(System.out)); + } catch (Exception e) { + throw new RuntimeException("Failed to wait for job to finish: " + job.getJobId()); + } + } else { + // Do nothing if the given PipelineResult doesn't support waitToFinish(), + // such as EvaluationResults returned by DirectRunner. + tearDown(); + printPendingMessages(); + } + } + + private void addShutdownHook(final Collection<DataflowPipelineJob> jobs) { + if (dataflowClient == null) { + dataflowClient = options.getDataflowClient(); + } + + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + tearDown(); + printPendingMessages(); + for (DataflowPipelineJob job : jobs) { + System.out.println("Canceling example pipeline: " + job.getJobId()); + try { + job.cancel(); + } catch (IOException e) { + System.out.println("Failed to cancel the job," + + " please go to the Developers Console to cancel it manually"); + System.out.println( + MonitoringUtil.getJobMonitoringPageURL(job.getProjectId(), job.getJobId())); + } + } + + for (DataflowPipelineJob job : jobs) { + boolean cancellationVerified = false; + for (int retryAttempts = 6; retryAttempts > 0; retryAttempts--) { + if (job.getState().isTerminal()) { + cancellationVerified = true; + System.out.println("Canceled example pipeline: " + job.getJobId()); + break; + } else { + System.out.println( + "The example pipeline is still running. 
Verifying the cancellation."); + } + Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS); + } + if (!cancellationVerified) { + System.out.println("Failed to verify the cancellation for job: " + job.getJobId()); + System.out.println("Please go to the Developers Console to verify manually:"); + System.out.println( + MonitoringUtil.getJobMonitoringPageURL(job.getProjectId(), job.getJobId())); + } + } + } + }); + } + + private void printPendingMessages() { + System.out.println(); + System.out.println("***********************************************************"); + System.out.println("***********************************************************"); + for (String message : pendingMessages) { + System.out.println(message); + } + System.out.println("***********************************************************"); + System.out.println("***********************************************************"); + } + + private static <T> T executeNullIfNotFound( + AbstractGoogleClientRequest<T> request) throws IOException { + try { + return request.execute(); + } catch (GoogleJsonResponseException e) { + if (e.getStatusCode() == SC_NOT_FOUND) { + return null; + } else { + throw e; + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a6f488f1/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java index 98c4994..f8cd0f1 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java @@ -20,8 +20,8 @@ package org.apache.beam.examples.complete; import static com.google.common.base.Preconditions.checkArgument; import static com.google.datastore.v1beta3.client.DatastoreHelper.makeValue; 
-import org.apache.beam.examples.common.DataflowExampleUtils; import org.apache.beam.examples.common.ExampleBigQueryTableOptions; +import org.apache.beam.examples.common.ExampleUtils; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.coders.AvroCoder; @@ -451,7 +451,7 @@ public class AutoComplete { Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); options.setBigQuerySchema(FormatForBigquery.getSchema()); - DataflowExampleUtils dataflowUtils = new DataflowExampleUtils(options); + ExampleUtils dataflowUtils = new ExampleUtils(options); // We support running the same pipeline in either // batch or windowed streaming mode. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a6f488f1/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java b/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java index 4ea199c..046428c 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java @@ -17,8 +17,8 @@ */ package org.apache.beam.examples.complete; -import org.apache.beam.examples.common.DataflowExampleUtils; import org.apache.beam.examples.common.ExampleBigQueryTableOptions; +import org.apache.beam.examples.common.ExampleUtils; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.io.BigQueryIO; @@ -120,7 +120,7 @@ public class StreamingWordExtract { options.setStreaming(true); options.setBigQuerySchema(StringToRowConverter.getSchema()); - DataflowExampleUtils dataflowUtils = new DataflowExampleUtils(options); + ExampleUtils dataflowUtils = new 
ExampleUtils(options); dataflowUtils.setup(); Pipeline pipeline = Pipeline.create(options); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a6f488f1/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java index 2db7c9e..1bbc68b 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java @@ -17,9 +17,9 @@ */ package org.apache.beam.examples.complete; -import org.apache.beam.examples.common.DataflowExampleOptions; -import org.apache.beam.examples.common.DataflowExampleUtils; import org.apache.beam.examples.common.ExampleBigQueryTableOptions; +import org.apache.beam.examples.common.ExampleOptions; +import org.apache.beam.examples.common.ExampleUtils; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.coders.AvroCoder; @@ -307,8 +307,7 @@ public class TrafficMaxLaneFlow { * * <p>Inherits standard configuration options. */ - private interface TrafficMaxLaneFlowOptions extends DataflowExampleOptions, - ExampleBigQueryTableOptions { + private interface TrafficMaxLaneFlowOptions extends ExampleOptions, ExampleBigQueryTableOptions { @Description("Path of the file to read from") @Default.String("gs://dataflow-samples/traffic_sensor/" + "Freeways-5Minaa2010-01-01_to_2010-02-15_test2.csv") @@ -342,7 +341,7 @@ public class TrafficMaxLaneFlow { .as(TrafficMaxLaneFlowOptions.class); options.setBigQuerySchema(FormatMaxesFn.getSchema()); // Using DataflowExampleUtils to set up required resources. 
- DataflowExampleUtils dataflowUtils = new DataflowExampleUtils(options, options.isUnbounded()); + ExampleUtils dataflowUtils = new ExampleUtils(options, options.isUnbounded()); Pipeline pipeline = Pipeline.create(options); TableReference tableRef = new TableReference(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a6f488f1/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java index 89cfbfc..8af0922 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java @@ -17,9 +17,9 @@ */ package org.apache.beam.examples.complete; -import org.apache.beam.examples.common.DataflowExampleOptions; -import org.apache.beam.examples.common.DataflowExampleUtils; import org.apache.beam.examples.common.ExampleBigQueryTableOptions; +import org.apache.beam.examples.common.ExampleOptions; +import org.apache.beam.examples.common.ExampleUtils; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.coders.AvroCoder; @@ -317,8 +317,7 @@ public class TrafficRoutes { * * <p>Inherits standard configuration options. */ - private interface TrafficRoutesOptions extends DataflowExampleOptions, - ExampleBigQueryTableOptions { + private interface TrafficRoutesOptions extends ExampleOptions, ExampleBigQueryTableOptions { @Description("Path of the file to read from") @Default.String("gs://dataflow-samples/traffic_sensor/" + "Freeways-5Minaa2010-01-01_to_2010-02-15_test2.csv") @@ -353,7 +352,7 @@ public class TrafficRoutes { options.setBigQuerySchema(FormatStatsFn.getSchema()); // Using DataflowExampleUtils to set up required resources. 
- DataflowExampleUtils dataflowUtils = new DataflowExampleUtils(options, options.isUnbounded()); + ExampleUtils dataflowUtils = new ExampleUtils(options, options.isUnbounded()); Pipeline pipeline = Pipeline.create(options); TableReference tableRef = new TableReference(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a6f488f1/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java index 5e60835..aa91ac6 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java +++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java @@ -17,9 +17,9 @@ */ package org.apache.beam.examples.cookbook; -import org.apache.beam.examples.common.DataflowExampleOptions; -import org.apache.beam.examples.common.DataflowExampleUtils; import org.apache.beam.examples.common.ExampleBigQueryTableOptions; +import org.apache.beam.examples.common.ExampleOptions; +import org.apache.beam.examples.common.ExampleUtils; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.io.BigQueryIO; @@ -419,8 +419,7 @@ public class TriggerExample { /** * Inherits standard configuration options. 
*/ - public interface TrafficFlowOptions - extends ExampleBigQueryTableOptions, DataflowExampleOptions { + public interface TrafficFlowOptions extends ExampleBigQueryTableOptions, ExampleOptions { @Description("Input file to read from") @Default.String("gs://dataflow-samples/traffic_sensor/" @@ -444,7 +443,7 @@ public class TriggerExample { options.setBigQuerySchema(getSchema()); - DataflowExampleUtils dataflowUtils = new DataflowExampleUtils(options); + ExampleUtils dataflowUtils = new ExampleUtils(options); dataflowUtils.setup(); Pipeline pipeline = Pipeline.create(options); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a6f488f1/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java index b1cb312..5b27f83 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java @@ -17,7 +17,7 @@ */ package org.apache.beam.examples.complete.game; -import org.apache.beam.examples.common.DataflowExampleUtils; +import org.apache.beam.examples.common.ExampleUtils; import org.apache.beam.examples.complete.game.utils.WriteWindowedToBigQuery; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; @@ -242,7 +242,7 @@ public class GameStats extends LeaderBoard { Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); // Enforce that this pipeline is always run in streaming mode. 
options.setStreaming(true); - DataflowExampleUtils dataflowUtils = new DataflowExampleUtils(options); + ExampleUtils dataflowUtils = new ExampleUtils(options); Pipeline pipeline = Pipeline.create(options); // Read Events from Pub/Sub using custom timestamps http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a6f488f1/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java index a14d533..051b4de 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java @@ -17,8 +17,8 @@ */ package org.apache.beam.examples.complete.game; -import org.apache.beam.examples.common.DataflowExampleOptions; -import org.apache.beam.examples.common.DataflowExampleUtils; +import org.apache.beam.examples.common.ExampleOptions; +import org.apache.beam.examples.common.ExampleUtils; import org.apache.beam.examples.complete.game.utils.WriteToBigQuery; import org.apache.beam.examples.complete.game.utils.WriteWindowedToBigQuery; import org.apache.beam.sdk.Pipeline; @@ -102,7 +102,7 @@ public class LeaderBoard extends HourlyTeamScore { /** * Options supported by {@link LeaderBoard}. */ - static interface Options extends HourlyTeamScore.Options, DataflowExampleOptions { + static interface Options extends HourlyTeamScore.Options, ExampleOptions { @Description("Pub/Sub topic to read from") @Validation.Required @@ -178,7 +178,7 @@ public class LeaderBoard extends HourlyTeamScore { Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); // Enforce that this pipeline is always run in streaming mode. 
options.setStreaming(true); - DataflowExampleUtils dataflowUtils = new DataflowExampleUtils(options); + ExampleUtils dataflowUtils = new ExampleUtils(options); Pipeline pipeline = Pipeline.create(options); // Read game events from Pub/Sub using custom timestamps, which are extracted from the pubsub
