http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/src/main/java/com/google/cloud/dataflow/examples/common/ExamplePubsubTopicOptions.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/com/google/cloud/dataflow/examples/common/ExamplePubsubTopicOptions.java b/examples/src/main/java/com/google/cloud/dataflow/examples/common/ExamplePubsubTopicOptions.java deleted file mode 100644 index 4bedf31..0000000 --- a/examples/src/main/java/com/google/cloud/dataflow/examples/common/ExamplePubsubTopicOptions.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ - -package com.google.cloud.dataflow.examples.common; - -import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; -import com.google.cloud.dataflow.sdk.options.Default; -import com.google.cloud.dataflow.sdk.options.DefaultValueFactory; -import com.google.cloud.dataflow.sdk.options.Description; -import com.google.cloud.dataflow.sdk.options.PipelineOptions; - -/** - * Options that can be used to configure the Pub/Sub topic in Dataflow examples. - */ -public interface ExamplePubsubTopicOptions extends DataflowPipelineOptions { - @Description("Pub/Sub topic") - @Default.InstanceFactory(PubsubTopicFactory.class) - String getPubsubTopic(); - void setPubsubTopic(String topic); - - /** - * Returns a default Pub/Sub topic based on the project and job names. - */ - static class PubsubTopicFactory implements DefaultValueFactory<String> { - @Override - public String create(PipelineOptions options) { - DataflowPipelineOptions dataflowPipelineOptions = - options.as(DataflowPipelineOptions.class); - return "projects/" + dataflowPipelineOptions.getProject() - + "/topics/" + dataflowPipelineOptions.getJobName(); - } - } -}
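
The options interface above illustrates the SDK's lazy-default pattern: a DefaultValueFactory runs only when the option was not set explicitly, and it receives the fully parsed options object, which is why PubsubTopicFactory can derive its default from the project and job name. Below is a minimal self-contained sketch of the same pattern; the DemoOptions and TopicFactory names and the "my-project" value are hypothetical placeholders, not part of the SDK or the examples package:

    import com.google.cloud.dataflow.sdk.options.Default;
    import com.google.cloud.dataflow.sdk.options.DefaultValueFactory;
    import com.google.cloud.dataflow.sdk.options.Description;
    import com.google.cloud.dataflow.sdk.options.PipelineOptions;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;

    public class DefaultValueFactoryDemo {
      /** Hypothetical options interface; only the pattern mirrors ExamplePubsubTopicOptions. */
      public interface DemoOptions extends PipelineOptions {
        @Description("Prefix used when deriving the default topic name")
        @Default.String("demo")
        String getTopicPrefix();
        void setTopicPrefix(String value);

        @Description("Topic name; derived from topicPrefix when not set explicitly")
        @Default.InstanceFactory(TopicFactory.class)
        String getTopic();
        void setTopic(String value);
      }

      /** Invoked only when the topic option is unset; sees the fully parsed options. */
      public static class TopicFactory implements DefaultValueFactory<String> {
        @Override
        public String create(PipelineOptions options) {
          // Derive the default from another option, as PubsubTopicFactory does
          // with the project and job name. "my-project" is a placeholder.
          return "projects/my-project/topics/" + options.as(DemoOptions.class).getTopicPrefix();
        }
      }

      public static void main(String[] args) {
        DemoOptions options = PipelineOptionsFactory.as(DemoOptions.class);
        System.out.println(options.getTopic());  // prints the factory-derived default

        options.setTopic("projects/my-project/topics/override");
        System.out.println(options.getTopic());  // an explicit value bypasses the factory
      }
    }

Because the factory is consulted only on first read of an unset option, an explicitly supplied value always wins and the factory is never invoked in that case.
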
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/src/main/java/com/google/cloud/dataflow/examples/common/PubsubFileInjector.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/com/google/cloud/dataflow/examples/common/PubsubFileInjector.java b/examples/src/main/java/com/google/cloud/dataflow/examples/common/PubsubFileInjector.java deleted file mode 100644 index 4a82ae6..0000000 --- a/examples/src/main/java/com/google/cloud/dataflow/examples/common/PubsubFileInjector.java +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package com.google.cloud.dataflow.examples.common; - -import com.google.api.services.pubsub.Pubsub; -import com.google.api.services.pubsub.model.PublishRequest; -import com.google.api.services.pubsub.model.PubsubMessage; -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.io.TextIO; -import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; -import com.google.cloud.dataflow.sdk.options.Description; -import com.google.cloud.dataflow.sdk.options.PipelineOptions; -import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; -import com.google.cloud.dataflow.sdk.options.Validation; -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.IntraBundleParallelization; -import com.google.cloud.dataflow.sdk.util.Transport; -import com.google.common.collect.ImmutableMap; - -import java.io.IOException; -import java.util.Arrays; - -/** - * A batch Dataflow pipeline for injecting a set of GCS files into - * a PubSub topic line by line. Empty lines are skipped. - * - * <p>This is useful for testing streaming - * pipelines. Note that since batch pipelines might retry chunks, this - * does _not_ guarantee exactly-once injection of file data. Some lines may - * be published multiple times. - * </p> - */ -public class PubsubFileInjector { - - /** - * An incomplete {@code PubsubFileInjector} transform with an unbound output topic. - */ - public static class Unbound { - private final String timestampLabelKey; - - Unbound() { - this.timestampLabelKey = null; - } - - Unbound(String timestampLabelKey) { - this.timestampLabelKey = timestampLabelKey; - } - - Unbound withTimestampLabelKey(String timestampLabelKey) { - return new Unbound(timestampLabelKey); - } - - public Bound publish(String outputTopic) { - return new Bound(outputTopic, timestampLabelKey); - } - } - - /** A DoFn that publishes non-empty lines to Google Cloud PubSub. 
*/ - public static class Bound extends DoFn<String, Void> { - private final String outputTopic; - private final String timestampLabelKey; - public transient Pubsub pubsub; - - public Bound(String outputTopic, String timestampLabelKey) { - this.outputTopic = outputTopic; - this.timestampLabelKey = timestampLabelKey; - } - - @Override - public void startBundle(Context context) { - this.pubsub = - Transport.newPubsubClient(context.getPipelineOptions().as(DataflowPipelineOptions.class)) - .build(); - } - - @Override - public void processElement(ProcessContext c) throws IOException { - if (c.element().isEmpty()) { - return; - } - PubsubMessage pubsubMessage = new PubsubMessage(); - pubsubMessage.encodeData(c.element().getBytes()); - if (timestampLabelKey != null) { - pubsubMessage.setAttributes( - ImmutableMap.of(timestampLabelKey, Long.toString(c.timestamp().getMillis()))); - } - PublishRequest publishRequest = new PublishRequest(); - publishRequest.setMessages(Arrays.asList(pubsubMessage)); - this.pubsub.projects().topics().publish(outputTopic, publishRequest).execute(); - } - } - - /** - * Creates a {@code PubsubFileInjector} transform with the given timestamp label key. - */ - public static Unbound withTimestampLabelKey(String timestampLabelKey) { - return new Unbound(timestampLabelKey); - } - - /** - * Creates a {@code PubsubFileInjector} transform that publishes to the given output topic. - */ - public static Bound publish(String outputTopic) { - return new Unbound().publish(outputTopic); - } - - /** - * Command line parameter options. - */ - private interface PubsubFileInjectorOptions extends PipelineOptions { - @Description("GCS location of files.") - @Validation.Required - String getInput(); - void setInput(String value); - - @Description("Topic to publish on.") - @Validation.Required - String getOutputTopic(); - void setOutputTopic(String value); - } - - /** - * Sets up and starts the batch injector pipeline. - */ - public static void main(String[] args) { - PubsubFileInjectorOptions options = PipelineOptionsFactory.fromArgs(args) - .withValidation() - .as(PubsubFileInjectorOptions.class); - - Pipeline pipeline = Pipeline.create(options); - - pipeline - .apply(TextIO.Read.from(options.getInput())) - .apply(IntraBundleParallelization.of(PubsubFileInjector.publish(options.getOutputTopic())) - .withMaxParallelism(20)); - - pipeline.run(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/src/main/java/com/google/cloud/dataflow/examples/complete/AutoComplete.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/com/google/cloud/dataflow/examples/complete/AutoComplete.java b/examples/src/main/java/com/google/cloud/dataflow/examples/complete/AutoComplete.java deleted file mode 100644 index f897338..0000000 --- a/examples/src/main/java/com/google/cloud/dataflow/examples/complete/AutoComplete.java +++ /dev/null @@ -1,516 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package com.google.cloud.dataflow.examples.complete; - -import com.google.api.services.bigquery.model.TableFieldSchema; -import com.google.api.services.bigquery.model.TableReference; -import com.google.api.services.bigquery.model.TableRow; -import com.google.api.services.bigquery.model.TableSchema; -import com.google.api.services.datastore.DatastoreV1.Entity; -import com.google.api.services.datastore.DatastoreV1.Key; -import com.google.api.services.datastore.DatastoreV1.Value; -import com.google.api.services.datastore.client.DatastoreHelper; -import com.google.cloud.dataflow.examples.common.DataflowExampleUtils; -import com.google.cloud.dataflow.examples.common.ExampleBigQueryTableOptions; -import com.google.cloud.dataflow.examples.common.ExamplePubsubTopicOptions; -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.PipelineResult; -import com.google.cloud.dataflow.sdk.coders.AvroCoder; -import com.google.cloud.dataflow.sdk.coders.DefaultCoder; -import com.google.cloud.dataflow.sdk.io.BigQueryIO; -import com.google.cloud.dataflow.sdk.io.DatastoreIO; -import com.google.cloud.dataflow.sdk.io.PubsubIO; -import com.google.cloud.dataflow.sdk.io.TextIO; -import com.google.cloud.dataflow.sdk.options.Default; -import com.google.cloud.dataflow.sdk.options.Description; -import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; -import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner; -import com.google.cloud.dataflow.sdk.transforms.Count; -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.Filter; -import com.google.cloud.dataflow.sdk.transforms.Flatten; -import com.google.cloud.dataflow.sdk.transforms.PTransform; -import com.google.cloud.dataflow.sdk.transforms.ParDo; -import com.google.cloud.dataflow.sdk.transforms.Partition; -import com.google.cloud.dataflow.sdk.transforms.Partition.PartitionFn; -import com.google.cloud.dataflow.sdk.transforms.SerializableFunction; -import com.google.cloud.dataflow.sdk.transforms.Top; -import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows; -import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows; -import com.google.cloud.dataflow.sdk.transforms.windowing.Window; -import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn; -import com.google.cloud.dataflow.sdk.values.KV; -import com.google.cloud.dataflow.sdk.values.PBegin; -import com.google.cloud.dataflow.sdk.values.PCollection; -import com.google.cloud.dataflow.sdk.values.PCollectionList; -import com.google.common.base.MoreObjects; -import com.google.common.base.Preconditions; - -import org.joda.time.Duration; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -/** - * An example that computes the most popular hash tags - * for every prefix, which can be used for auto-completion. - * - * <p>Concepts: Using the same pipeline in both streaming and batch, combiners, - * composite transforms. 
- * - * <p>To execute this pipeline using the Dataflow service in batch mode, - * specify pipeline configuration: - * <pre>{@code - *   --project=YOUR_PROJECT_ID - *   --stagingLocation=gs://YOUR_STAGING_DIRECTORY - *   --runner=DataflowPipelineRunner - *   --inputFile=gs://path/to/input*.txt - * }</pre> - * - * <p>To execute this pipeline using the Dataflow service in streaming mode, - * specify pipeline configuration: - * <pre>{@code - *   --project=YOUR_PROJECT_ID - *   --stagingLocation=gs://YOUR_STAGING_DIRECTORY - *   --runner=DataflowPipelineRunner - *   --inputFile=gs://YOUR_INPUT_DIRECTORY/*.txt - *   --streaming - * }</pre> - * - * <p>This will update the datastore every 5 seconds based on the last - * 30 minutes of data received. - */ -public class AutoComplete { - - /** - * A PTransform that takes as input a list of tokens and returns - * the most common tokens per prefix. - */ - public static class ComputeTopCompletions - extends PTransform<PCollection<String>, PCollection<KV<String, List<CompletionCandidate>>>> { - private final int candidatesPerPrefix; - private final boolean recursive; - - protected ComputeTopCompletions(int candidatesPerPrefix, boolean recursive) { - this.candidatesPerPrefix = candidatesPerPrefix; - this.recursive = recursive; - } - - public static ComputeTopCompletions top(int candidatesPerPrefix, boolean recursive) { - return new ComputeTopCompletions(candidatesPerPrefix, recursive); - } - - @Override - public PCollection<KV<String, List<CompletionCandidate>>> apply(PCollection<String> input) { - PCollection<CompletionCandidate> candidates = input - // First count how often each token appears. - .apply(new Count.PerElement<String>()) - - // Map the KV outputs of Count into our own CompletionCandidate class. - .apply(ParDo.named("CreateCompletionCandidates").of( - new DoFn<KV<String, Long>, CompletionCandidate>() { - @Override - public void processElement(ProcessContext c) { - c.output(new CompletionCandidate(c.element().getKey(), c.element().getValue())); - } - })); - - // Compute the top via either a flat or recursive algorithm. - if (recursive) { - return candidates - .apply(new ComputeTopRecursive(candidatesPerPrefix, 1)) - .apply(Flatten.<KV<String, List<CompletionCandidate>>>pCollections()); - } else { - return candidates - .apply(new ComputeTopFlat(candidatesPerPrefix, 1)); - } - } - } - - /** - * Lower latency, but more expensive. - */ - private static class ComputeTopFlat - extends PTransform<PCollection<CompletionCandidate>, - PCollection<KV<String, List<CompletionCandidate>>>> { - private final int candidatesPerPrefix; - private final int minPrefix; - - public ComputeTopFlat(int candidatesPerPrefix, int minPrefix) { - this.candidatesPerPrefix = candidatesPerPrefix; - this.minPrefix = minPrefix; - } - - @Override - public PCollection<KV<String, List<CompletionCandidate>>> apply( - PCollection<CompletionCandidate> input) { - return input - // For each completion candidate, map it to all prefixes. - .apply(ParDo.of(new AllPrefixes(minPrefix))) - - // Find and return the top candidates for each prefix. - .apply(Top.<String, CompletionCandidate>largestPerKey(candidatesPerPrefix) - .withHotKeyFanout(new HotKeyFanout())); - } - - private static class HotKeyFanout implements SerializableFunction<String, Integer> { - @Override - public Integer apply(String input) { - return (int) Math.pow(4, 5 - input.length()); - } - } - } - - /** - * Cheaper but higher latency. 
- * - * <p>Returns two PCollections: the first contains the top prefixes of size greater - * than minPrefix, and the second contains the top prefixes of size exactly - * minPrefix. - */ - private static class ComputeTopRecursive - extends PTransform<PCollection<CompletionCandidate>, - PCollectionList<KV<String, List<CompletionCandidate>>>> { - private final int candidatesPerPrefix; - private final int minPrefix; - - public ComputeTopRecursive(int candidatesPerPrefix, int minPrefix) { - this.candidatesPerPrefix = candidatesPerPrefix; - this.minPrefix = minPrefix; - } - - private class KeySizePartitionFn implements PartitionFn<KV<String, List<CompletionCandidate>>> { - @Override - public int partitionFor(KV<String, List<CompletionCandidate>> elem, int numPartitions) { - return elem.getKey().length() > minPrefix ? 0 : 1; - } - } - - private static class FlattenTops - extends DoFn<KV<String, List<CompletionCandidate>>, CompletionCandidate> { - @Override - public void processElement(ProcessContext c) { - for (CompletionCandidate cc : c.element().getValue()) { - c.output(cc); - } - } - } - - @Override - public PCollectionList<KV<String, List<CompletionCandidate>>> apply( - PCollection<CompletionCandidate> input) { - if (minPrefix > 10) { - // Base case, partitioning to return the output in the expected format. - return input - .apply(new ComputeTopFlat(candidatesPerPrefix, minPrefix)) - .apply(Partition.of(2, new KeySizePartitionFn())); - } else { - // If a candidate is in the top N for prefix a...b, it must also be in the top - // N for a...bX for every X, which is typically a much smaller set to consider. - // First, compute the top candidates for prefixes of size at least minPrefix + 1. - PCollectionList<KV<String, List<CompletionCandidate>>> larger = input - .apply(new ComputeTopRecursive(candidatesPerPrefix, minPrefix + 1)); - // Consider the top candidates for each prefix of length minPrefix + 1... - PCollection<KV<String, List<CompletionCandidate>>> small = - PCollectionList - .of(larger.get(1).apply(ParDo.of(new FlattenTops()))) - // ...together with those (previously excluded) candidates of length - // exactly minPrefix... - .and(input.apply(Filter.byPredicate( - new SerializableFunction<CompletionCandidate, Boolean>() { - @Override - public Boolean apply(CompletionCandidate c) { - return c.getValue().length() == minPrefix; - } - }))) - .apply("FlattenSmall", Flatten.<CompletionCandidate>pCollections()) - // ...set the key to be the minPrefix-length prefix... - .apply(ParDo.of(new AllPrefixes(minPrefix, minPrefix))) - // ...and (re)apply the Top operator to all of them together. - .apply(Top.<String, CompletionCandidate>largestPerKey(candidatesPerPrefix)); - - PCollection<KV<String, List<CompletionCandidate>>> flattenLarger = larger - .apply("FlattenLarge", Flatten.<KV<String, List<CompletionCandidate>>>pCollections()); - - return PCollectionList.of(flattenLarger).and(small); - } - } - } - - /** - * A DoFn that keys each candidate by all its prefixes. 
- */ - private static class AllPrefixes - extends DoFn<CompletionCandidate, KV<String, CompletionCandidate>> { - private final int minPrefix; - private final int maxPrefix; - public AllPrefixes(int minPrefix) { - this(minPrefix, Integer.MAX_VALUE); - } - public AllPrefixes(int minPrefix, int maxPrefix) { - this.minPrefix = minPrefix; - this.maxPrefix = maxPrefix; - } - @Override - public void processElement(ProcessContext c) { - String word = c.element().value; - for (int i = minPrefix; i <= Math.min(word.length(), maxPrefix); i++) { - c.output(KV.of(word.substring(0, i), c.element())); - } - } - } - - /** - * Class used to store tag-count pairs. - */ - @DefaultCoder(AvroCoder.class) - static class CompletionCandidate implements Comparable<CompletionCandidate> { - private long count; - private String value; - - public CompletionCandidate(String value, long count) { - this.value = value; - this.count = count; - } - - public long getCount() { - return count; - } - - public String getValue() { - return value; - } - - // Empty constructor required for Avro decoding. - public CompletionCandidate() {} - - @Override - public int compareTo(CompletionCandidate o) { - if (this.count < o.count) { - return -1; - } else if (this.count == o.count) { - return this.value.compareTo(o.value); - } else { - return 1; - } - } - - @Override - public boolean equals(Object other) { - if (other instanceof CompletionCandidate) { - CompletionCandidate that = (CompletionCandidate) other; - return this.count == that.count && this.value.equals(that.value); - } else { - return false; - } - } - - @Override - public int hashCode() { - return Long.valueOf(count).hashCode() ^ value.hashCode(); - } - - @Override - public String toString() { - return "CompletionCandidate[" + value + ", " + count + "]"; - } - } - - /** - * Takes as input a set of strings, and emits each #hashtag found therein. - */ - static class ExtractHashtags extends DoFn<String, String> { - @Override - public void processElement(ProcessContext c) { - Matcher m = Pattern.compile("#\\S+").matcher(c.element()); - while (m.find()) { - c.output(m.group().substring(1)); - } - } - } - - static class FormatForBigquery extends DoFn<KV<String, List<CompletionCandidate>>, TableRow> { - @Override - public void processElement(ProcessContext c) { - List<TableRow> completions = new ArrayList<>(); - for (CompletionCandidate cc : c.element().getValue()) { - completions.add(new TableRow() - .set("count", cc.getCount()) - .set("tag", cc.getValue())); - } - TableRow row = new TableRow() - .set("prefix", c.element().getKey()) - .set("tags", completions); - c.output(row); - } - - /** - * Defines the BigQuery schema used for the output. - */ - static TableSchema getSchema() { - List<TableFieldSchema> tagFields = new ArrayList<>(); - tagFields.add(new TableFieldSchema().setName("count").setType("INTEGER")); - tagFields.add(new TableFieldSchema().setName("tag").setType("STRING")); - List<TableFieldSchema> fields = new ArrayList<>(); - fields.add(new TableFieldSchema().setName("prefix").setType("STRING")); - fields.add(new TableFieldSchema() - .setName("tags").setType("RECORD").setMode("REPEATED").setFields(tagFields)); - return new TableSchema().setFields(fields); - } - } - - /** - * Takes as input the top candidates per prefix, and emits an entity - * suitable for writing to Datastore. 
- */ - static class FormatForDatastore extends DoFn<KV<String, List<CompletionCandidate>>, Entity> { - private String kind; - - public FormatForDatastore(String kind) { - this.kind = kind; - } - - @Override - public void processElement(ProcessContext c) { - Entity.Builder entityBuilder = Entity.newBuilder(); - Key key = DatastoreHelper.makeKey(kind, c.element().getKey()).build(); - - entityBuilder.setKey(key); - List<Value> candidates = new ArrayList<>(); - for (CompletionCandidate tag : c.element().getValue()) { - Entity.Builder tagEntity = Entity.newBuilder(); - tagEntity.addProperty( - DatastoreHelper.makeProperty("tag", DatastoreHelper.makeValue(tag.value))); - tagEntity.addProperty( - DatastoreHelper.makeProperty("count", DatastoreHelper.makeValue(tag.count))); - candidates.add(DatastoreHelper.makeValue(tagEntity).setIndexed(false).build()); - } - entityBuilder.addProperty( - DatastoreHelper.makeProperty("candidates", DatastoreHelper.makeValue(candidates))); - c.output(entityBuilder.build()); - } - } - - /** - * Options supported by this class. - * - * <p>Inherits standard Dataflow configuration options. - */ - private static interface Options extends ExamplePubsubTopicOptions, ExampleBigQueryTableOptions { - @Description("Input text file") - String getInputFile(); - void setInputFile(String value); - - @Description("Whether to use the recursive algorithm") - @Default.Boolean(true) - Boolean getRecursive(); - void setRecursive(Boolean value); - - @Description("Dataset entity kind") - @Default.String("autocomplete-demo") - String getKind(); - void setKind(String value); - - @Description("Whether to output to BigQuery") - @Default.Boolean(true) - Boolean getOutputToBigQuery(); - void setOutputToBigQuery(Boolean value); - - @Description("Whether to output to Datastore") - @Default.Boolean(false) - Boolean getOutputToDatastore(); - void setOutputToDatastore(Boolean value); - - @Description("Datastore output dataset ID, defaults to project ID") - String getOutputDataset(); - void setOutputDataset(String value); - } - - public static void main(String[] args) throws IOException { - Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); - - if (options.isStreaming()) { - // In order to cancel the pipelines automatically, - // {@literal DataflowPipelineRunner} is forced to be used. - options.setRunner(DataflowPipelineRunner.class); - } - - options.setBigQuerySchema(FormatForBigquery.getSchema()); - DataflowExampleUtils dataflowUtils = new DataflowExampleUtils(options); - - // We support running the same pipeline in either - // batch or windowed streaming mode. - PTransform<? super PBegin, PCollection<String>> readSource; - WindowFn<Object, ?> windowFn; - if (options.isStreaming()) { - Preconditions.checkArgument( - !options.getOutputToDatastore(), "DatastoreIO is not supported in streaming."); - dataflowUtils.setupPubsub(); - - readSource = PubsubIO.Read.topic(options.getPubsubTopic()); - windowFn = SlidingWindows.of(Duration.standardMinutes(30)).every(Duration.standardSeconds(5)); - } else { - readSource = TextIO.Read.from(options.getInputFile()); - windowFn = new GlobalWindows(); - } - - // Create the pipeline. 
- Pipeline p = Pipeline.create(options); - PCollection<KV<String, List<CompletionCandidate>>> toWrite = p - .apply(readSource) - .apply(ParDo.of(new ExtractHashtags())) - .apply(Window.<String>into(windowFn)) - .apply(ComputeTopCompletions.top(10, options.getRecursive())); - - if (options.getOutputToDatastore()) { - toWrite - .apply(ParDo.named("FormatForDatastore").of(new FormatForDatastore(options.getKind()))) - .apply(DatastoreIO.writeTo(MoreObjects.firstNonNull( - options.getOutputDataset(), options.getProject()))); - } - if (options.getOutputToBigQuery()) { - dataflowUtils.setupBigQueryTable(); - - TableReference tableRef = new TableReference(); - tableRef.setProjectId(options.getProject()); - tableRef.setDatasetId(options.getBigQueryDataset()); - tableRef.setTableId(options.getBigQueryTable()); - - toWrite - .apply(ParDo.of(new FormatForBigquery())) - .apply(BigQueryIO.Write - .to(tableRef) - .withSchema(FormatForBigquery.getSchema()) - .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) - .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)); - } - - // Run the pipeline. - PipelineResult result = p.run(); - - if (options.isStreaming() && !options.getInputFile().isEmpty()) { - // Inject the data into the Pub/Sub topic with a Dataflow batch pipeline. - dataflowUtils.runInjectorPipeline(options.getInputFile(), options.getPubsubTopic()); - } - - // dataflowUtils will try to cancel the pipeline and the injector before the program exits. - dataflowUtils.waitToFinish(result); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/src/main/java/com/google/cloud/dataflow/examples/complete/README.md ---------------------------------------------------------------------- diff --git a/examples/src/main/java/com/google/cloud/dataflow/examples/complete/README.md b/examples/src/main/java/com/google/cloud/dataflow/examples/complete/README.md deleted file mode 100644 index 5fba154..0000000 --- a/examples/src/main/java/com/google/cloud/dataflow/examples/complete/README.md +++ /dev/null @@ -1,44 +0,0 @@ - -# "Complete" Examples - -This directory contains end-to-end example pipelines that perform complex data processing tasks. They include: - -<ul> - <li><a href="https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/complete/AutoComplete.java">AutoComplete</a> - &mdash; An example that computes the most popular hash tags for every - prefix, which can be used for auto-completion. Demonstrates how to use the - same pipeline in both streaming and batch, combiners, and composite - transforms.</li> - <li><a href="https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/complete/StreamingWordExtract.java">StreamingWordExtract</a> - &mdash; A streaming pipeline example that inputs lines of text from a Cloud - Pub/Sub topic, splits each line into individual words, capitalizes those - words, and writes the output to a BigQuery table. - </li> - <li><a href="https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/complete/TfIdf.java">TfIdf</a> - &mdash; An example that computes a basic TF-IDF search table for a directory or - Cloud Storage prefix. Demonstrates joining data, side inputs, and logging. 
- </li> - <li><a href="https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/complete/TopWikipediaSessions.java">TopWikipediaSessions</a> - — An example that reads Wikipedia edit data from Cloud Storage and - computes the user with the longest string of edits separated by no more than - an hour within each month. Demonstrates using Cloud Dataflow - <code>Windowing</code> to perform time-based aggregations of data. - </li> - <li><a href="https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/complete/TrafficMaxLaneFlow.java">TrafficMaxLaneFlow</a> - — A streaming Cloud Dataflow example using BigQuery output in the - <code>traffic sensor</code> domain. Demonstrates the Cloud Dataflow streaming - runner, sliding windows, Cloud Pub/Sub topic ingestion, the use of the - <code>AvroCoder</code> to encode a custom class, and custom - <code>Combine</code> transforms. - </li> - <li><a href="https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/complete/TrafficRoutes.java">TrafficRoutes</a> - — A streaming Cloud Dataflow example using BigQuery output in the - <code>traffic sensor</code> domain. Demonstrates the Cloud Dataflow streaming - runner, <code>GroupByKey</code>, keyed state, sliding windows, and Cloud - Pub/Sub topic ingestion. - </li> - </ul> - -See the [documentation](https://cloud.google.com/dataflow/getting-started) and the [Examples -README](../../../../../../../../../README.md) for -information about how to run these examples. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/src/main/java/com/google/cloud/dataflow/examples/complete/StreamingWordExtract.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/com/google/cloud/dataflow/examples/complete/StreamingWordExtract.java b/examples/src/main/java/com/google/cloud/dataflow/examples/complete/StreamingWordExtract.java deleted file mode 100644 index 99c5249..0000000 --- a/examples/src/main/java/com/google/cloud/dataflow/examples/complete/StreamingWordExtract.java +++ /dev/null @@ -1,163 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. 
- */ - -package com.google.cloud.dataflow.examples.complete; - -import com.google.api.services.bigquery.model.TableFieldSchema; -import com.google.api.services.bigquery.model.TableRow; -import com.google.api.services.bigquery.model.TableSchema; -import com.google.cloud.dataflow.examples.common.DataflowExampleUtils; -import com.google.cloud.dataflow.examples.common.ExampleBigQueryTableOptions; -import com.google.cloud.dataflow.examples.common.ExamplePubsubTopicOptions; -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.PipelineResult; -import com.google.cloud.dataflow.sdk.io.BigQueryIO; -import com.google.cloud.dataflow.sdk.io.PubsubIO; -import com.google.cloud.dataflow.sdk.options.Default; -import com.google.cloud.dataflow.sdk.options.Description; -import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; -import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner; -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.ParDo; - -import java.io.IOException; -import java.util.ArrayList; - -/** - * A streaming Dataflow example using BigQuery output. - * - * <p>This pipeline example reads lines of text from a PubSub topic, splits each line - * into individual words, capitalizes those words, and writes the output to - * a BigQuery table. - * - * <p>By default, the example will run a separate pipeline to inject the data from the default - * {@literal --inputFile} to the Pub/Sub {@literal --pubsubTopic}. This makes the data available for - * the streaming pipeline to process. You may override the default {@literal --inputFile} with the - * file of your choosing. You may also set {@literal --inputFile} to an empty string, which will - * disable the automatic Pub/Sub injection, and allow you to use a separate tool to control the input - * to this example. - * - * <p>The example is configured to use the default Pub/Sub topic and the default BigQuery table - * from the example common package (there are no defaults for a general Dataflow pipeline). - * You can override them by using the {@literal --pubsubTopic}, {@literal --bigQueryDataset}, and - * {@literal --bigQueryTable} options. If the Pub/Sub topic or the BigQuery table do not exist, - * the example will try to create them. - * - * <p>The example will try to cancel the pipelines on the signal to terminate the process (CTRL-C) - * and then exit. - */ -public class StreamingWordExtract { - - /** A DoFn that tokenizes lines of text into individual words. */ - static class ExtractWords extends DoFn<String, String> { - @Override - public void processElement(ProcessContext c) { - String[] words = c.element().split("[^a-zA-Z']+"); - for (String word : words) { - if (!word.isEmpty()) { - c.output(word); - } - } - } - } - - /** A DoFn that uppercases a word. */ - static class Uppercase extends DoFn<String, String> { - @Override - public void processElement(ProcessContext c) { - c.output(c.element().toUpperCase()); - } - } - - /** - * Converts strings into BigQuery rows. - */ - static class StringToRowConverter extends DoFn<String, TableRow> { - /** - * In this example, put the whole string into a single BigQuery field. - */ - @Override - public void processElement(ProcessContext c) { - c.output(new TableRow().set("string_field", c.element())); - } - - static TableSchema getSchema() { - return new TableSchema().setFields(new ArrayList<TableFieldSchema>() { - // Compose the list of TableFieldSchema from tableSchema. 
- { - add(new TableFieldSchema().setName("string_field").setType("STRING")); - } - }); - } - } - - /** - * Options supported by {@link StreamingWordExtract}. - * - * <p>Inherits standard configuration options. - */ - private interface StreamingWordExtractOptions - extends ExamplePubsubTopicOptions, ExampleBigQueryTableOptions { - @Description("Input file to inject to Pub/Sub topic") - @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt") - String getInputFile(); - void setInputFile(String value); - } - - /** - * Sets up and starts a streaming pipeline. - * - * @throws IOException if there is a problem setting up resources - */ - public static void main(String[] args) throws IOException { - StreamingWordExtractOptions options = PipelineOptionsFactory.fromArgs(args) - .withValidation() - .as(StreamingWordExtractOptions.class); - options.setStreaming(true); - // In order to cancel the pipelines automatically, - // {@literal DataflowPipelineRunner} is forced to be used. - options.setRunner(DataflowPipelineRunner.class); - - options.setBigQuerySchema(StringToRowConverter.getSchema()); - DataflowExampleUtils dataflowUtils = new DataflowExampleUtils(options); - dataflowUtils.setup(); - - Pipeline pipeline = Pipeline.create(options); - - String tableSpec = new StringBuilder() - .append(options.getProject()).append(":") - .append(options.getBigQueryDataset()).append(".") - .append(options.getBigQueryTable()) - .toString(); - pipeline - .apply(PubsubIO.Read.topic(options.getPubsubTopic())) - .apply(ParDo.of(new ExtractWords())) - .apply(ParDo.of(new Uppercase())) - .apply(ParDo.of(new StringToRowConverter())) - .apply(BigQueryIO.Write.to(tableSpec) - .withSchema(StringToRowConverter.getSchema())); - - PipelineResult result = pipeline.run(); - - if (!options.getInputFile().isEmpty()) { - // Inject the data into the Pub/Sub topic with a Dataflow batch pipeline. - dataflowUtils.runInjectorPipeline(options.getInputFile(), options.getPubsubTopic()); - } - - // dataflowUtils will try to cancel the pipeline and the injector before the program exits. - dataflowUtils.waitToFinish(result); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/src/main/java/com/google/cloud/dataflow/examples/complete/TfIdf.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/com/google/cloud/dataflow/examples/complete/TfIdf.java b/examples/src/main/java/com/google/cloud/dataflow/examples/complete/TfIdf.java deleted file mode 100644 index 65ac753..0000000 --- a/examples/src/main/java/com/google/cloud/dataflow/examples/complete/TfIdf.java +++ /dev/null @@ -1,431 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. 
- */ - -package com.google.cloud.dataflow.examples.complete; - -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.coders.Coder; -import com.google.cloud.dataflow.sdk.coders.KvCoder; -import com.google.cloud.dataflow.sdk.coders.StringDelegateCoder; -import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; -import com.google.cloud.dataflow.sdk.io.TextIO; -import com.google.cloud.dataflow.sdk.options.Default; -import com.google.cloud.dataflow.sdk.options.Description; -import com.google.cloud.dataflow.sdk.options.GcsOptions; -import com.google.cloud.dataflow.sdk.options.PipelineOptions; -import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; -import com.google.cloud.dataflow.sdk.options.Validation; -import com.google.cloud.dataflow.sdk.transforms.Count; -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.Flatten; -import com.google.cloud.dataflow.sdk.transforms.Keys; -import com.google.cloud.dataflow.sdk.transforms.PTransform; -import com.google.cloud.dataflow.sdk.transforms.ParDo; -import com.google.cloud.dataflow.sdk.transforms.RemoveDuplicates; -import com.google.cloud.dataflow.sdk.transforms.Values; -import com.google.cloud.dataflow.sdk.transforms.View; -import com.google.cloud.dataflow.sdk.transforms.WithKeys; -import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResult; -import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey; -import com.google.cloud.dataflow.sdk.transforms.join.KeyedPCollectionTuple; -import com.google.cloud.dataflow.sdk.util.GcsUtil; -import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath; -import com.google.cloud.dataflow.sdk.values.KV; -import com.google.cloud.dataflow.sdk.values.PCollection; -import com.google.cloud.dataflow.sdk.values.PCollectionList; -import com.google.cloud.dataflow.sdk.values.PCollectionView; -import com.google.cloud.dataflow.sdk.values.PDone; -import com.google.cloud.dataflow.sdk.values.PInput; -import com.google.cloud.dataflow.sdk.values.TupleTag; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.HashSet; -import java.util.Set; - -/** - * An example that computes a basic TF-IDF search table for a directory or GCS prefix. - * - * <p>Concepts: joining data; side inputs; logging - * - * <p>To execute this pipeline locally, specify general pipeline configuration: - * <pre>{@code - *   --project=YOUR_PROJECT_ID - * }</pre> - * and a local output file or output prefix on GCS: - * <pre>{@code - *   --output=[YOUR_LOCAL_FILE | gs://YOUR_OUTPUT_PREFIX] - * }</pre> - * - * <p>To execute this pipeline using the Dataflow service, specify pipeline configuration: - * <pre>{@code - *   --project=YOUR_PROJECT_ID - *   --stagingLocation=gs://YOUR_STAGING_DIRECTORY - *   --runner=BlockingDataflowPipelineRunner - * }</pre> - * and an output prefix on GCS: - * <pre>{@code - *   --output=gs://YOUR_OUTPUT_PREFIX - * }</pre> - * - * <p>The default input is {@code gs://dataflow-samples/shakespeare/} and can be overridden with - * {@code --input}. - */ -public class TfIdf { - /** - * Options supported by {@link TfIdf}. - * - * <p>Inherits standard configuration options. 
- */ - private static interface Options extends PipelineOptions { - @Description("Path to the directory or GCS prefix containing files to read from") - @Default.String("gs://dataflow-samples/shakespeare/") - String getInput(); - void setInput(String value); - - @Description("Prefix of output URI to write to") - @Validation.Required - String getOutput(); - void setOutput(String value); - } - - /** - * Lists documents contained beneath the {@code options.input} prefix/directory. - */ - public static Set<URI> listInputDocuments(Options options) - throws URISyntaxException, IOException { - URI baseUri = new URI(options.getInput()); - - // List all documents in the directory or GCS prefix. - URI absoluteUri; - if (baseUri.getScheme() != null) { - absoluteUri = baseUri; - } else { - absoluteUri = new URI( - "file", - baseUri.getAuthority(), - baseUri.getPath(), - baseUri.getQuery(), - baseUri.getFragment()); - } - - Set<URI> uris = new HashSet<>(); - if (absoluteUri.getScheme().equals("file")) { - File directory = new File(absoluteUri); - for (String entry : directory.list()) { - File path = new File(directory, entry); - uris.add(path.toURI()); - } - } else if (absoluteUri.getScheme().equals("gs")) { - GcsUtil gcsUtil = options.as(GcsOptions.class).getGcsUtil(); - URI gcsUriGlob = new URI( - absoluteUri.getScheme(), - absoluteUri.getAuthority(), - absoluteUri.getPath() + "*", - absoluteUri.getQuery(), - absoluteUri.getFragment()); - for (GcsPath entry : gcsUtil.expand(GcsPath.fromUri(gcsUriGlob))) { - uris.add(entry.toUri()); - } - } - - return uris; - } - - /** - * Reads the documents at the provided uris and returns all lines - * from the documents tagged with which document they are from. - */ - public static class ReadDocuments - extends PTransform<PInput, PCollection<KV<URI, String>>> { - private Iterable<URI> uris; - - public ReadDocuments(Iterable<URI> uris) { - this.uris = uris; - } - - @Override - public Coder<?> getDefaultOutputCoder() { - return KvCoder.of(StringDelegateCoder.of(URI.class), StringUtf8Coder.of()); - } - - @Override - public PCollection<KV<URI, String>> apply(PInput input) { - Pipeline pipeline = input.getPipeline(); - - // Create one TextIO.Read transform for each document - // and add its output to a PCollectionList - PCollectionList<KV<URI, String>> urisToLines = - PCollectionList.empty(pipeline); - - // TextIO.Read supports: - //  - file: URIs and paths locally - //  - gs: URIs on the service - for (final URI uri : uris) { - String uriString; - if (uri.getScheme().equals("file")) { - uriString = new File(uri).getPath(); - } else { - uriString = uri.toString(); - } - - PCollection<KV<URI, String>> oneUriToLines = pipeline - .apply(TextIO.Read.from(uriString) - .named("TextIO.Read(" + uriString + ")")) - .apply("WithKeys(" + uriString + ")", WithKeys.<URI, String>of(uri)); - - urisToLines = urisToLines.and(oneUriToLines); - } - - return urisToLines.apply(Flatten.<KV<URI, String>>pCollections()); - } - } - - /** - * A transform containing a basic TF-IDF pipeline. The input consists of KV objects - * where the key is the document's URI and the value is a piece - * of the document's content. The output is a mapping from terms to - * scores for each document URI. 
- */ - public static class ComputeTfIdf - extends PTransform<PCollection<KV<URI, String>>, PCollection<KV<String, KV<URI, Double>>>> { - public ComputeTfIdf() { } - - @Override - public PCollection<KV<String, KV<URI, Double>>> apply( - PCollection<KV<URI, String>> uriToContent) { - - // Compute the total number of documents, and - // prepare this singleton PCollectionView for - // use as a side input. - final PCollectionView<Long> totalDocuments = - uriToContent - .apply("GetURIs", Keys.<URI>create()) - .apply("RemoveDuplicateDocs", RemoveDuplicates.<URI>create()) - .apply(Count.<URI>globally()) - .apply(View.<Long>asSingleton()); - - // Create a collection of pairs mapping a URI to each - // of the words in the document associated with that URI. - PCollection<KV<URI, String>> uriToWords = uriToContent - .apply(ParDo.named("SplitWords").of( - new DoFn<KV<URI, String>, KV<URI, String>>() { - @Override - public void processElement(ProcessContext c) { - URI uri = c.element().getKey(); - String line = c.element().getValue(); - for (String word : line.split("\\W+")) { - // Log INFO messages when the word "love" is found. - if (word.toLowerCase().equals("love")) { - LOG.info("Found {}", word.toLowerCase()); - } - - if (!word.isEmpty()) { - c.output(KV.of(uri, word.toLowerCase())); - } - } - } - })); - - // Compute a mapping from each word to the total - // number of documents in which it appears. - PCollection<KV<String, Long>> wordToDocCount = uriToWords - .apply("RemoveDuplicateWords", RemoveDuplicates.<KV<URI, String>>create()) - .apply(Values.<String>create()) - .apply("CountDocs", Count.<String>perElement()); - - // Compute a mapping from each URI to the total - // number of words in the document associated with that URI. - PCollection<KV<URI, Long>> uriToWordTotal = uriToWords - .apply("GetURIs2", Keys.<URI>create()) - .apply("CountWords", Count.<URI>perElement()); - - // Count, for each (URI, word) pair, the number of - // occurrences of that word in the document associated - // with the URI. - PCollection<KV<KV<URI, String>, Long>> uriAndWordToCount = uriToWords - .apply("CountWordDocPairs", Count.<KV<URI, String>>perElement()); - - // Adjust the above collection to a mapping from - // (URI, word) pairs to counts into an isomorphic mapping - // from URI to (word, count) pairs, to prepare for a join - // by the URI key. - PCollection<KV<URI, KV<String, Long>>> uriToWordAndCount = uriAndWordToCount - .apply(ParDo.named("ShiftKeys").of( - new DoFn<KV<KV<URI, String>, Long>, KV<URI, KV<String, Long>>>() { - @Override - public void processElement(ProcessContext c) { - URI uri = c.element().getKey().getKey(); - String word = c.element().getKey().getValue(); - Long occurrences = c.element().getValue(); - c.output(KV.of(uri, KV.of(word, occurrences))); - } - })); - - // Prepare to join the mapping of URI to (word, count) pairs with - // the mapping of URI to total word counts, by associating - // each of the input PCollection<KV<URI, ...>> with - // a tuple tag. Each input must have the same key type, URI - // in this case. The type parameter of the tuple tag matches - // the types of the values for each collection. - final TupleTag<Long> wordTotalsTag = new TupleTag<Long>(); - final TupleTag<KV<String, Long>> wordCountsTag = new TupleTag<KV<String, Long>>(); - KeyedPCollectionTuple<URI> coGbkInput = KeyedPCollectionTuple - .of(wordTotalsTag, uriToWordTotal) - .and(wordCountsTag, uriToWordAndCount); - - // Perform a CoGroupByKey (a sort of pre-join) on the prepared - // inputs. 
This yields a mapping from URI to a CoGbkResult - // (CoGroupByKey Result). The CoGbkResult is a mapping - // from the above tuple tags to the values in each input - // associated with a particular URI. In this case, each - // KV<URI, CoGbkResult> groups a URI with the total number of - // words in that document as well as all the (word, count) - // pairs for particular words. - PCollection<KV<URI, CoGbkResult>> uriToWordAndCountAndTotal = coGbkInput - .apply("CoGroupByUri", CoGroupByKey.<URI>create()); - - // Compute a mapping from each word to a (URI, term frequency) - // pair for each URI. A word's term frequency for a document - // is simply the number of times that word occurs in the document - // divided by the total number of words in the document. - PCollection<KV<String, KV<URI, Double>>> wordToUriAndTf = uriToWordAndCountAndTotal - .apply(ParDo.named("ComputeTermFrequencies").of( - new DoFn<KV<URI, CoGbkResult>, KV<String, KV<URI, Double>>>() { - @Override - public void processElement(ProcessContext c) { - URI uri = c.element().getKey(); - Long wordTotal = c.element().getValue().getOnly(wordTotalsTag); - - for (KV<String, Long> wordAndCount - : c.element().getValue().getAll(wordCountsTag)) { - String word = wordAndCount.getKey(); - Long wordCount = wordAndCount.getValue(); - Double termFrequency = wordCount.doubleValue() / wordTotal.doubleValue(); - c.output(KV.of(word, KV.of(uri, termFrequency))); - } - } - })); - - // Compute a mapping from each word to its document frequency. - // A word's document frequency in a corpus is the number of - // documents in which the word appears divided by the total - // number of documents in the corpus. Note how the total number of - // documents is passed as a side input; the same value is - // presented to each invocation of the DoFn. - PCollection<KV<String, Double>> wordToDf = wordToDocCount - .apply(ParDo - .named("ComputeDocFrequencies") - .withSideInputs(totalDocuments) - .of(new DoFn<KV<String, Long>, KV<String, Double>>() { - @Override - public void processElement(ProcessContext c) { - String word = c.element().getKey(); - Long documentCount = c.element().getValue(); - Long documentTotal = c.sideInput(totalDocuments); - Double documentFrequency = documentCount.doubleValue() - / documentTotal.doubleValue(); - - c.output(KV.of(word, documentFrequency)); - } - })); - - // Join the term frequency and document frequency - // collections, each keyed on the word. - final TupleTag<KV<URI, Double>> tfTag = new TupleTag<KV<URI, Double>>(); - final TupleTag<Double> dfTag = new TupleTag<Double>(); - PCollection<KV<String, CoGbkResult>> wordToUriAndTfAndDf = KeyedPCollectionTuple - .of(tfTag, wordToUriAndTf) - .and(dfTag, wordToDf) - .apply(CoGroupByKey.<String>create()); - - // Compute a mapping from each word to a (URI, TF-IDF) score - // for each URI. There are a variety of definitions of TF-IDF - // ("term frequency - inverse document frequency") score; - // here we use a basic version that multiplies the term frequency - // by the log of the inverse document frequency. 
- PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf = wordToUriAndTfAndDf - .apply(ParDo.named("ComputeTfIdf").of( - new DoFn<KV<String, CoGbkResult>, KV<String, KV<URI, Double>>>() { - @Override - public void processElement(ProcessContext c) { - String word = c.element().getKey(); - Double df = c.element().getValue().getOnly(dfTag); - - for (KV<URI, Double> uriAndTf : c.element().getValue().getAll(tfTag)) { - URI uri = uriAndTf.getKey(); - Double tf = uriAndTf.getValue(); - Double tfIdf = tf * Math.log(1 / df); - c.output(KV.of(word, KV.of(uri, tfIdf))); - } - } - })); - - return wordToUriAndTfIdf; - } - - // Instantiate Logger. - // It is suggested that the user specify the class name of the containing class - // (in this case ComputeTfIdf). - private static final Logger LOG = LoggerFactory.getLogger(ComputeTfIdf.class); - } - - /** - * A {@link PTransform} to write, in CSV format, a mapping from term and URI - * to score. - */ - public static class WriteTfIdf - extends PTransform<PCollection<KV<String, KV<URI, Double>>>, PDone> { - private String output; - - public WriteTfIdf(String output) { - this.output = output; - } - - @Override - public PDone apply(PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf) { - return wordToUriAndTfIdf - .apply(ParDo.named("Format").of(new DoFn<KV<String, KV<URI, Double>>, String>() { - @Override - public void processElement(ProcessContext c) { - c.output(String.format("%s,\t%s,\t%f", - c.element().getKey(), - c.element().getValue().getKey(), - c.element().getValue().getValue())); - } - })) - .apply(TextIO.Write - .to(output) - .withSuffix(".csv")); - } - } - - public static void main(String[] args) throws Exception { - Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); - Pipeline pipeline = Pipeline.create(options); - pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class)); - - pipeline - .apply(new ReadDocuments(listInputDocuments(options))) - .apply(new ComputeTfIdf()) - .apply(new WriteTfIdf(options.getOutput())); - - pipeline.run(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/src/main/java/com/google/cloud/dataflow/examples/complete/TopWikipediaSessions.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/com/google/cloud/dataflow/examples/complete/TopWikipediaSessions.java b/examples/src/main/java/com/google/cloud/dataflow/examples/complete/TopWikipediaSessions.java deleted file mode 100644 index c57a5f2..0000000 --- a/examples/src/main/java/com/google/cloud/dataflow/examples/complete/TopWikipediaSessions.java +++ /dev/null @@ -1,223 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. 
- */ - -package com.google.cloud.dataflow.examples.complete; - -import com.google.api.services.bigquery.model.TableRow; -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.coders.TableRowJsonCoder; -import com.google.cloud.dataflow.sdk.io.TextIO; -import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; -import com.google.cloud.dataflow.sdk.options.Default; -import com.google.cloud.dataflow.sdk.options.Description; -import com.google.cloud.dataflow.sdk.options.PipelineOptions; -import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; -import com.google.cloud.dataflow.sdk.options.Validation; -import com.google.cloud.dataflow.sdk.transforms.Count; -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.DoFn.RequiresWindowAccess; -import com.google.cloud.dataflow.sdk.transforms.PTransform; -import com.google.cloud.dataflow.sdk.transforms.ParDo; -import com.google.cloud.dataflow.sdk.transforms.SerializableComparator; -import com.google.cloud.dataflow.sdk.transforms.Top; -import com.google.cloud.dataflow.sdk.transforms.windowing.CalendarWindows; -import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow; -import com.google.cloud.dataflow.sdk.transforms.windowing.Sessions; -import com.google.cloud.dataflow.sdk.transforms.windowing.Window; -import com.google.cloud.dataflow.sdk.values.KV; -import com.google.cloud.dataflow.sdk.values.PCollection; - -import org.joda.time.Duration; -import org.joda.time.Instant; - -import java.util.List; - -/** - * An example that reads Wikipedia edit data from Cloud Storage and computes the user with - * the longest string of edits separated by no more than an hour within each month. - * - * <p>Concepts: Using Windowing to perform time-based aggregations of data. - * - * <p>It is not recommended to execute this pipeline locally, given the size of the default input - * data. - * - * <p>To execute this pipeline using the Dataflow service, specify pipeline configuration: - * <pre>{@code - * --project=YOUR_PROJECT_ID - * --stagingLocation=gs://YOUR_STAGING_DIRECTORY - * --runner=BlockingDataflowPipelineRunner - * } - * </pre> - * and an output prefix on GCS: - * <pre>{@code - * --output=gs://YOUR_OUTPUT_PREFIX - * }</pre> - * - * <p>The default input is {@code gs://dataflow-samples/wikipedia_edits/*.json} and can be - * overridden with {@code --input}. - * - * <p>The input for this example is large enough that it's a good place to enable (experimental) - * autoscaling: - * <pre>{@code - * --autoscalingAlgorithm=BASIC - * --maxNumWorkers=20 - * } - * </pre> - * This will automatically scale the number of workers up over time until the job completes. - */ -public class TopWikipediaSessions { - private static final String EXPORTED_WIKI_TABLE = "gs://dataflow-samples/wikipedia_edits/*.json"; - - /** - * Extracts user and timestamp from a TableRow representing a Wikipedia edit. - */ - static class ExtractUserAndTimestamp extends DoFn<TableRow, String> { - @Override - public void processElement(ProcessContext c) { - TableRow row = c.element(); - int timestamp = (Integer) row.get("timestamp"); - String userName = (String) row.get("contributor_username"); - if (userName != null) { - // Sets the implicit timestamp field to be used in windowing. - c.outputWithTimestamp(userName, new Instant(timestamp * 1000L)); - } - } - } - - /** - * Computes the number of edits in each user session. 
A session is defined as - * a string of edits where each is separated from the next by less than an hour. - */ - static class ComputeSessions - extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> { - @Override - public PCollection<KV<String, Long>> apply(PCollection<String> actions) { - return actions - .apply(Window.<String>into(Sessions.withGapDuration(Duration.standardHours(1)))) - - .apply(Count.<String>perElement()); - } - } - - /** - * Computes the longest session ending in each month. - */ - private static class TopPerMonth - extends PTransform<PCollection<KV<String, Long>>, PCollection<List<KV<String, Long>>>> { - @Override - public PCollection<List<KV<String, Long>>> apply(PCollection<KV<String, Long>> sessions) { - return sessions - .apply(Window.<KV<String, Long>>into(CalendarWindows.months(1))) - - .apply(Top.of(1, new SerializableComparator<KV<String, Long>>() { - @Override - public int compare(KV<String, Long> o1, KV<String, Long> o2) { - return Long.compare(o1.getValue(), o2.getValue()); - } - }).withoutDefaults()); - } - } - - static class SessionsToStringsDoFn extends DoFn<KV<String, Long>, KV<String, Long>> - implements RequiresWindowAccess { - - @Override - public void processElement(ProcessContext c) { - c.output(KV.of( - c.element().getKey() + " : " + c.window(), c.element().getValue())); - } - } - - static class FormatOutputDoFn extends DoFn<List<KV<String, Long>>, String> - implements RequiresWindowAccess { - @Override - public void processElement(ProcessContext c) { - for (KV<String, Long> item : c.element()) { - String session = item.getKey(); - long count = item.getValue(); - c.output(session + " : " + count + " : " + ((IntervalWindow) c.window()).start()); - } - } - } - - static class ComputeTopSessions extends PTransform<PCollection<TableRow>, PCollection<String>> { - - private final double samplingThreshold; - - public ComputeTopSessions(double samplingThreshold) { - this.samplingThreshold = samplingThreshold; - } - - @Override - public PCollection<String> apply(PCollection<TableRow> input) { - return input - .apply(ParDo.of(new ExtractUserAndTimestamp())) - - .apply(ParDo.named("SampleUsers").of( - new DoFn<String, String>() { - @Override - public void processElement(ProcessContext c) { - if (Math.abs(c.element().hashCode()) <= Integer.MAX_VALUE * samplingThreshold) { - c.output(c.element()); - } - } - })) - - .apply(new ComputeSessions()) - - .apply(ParDo.named("SessionsToStrings").of(new SessionsToStringsDoFn())) - .apply(new TopPerMonth()) - .apply(ParDo.named("FormatOutput").of(new FormatOutputDoFn())); - } - } - - /** - * Options supported by this class. - * - * <p>Inherits standard Dataflow configuration options. 
- */ - private static interface Options extends PipelineOptions { - @Description( - "Input specified as a GCS path containing a BigQuery table exported as json") - @Default.String(EXPORTED_WIKI_TABLE) - String getInput(); - void setInput(String value); - - @Description("File to output results to") - @Validation.Required - String getOutput(); - void setOutput(String value); - } - - public static void main(String[] args) { - Options options = PipelineOptionsFactory.fromArgs(args) - .withValidation() - .as(Options.class); - DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class); - - Pipeline p = Pipeline.create(dataflowOptions); - - double samplingThreshold = 0.1; - - p.apply(TextIO.Read - .from(options.getInput()) - .withCoder(TableRowJsonCoder.of())) - .apply(new ComputeTopSessions(samplingThreshold)) - .apply(TextIO.Write.named("Write").withoutSharding().to(options.getOutput())); - - p.run(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/src/main/java/com/google/cloud/dataflow/examples/complete/TrafficMaxLaneFlow.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/com/google/cloud/dataflow/examples/complete/TrafficMaxLaneFlow.java b/examples/src/main/java/com/google/cloud/dataflow/examples/complete/TrafficMaxLaneFlow.java deleted file mode 100644 index 2d54252..0000000 --- a/examples/src/main/java/com/google/cloud/dataflow/examples/complete/TrafficMaxLaneFlow.java +++ /dev/null @@ -1,425 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. 
- */
-
-package com.google.cloud.dataflow.examples.complete;
-
-import com.google.api.services.bigquery.model.TableFieldSchema;
-import com.google.api.services.bigquery.model.TableReference;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
-import com.google.cloud.dataflow.examples.common.DataflowExampleOptions;
-import com.google.cloud.dataflow.examples.common.DataflowExampleUtils;
-import com.google.cloud.dataflow.examples.common.ExampleBigQueryTableOptions;
-import com.google.cloud.dataflow.examples.common.ExamplePubsubTopicAndSubscriptionOptions;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.PipelineResult;
-import com.google.cloud.dataflow.sdk.coders.AvroCoder;
-import com.google.cloud.dataflow.sdk.coders.DefaultCoder;
-import com.google.cloud.dataflow.sdk.io.BigQueryIO;
-import com.google.cloud.dataflow.sdk.io.PubsubIO;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.options.Default;
-import com.google.cloud.dataflow.sdk.options.Description;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.transforms.Combine;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
-import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PBegin;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.common.base.Strings;
-
-import org.apache.avro.reflect.Nullable;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * A Dataflow example that runs in both batch and streaming modes with traffic sensor data.
- * You can configure the running mode by setting {@literal --streaming} to true or false.
- *
- * <p>Concepts: The batch and streaming runners, sliding windows, Google Cloud Pub/Sub
- * topic injection, use of the AvroCoder to encode a custom class, and custom Combine transforms.
- *
- * <p>This example analyzes traffic sensor data using SlidingWindows. For each window,
- * it finds the lane that had the highest flow recorded, for each sensor station. It writes
- * those max values along with auxiliary info to a BigQuery table.
- *
- * <p>In batch mode, the pipeline reads traffic sensor data from {@literal --inputFile}.
- *
- * <p>In streaming mode, the pipeline reads the data from a Pub/Sub topic.
- * By default, the example will run a separate pipeline to inject the data from the default
- * {@literal --inputFile} to the Pub/Sub {@literal --pubsubTopic}, making it available for
- * the streaming pipeline to process. You may override the default {@literal --inputFile} with a
- * file of your choosing. You may also set {@literal --inputFile} to an empty string, which will
- * disable the automatic Pub/Sub injection and allow you to use a separate tool to control the
- * input to this example.
- * Example code that publishes traffic sensor data to a Pub/Sub topic is provided in the
- * <a href="https://github.com/GoogleCloudPlatform/cloud-pubsub-samples-python/tree/master/gce-cmdline-publisher">gce-cmdline-publisher sample</a>.
- *
- * <p>The example is configured to use the default Pub/Sub topic and the default BigQuery table
- * from the example common package (there are no defaults for a general Dataflow pipeline).
- * You can override them by using the {@literal --pubsubTopic}, {@literal --bigQueryDataset}, and
- * {@literal --bigQueryTable} options. If the Pub/Sub topic or the BigQuery table does not exist,
- * the example will try to create it.
- *
- * <p>The example will try to cancel the pipelines on the signal to terminate the process (CTRL-C)
- * and then exit.
- */
-public class TrafficMaxLaneFlow {
-
-  private static final String PUBSUB_TIMESTAMP_LABEL_KEY = "timestamp_ms";
-  private static final Integer VALID_INPUTS = 4999;
-
-  static final int WINDOW_DURATION = 60;  // Default sliding window duration in minutes
-  static final int WINDOW_SLIDE_EVERY = 5;  // Default window 'slide every' setting in minutes
-
-  /**
-   * This class holds information about each lane in a station reading, along with some general
-   * information from the reading.
-   */
-  @DefaultCoder(AvroCoder.class)
-  static class LaneInfo {
-    @Nullable String stationId;
-    @Nullable String lane;
-    @Nullable String direction;
-    @Nullable String freeway;
-    @Nullable String recordedTimestamp;
-    @Nullable Integer laneFlow;
-    @Nullable Integer totalFlow;
-    @Nullable Double laneAO;
-    @Nullable Double laneAS;
-
-    public LaneInfo() {}
-
-    public LaneInfo(String stationId, String lane, String direction, String freeway,
-        String timestamp, Integer laneFlow, Double laneAO,
-        Double laneAS, Integer totalFlow) {
-      this.stationId = stationId;
-      this.lane = lane;
-      this.direction = direction;
-      this.freeway = freeway;
-      this.recordedTimestamp = timestamp;
-      this.laneFlow = laneFlow;
-      this.laneAO = laneAO;
-      this.laneAS = laneAS;
-      this.totalFlow = totalFlow;
-    }
-
-    public String getStationId() {
-      return this.stationId;
-    }
-    public String getLane() {
-      return this.lane;
-    }
-    public String getDirection() {
-      return this.direction;
-    }
-    public String getFreeway() {
-      return this.freeway;
-    }
-    public String getRecordedTimestamp() {
-      return this.recordedTimestamp;
-    }
-    public Integer getLaneFlow() {
-      return this.laneFlow;
-    }
-    public Double getLaneAO() {
-      return this.laneAO;
-    }
-    public Double getLaneAS() {
-      return this.laneAS;
-    }
-    public Integer getTotalFlow() {
-      return this.totalFlow;
-    }
-  }
-
-  /**
-   * Extract the timestamp field from the input string, and use it as the element timestamp.
-   */
-  static class ExtractTimestamps extends DoFn<String, String> {
-    private static final DateTimeFormatter dateTimeFormat =
-        DateTimeFormat.forPattern("MM/dd/yyyy HH:mm:ss");
-
-    @Override
-    public void processElement(DoFn<String, String>.ProcessContext c) throws Exception {
-      String[] items = c.element().split(",");
-      if (items.length > 0) {
-        try {
-          String timestamp = items[0];
-          c.outputWithTimestamp(c.element(), new Instant(dateTimeFormat.parseMillis(timestamp)));
-        } catch (IllegalArgumentException e) {
-          // Skip the invalid input.
-        }
-      }
-    }
-  }
-
-  /**
-   * Extract flow information for each of the 8 lanes in a reading, and output as separate tuples.
- * This will let us determine which lane has the max flow for that station over the span of the - * window, and output not only the max flow from that calculation, but other associated - * information. The number of lanes for which data is present depends upon which freeway the data - * point comes from. - */ - static class ExtractFlowInfoFn extends DoFn<String, KV<String, LaneInfo>> { - - @Override - public void processElement(ProcessContext c) { - String[] items = c.element().split(","); - if (items.length < 48) { - // Skip the invalid input. - return; - } - // extract the sensor information for the lanes from the input string fields. - String timestamp = items[0]; - String stationId = items[1]; - String freeway = items[2]; - String direction = items[3]; - Integer totalFlow = tryIntParse(items[7]); - for (int i = 1; i <= 8; ++i) { - Integer laneFlow = tryIntParse(items[6 + 5 * i]); - Double laneAvgOccupancy = tryDoubleParse(items[7 + 5 * i]); - Double laneAvgSpeed = tryDoubleParse(items[8 + 5 * i]); - if (laneFlow == null || laneAvgOccupancy == null || laneAvgSpeed == null) { - return; - } - LaneInfo laneInfo = new LaneInfo(stationId, "lane" + i, direction, freeway, timestamp, - laneFlow, laneAvgOccupancy, laneAvgSpeed, totalFlow); - c.output(KV.of(stationId, laneInfo)); - } - } - } - - /** - * A custom 'combine function' used with the Combine.perKey transform. Used to find the max lane - * flow over all the data points in the Window. Extracts the lane flow from the input string and - * determines whether it's the max seen so far. We're using a custom combiner instead of the Max - * transform because we want to retain the additional information we've associated with the flow - * value. - */ - public static class MaxFlow implements SerializableFunction<Iterable<LaneInfo>, LaneInfo> { - @Override - public LaneInfo apply(Iterable<LaneInfo> input) { - Integer max = 0; - LaneInfo maxInfo = new LaneInfo(); - for (LaneInfo item : input) { - Integer flow = item.getLaneFlow(); - if (flow != null && (flow >= max)) { - max = flow; - maxInfo = item; - } - } - return maxInfo; - } - } - - /** - * Format the results of the Max Lane flow calculation to a TableRow, to save to BigQuery. - * Add the timestamp from the window context. - */ - static class FormatMaxesFn extends DoFn<KV<String, LaneInfo>, TableRow> { - @Override - public void processElement(ProcessContext c) { - - LaneInfo laneInfo = c.element().getValue(); - TableRow row = new TableRow() - .set("station_id", c.element().getKey()) - .set("direction", laneInfo.getDirection()) - .set("freeway", laneInfo.getFreeway()) - .set("lane_max_flow", laneInfo.getLaneFlow()) - .set("lane", laneInfo.getLane()) - .set("avg_occ", laneInfo.getLaneAO()) - .set("avg_speed", laneInfo.getLaneAS()) - .set("total_flow", laneInfo.getTotalFlow()) - .set("recorded_timestamp", laneInfo.getRecordedTimestamp()) - .set("window_timestamp", c.timestamp().toString()); - c.output(row); - } - - /** Defines the BigQuery schema used for the output. 
 */
-    static TableSchema getSchema() {
-      List<TableFieldSchema> fields = new ArrayList<>();
-      fields.add(new TableFieldSchema().setName("station_id").setType("STRING"));
-      fields.add(new TableFieldSchema().setName("direction").setType("STRING"));
-      fields.add(new TableFieldSchema().setName("freeway").setType("STRING"));
-      fields.add(new TableFieldSchema().setName("lane_max_flow").setType("INTEGER"));
-      fields.add(new TableFieldSchema().setName("lane").setType("STRING"));
-      fields.add(new TableFieldSchema().setName("avg_occ").setType("FLOAT"));
-      fields.add(new TableFieldSchema().setName("avg_speed").setType("FLOAT"));
-      fields.add(new TableFieldSchema().setName("total_flow").setType("INTEGER"));
-      fields.add(new TableFieldSchema().setName("window_timestamp").setType("TIMESTAMP"));
-      fields.add(new TableFieldSchema().setName("recorded_timestamp").setType("STRING"));
-      TableSchema schema = new TableSchema().setFields(fields);
-      return schema;
-    }
-  }
-
-  /**
-   * This PTransform extracts lane info, calculates the max lane flow found for a given station
-   * (for the current Window) using a custom 'combiner', and formats the results for BigQuery.
-   */
-  static class MaxLaneFlow
-      extends PTransform<PCollection<KV<String, LaneInfo>>, PCollection<TableRow>> {
-    @Override
-    public PCollection<TableRow> apply(PCollection<KV<String, LaneInfo>> flowInfo) {
-      // stationId, LaneInfo => stationId + max lane flow info
-      PCollection<KV<String, LaneInfo>> flowMaxes =
-          flowInfo.apply(Combine.<String, LaneInfo>perKey(
-              new MaxFlow()));
-
-      // <stationId, max lane flow info>... => row...
-      PCollection<TableRow> results = flowMaxes.apply(
-          ParDo.of(new FormatMaxesFn()));
-
-      return results;
-    }
-  }
-
-  static class ReadFileAndExtractTimestamps extends PTransform<PBegin, PCollection<String>> {
-    private final String inputFile;
-
-    public ReadFileAndExtractTimestamps(String inputFile) {
-      this.inputFile = inputFile;
-    }
-
-    @Override
-    public PCollection<String> apply(PBegin begin) {
-      return begin
-          .apply(TextIO.Read.from(inputFile))
-          .apply(ParDo.of(new ExtractTimestamps()));
-    }
-  }
-
-  /**
-   * Options supported by {@link TrafficMaxLaneFlow}.
-   *
-   * <p>Inherits standard configuration options.
-   */
-  private interface TrafficMaxLaneFlowOptions extends DataflowExampleOptions,
-      ExamplePubsubTopicAndSubscriptionOptions, ExampleBigQueryTableOptions {
-    @Description("Input file to inject to Pub/Sub topic")
-    @Default.String("gs://dataflow-samples/traffic_sensor/"
-        + "Freeways-5Minaa2010-01-01_to_2010-02-15_test2.csv")
-    String getInputFile();
-    void setInputFile(String value);
-
-    @Description("Numeric value of sliding window duration, in minutes")
-    @Default.Integer(WINDOW_DURATION)
-    Integer getWindowDuration();
-    void setWindowDuration(Integer value);
-
-    @Description("Numeric value of window 'slide every' setting, in minutes")
-    @Default.Integer(WINDOW_SLIDE_EVERY)
-    Integer getWindowSlideEvery();
-    void setWindowSlideEvery(Integer value);
-
-    @Description("Whether to run the pipeline with unbounded input")
-    @Default.Boolean(false)
-    boolean isUnbounded();
-    void setUnbounded(boolean value);
-  }
-
-  /**
-   * Sets up and starts the streaming pipeline.
-   *
-   * @throws IOException if there is a problem setting up resources
-   */
-  public static void main(String[] args) throws IOException {
-    TrafficMaxLaneFlowOptions options = PipelineOptionsFactory.fromArgs(args)
-        .withValidation()
-        .as(TrafficMaxLaneFlowOptions.class);
-    options.setBigQuerySchema(FormatMaxesFn.getSchema());
-    // Using DataflowExampleUtils to set up required resources.
-    DataflowExampleUtils dataflowUtils = new DataflowExampleUtils(options, options.isUnbounded());
-
-    Pipeline pipeline = Pipeline.create(options);
-    TableReference tableRef = new TableReference();
-    tableRef.setProjectId(options.getProject());
-    tableRef.setDatasetId(options.getBigQueryDataset());
-    tableRef.setTableId(options.getBigQueryTable());
-
-    PCollection<String> input;
-    if (options.isUnbounded()) {
-      // Read unbounded input from Pub/Sub.
-      input = pipeline.apply(PubsubIO.Read
-          .timestampLabel(PUBSUB_TIMESTAMP_LABEL_KEY)
-          .subscription(options.getPubsubSubscription()));
-    } else {
-      // Read bounded input from Pub/Sub (at most VALID_INPUTS records).
-      input = pipeline.apply(PubsubIO.Read
-          .timestampLabel(PUBSUB_TIMESTAMP_LABEL_KEY)
-          .subscription(options.getPubsubSubscription()).maxNumRecords(VALID_INPUTS));
-
-      // To read bounded TextIO files, use:
-      // input = pipeline.apply(new ReadFileAndExtractTimestamps(options.getInputFile()));
-    }
-    input
-        // row... => <stationId, LaneInfo> ...
-        .apply(ParDo.of(new ExtractFlowInfoFn()))
-        // Map the incoming data stream into sliding windows. The default window duration values
-        // work well if you're running the accompanying Pub/Sub generator script with the
-        // --replay flag, which simulates pauses in the sensor data publication. You may want to
-        // adjust them otherwise.
-        .apply(Window.<KV<String, LaneInfo>>into(SlidingWindows.of(
-            Duration.standardMinutes(options.getWindowDuration())).
-            every(Duration.standardMinutes(options.getWindowSlideEvery()))))
-        .apply(new MaxLaneFlow())
-        .apply(BigQueryIO.Write.to(tableRef)
-            .withSchema(FormatMaxesFn.getSchema()));
-
-    // Inject the data into the Pub/Sub topic with a Dataflow batch pipeline.
-    if (!Strings.isNullOrEmpty(options.getInputFile())
-        && !Strings.isNullOrEmpty(options.getPubsubTopic())) {
-      dataflowUtils.runInjectorPipeline(
-          new ReadFileAndExtractTimestamps(options.getInputFile()),
-          options.getPubsubTopic(),
-          PUBSUB_TIMESTAMP_LABEL_KEY);
-    }
-
-    // Run the pipeline.
-    PipelineResult result = pipeline.run();
-
-    // dataflowUtils will try to cancel the pipeline and the injector before the program exits.
-    dataflowUtils.waitToFinish(result);
-  }
-
-  private static Integer tryIntParse(String number) {
-    try {
-      return Integer.parseInt(number);
-    } catch (NumberFormatException e) {
-      return null;
-    }
-  }
-
-  private static Double tryDoubleParse(String number) {
-    try {
-      return Double.parseDouble(number);
-    } catch (NumberFormatException e) {
-      return null;
-    }
-  }
-}
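
For readers studying the removed TrafficMaxLaneFlow example above: its MaxFlow combiner is an argmax, not a plain max. Combine.perKey with the stock Max transform would keep only the largest lane-flow number, while MaxFlow keeps the whole LaneInfo record that carried it (lane, occupancy, speed, timestamps). With the defaults above (60-minute windows sliding every 5 minutes), each reading lands in 60 / 5 = 12 overlapping windows, and the combiner picks one record per station per window. Below is a minimal, SDK-free sketch of that argmax pattern; the Reading class and its field names are illustrative stand-ins for LaneInfo, not part of this commit.

import java.util.Arrays;
import java.util.List;

public class ArgMaxSketch {

  /** Illustrative stand-in for LaneInfo: a flow value plus fields we keep with it. */
  static class Reading {
    final String stationId;
    final Integer flow;      // may be null, like LaneInfo.getLaneFlow()
    final String timestamp;

    Reading(String stationId, Integer flow, String timestamp) {
      this.stationId = stationId;
      this.flow = flow;
      this.timestamp = timestamp;
    }
  }

  // Same shape as MaxFlow.apply(Iterable<LaneInfo>): scan the window's
  // readings, remember the whole record with the largest flow, skip null flows.
  static Reading maxByFlow(Iterable<Reading> readings) {
    int max = 0;
    Reading best = null;
    for (Reading r : readings) {
      if (r.flow != null && r.flow >= max) {
        max = r.flow;
        best = r;
      }
    }
    return best;  // null if the input was empty or every flow was null
  }

  public static void main(String[] args) {
    List<Reading> window = Arrays.asList(
        new Reading("400001", 180, "01/01/2010 00:00:00"),
        new Reading("400001", null, "01/01/2010 00:05:00"),
        new Reading("400001", 210, "01/01/2010 00:10:00"));
    Reading best = maxByFlow(window);
    // Prints "400001: flow 210 at 01/01/2010 00:10:00".
    System.out.println(best.stationId + ": flow " + best.flow + " at " + best.timestamp);
  }
}

As in MaxFlow above, ties go to the record seen later in the iteration because the comparison uses >= rather than >.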

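A note on the Sessions windowing used by the removed TopWikipediaSessions example: Sessions.withGapDuration(Duration.standardHours(1)) gives every edit a one-hour window starting at its timestamp and merges overlapping windows per key, so two of a user's edits fall in the same session exactly when they are less than an hour apart. The following is a small self-contained sketch of that merge rule over already-sorted timestamps; class and method names are illustrative, not from this commit.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SessionSketch {
  static final long GAP_MILLIS = 3_600_000L;  // 1 hour, as in ComputeSessions

  // Groups one user's sorted edit timestamps (millis) into sessions: a new
  // session starts whenever the gap to the previous edit is >= GAP_MILLIS.
  // Returns the edit count per session, which is what Count.perElement()
  // over session windows yields for that user.
  static List<Integer> sessionSizes(long[] sortedTimestamps) {
    List<Integer> sizes = new ArrayList<>();
    int current = 0;
    long previous = Long.MIN_VALUE;
    for (long t : sortedTimestamps) {
      if (current > 0 && t - previous >= GAP_MILLIS) {
        sizes.add(current);  // gap too large: close the current session
        current = 0;
      }
      current++;
      previous = t;
    }
    if (current > 0) {
      sizes.add(current);
    }
    return sizes;
  }

  public static void main(String[] args) {
    // Edits at 0 min, 30 min, 50 min, then 3 h: the first three merge into one
    // session of 3 edits; the last edit starts its own session.
    long[] edits = {0L, 1_800_000L, 3_000_000L, 10_800_000L};
    System.out.println(sessionSizes(edits));  // [3, 1]
  }
}

TopPerMonth then re-windows those per-session counts into CalendarWindows.months(1) and takes Top.of(1, ...) to find the longest session ending in each month.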
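One clarification on the ComputeTfIdf transform near the top of this mail: the score it computes is tf * ln(1/df), where tf is the term's share of the words in one document and df is the fraction of documents containing the term. Since df <= 1, ln(1/df) = -ln(df) >= 0, and a term that appears in every document scores exactly 0, which is the point of the idf weighting. A one-file check of the arithmetic, with illustrative values:

public class TfIdfCheck {
  public static void main(String[] args) {
    double tf = 0.02;  // the term is 2% of one document's words (illustrative)
    double df = 0.25;  // the term occurs in a quarter of all documents (illustrative)
    double tfIdf = tf * Math.log(1 / df);  // same expression as in ComputeTfIdf
    System.out.printf("tf-idf = %.6f%n", tfIdf);  // 0.02 * ln(4) = 0.027726
  }
}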