Repository: beam Updated Branches: refs/heads/master c54670fc2 -> a4f7a9c3f
Use text output for first two mobile gaming examples Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/758fee8e Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/758fee8e Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/758fee8e Branch: refs/heads/master Commit: 758fee8e95bbbda985988b6ea92f6c7a741bf74d Parents: c54670f Author: Ahmet Altay <[email protected]> Authored: Tue May 9 16:45:24 2017 -0700 Committer: Ahmet Altay <[email protected]> Committed: Wed May 10 11:12:20 2017 -0700 ---------------------------------------------------------------------- .../examples/complete/game/HourlyTeamScore.java | 57 ++---- .../examples/complete/game/LeaderBoard.java | 26 +++ .../beam/examples/complete/game/UserScore.java | 56 +++--- .../complete/game/utils/WriteToText.java | 184 +++++++++++++++++++ 4 files changed, 251 insertions(+), 72 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/758fee8e/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java index 2928882..6a322da 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java @@ -20,9 +20,8 @@ package org.apache.beam.examples.complete.game; import java.util.HashMap; import java.util.Map; import java.util.TimeZone; -import org.apache.beam.examples.complete.game.utils.WriteWindowedToBigQuery; +import org.apache.beam.examples.complete.game.utils.WriteToText; import org.apache.beam.sdk.Pipeline; -import 
org.apache.beam.sdk.extensions.gcp.options.GcpOptions; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; @@ -58,12 +57,11 @@ import org.joda.time.format.DateTimeFormatter; * like this: * <pre>{@code * --project=YOUR_PROJECT_ID - * --tempLocation=gs://YOUR_TEMP_DIRECTORY - * --runner=BlockingDataflowRunner - * --dataset=YOUR-DATASET + * --tempLocation=YOUR_TEMP_DIRECTORY + * --runner=YOUR_RUNNER + * --output=YOUR_OUTPUT_DIRECTORY * } * </pre> - * where the BigQuery dataset you specify must already exist. * * <p>Optionally include {@code --input} to specify the batch input file path. * To indicate a time after which the data should be filtered out, include the @@ -107,39 +105,26 @@ public class HourlyTeamScore extends UserScore { @Default.String("2100-01-01-00-00") String getStopMin(); void setStopMin(String value); - - @Description("The BigQuery table name. Should not already exist.") - @Default.String("hourly_team_score") - String getHourlyTeamScoreTableName(); - void setHourlyTeamScoreTableName(String value); } /** - * Create a map of information that describes how to write pipeline output to BigQuery. This map - * is passed to the {@link WriteWindowedToBigQuery} constructor to write team score sums and + * Create a map of information that describes how to write pipeline output to text. This map + * is passed to the {@link WriteToText} constructor to write team score sums and * includes information about window start time. 
*/ - protected static Map<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>> - configureWindowedTableWrite() { - Map<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>> tableConfig = - new HashMap<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>>(); - tableConfig.put( - "team", - new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>( - "STRING", (c, w) -> c.element().getKey())); - tableConfig.put( - "total_score", - new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>( - "INTEGER", (c, w) -> c.element().getValue())); - tableConfig.put( + protected static Map<String, WriteToText.FieldFn<KV<String, Integer>>> + configureOutput() { + Map<String, WriteToText.FieldFn<KV<String, Integer>>> config = + new HashMap<String, WriteToText.FieldFn<KV<String, Integer>>>(); + config.put("team", (c, w) -> c.element().getKey()); + config.put("total_score", (c, w) -> c.element().getValue()); + config.put( "window_start", - new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>( - "STRING", - (c, w) -> { + (c, w) -> { IntervalWindow window = (IntervalWindow) w; return fmt.print(window.start()); - })); - return tableConfig; + }); + return config; } @@ -186,12 +171,10 @@ public class HourlyTeamScore extends UserScore { // Extract and sum teamname/score pairs from the event data. 
.apply("ExtractTeamScore", new ExtractAndSumScore("team")) .apply("WriteTeamScoreSums", - new WriteWindowedToBigQuery<KV<String, Integer>>( - options.as(GcpOptions.class).getProject(), - options.getDataset(), - options.getHourlyTeamScoreTableName(), - configureWindowedTableWrite())); - + new WriteToText<KV<String, Integer>>( + options.getOutput(), + configureOutput(), + true)); pipeline.run().waitUntilFinish(); } http://git-wip-us.apache.org/repos/asf/beam/blob/758fee8e/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java index bfad9f6..f673a8d 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java @@ -106,6 +106,11 @@ public class LeaderBoard extends HourlyTeamScore { */ interface Options extends HourlyTeamScore.Options, ExampleOptions, StreamingOptions { + @Description("BigQuery Dataset to write tables to. Must already exist.") + @Validation.Required + String getDataset(); + void setDataset(String value); + @Description("Pub/Sub topic to read from") @Validation.Required String getTopic(); @@ -163,6 +168,27 @@ public class LeaderBoard extends HourlyTeamScore { return tableConfigure; } + + /** + * Create a map of information that describes how to write pipeline output to BigQuery. This map + * is passed to the {@link WriteToBigQuery} constructor to write user score sums. 
+ */ + protected static Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>> + configureBigQueryWrite() { + Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>> tableConfigure = + new HashMap<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>>(); + tableConfigure.put( + "user", + new WriteToBigQuery.FieldInfo<KV<String, Integer>>( + "STRING", (c, w) -> c.element().getKey())); + tableConfigure.put( + "total_score", + new WriteToBigQuery.FieldInfo<KV<String, Integer>>( + "INTEGER", (c, w) -> c.element().getValue())); + return tableConfigure; + } + + /** * Create a map of information that describes how to write pipeline output to BigQuery. This map * is used to write user score sums. http://git-wip-us.apache.org/repos/asf/beam/blob/758fee8e/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java index 8110146..7297bcd 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java @@ -20,11 +20,10 @@ package org.apache.beam.examples.complete.game; import java.util.HashMap; import java.util.Map; import org.apache.avro.reflect.Nullable; -import org.apache.beam.examples.complete.game.utils.WriteToBigQuery; +import org.apache.beam.examples.complete.game.utils.WriteToText; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.DefaultCoder; -import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Metrics; @@ -62,9 +61,9 @@ import org.slf4j.LoggerFactory; * the pipeline configuration 
like this: * <pre>{@code * --project=YOUR_PROJECT_ID - * --tempLocation=gs://YOUR_TEMP_DIRECTORY - * --runner=BlockingDataflowRunner - * --dataset=YOUR-DATASET + * --tempLocation=YOUR_TEMP_DIRECTORY + * --runner=YOUR_RUNNER + * --output=YOUR_OUTPUT_DIRECTORY * } * </pre> * where the BigQuery dataset you specify must already exist. @@ -186,37 +185,26 @@ public class UserScore { String getInput(); void setInput(String value); - @Description("BigQuery Dataset to write tables to. Must already exist.") + // Set this required option to specify where to write the output. + @Description("Path of the file to write to.") @Validation.Required - String getDataset(); - void setDataset(String value); - - @Description("The BigQuery table name. Should not already exist.") - @Default.String("user_score") - String getUserScoreTableName(); - void setUserScoreTableName(String value); + String getOutput(); + void setOutput(String value); } /** - * Create a map of information that describes how to write pipeline output to BigQuery. This map - * is passed to the {@link WriteToBigQuery} constructor to write user score sums. + * Create a map of information that describes how to write pipeline output to text. This map + * is passed to the {@link WriteToText} constructor to write user score sums. 
*/ - protected static Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>> - configureBigQueryWrite() { - Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>> tableConfigure = - new HashMap<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>>(); - tableConfigure.put( - "user", - new WriteToBigQuery.FieldInfo<KV<String, Integer>>( - "STRING", (c, w) -> c.element().getKey())); - tableConfigure.put( - "total_score", - new WriteToBigQuery.FieldInfo<KV<String, Integer>>( - "INTEGER", (c, w) -> c.element().getValue())); - return tableConfigure; + protected static Map<String, WriteToText.FieldFn<KV<String, Integer>>> + configureOutput() { + Map<String, WriteToText.FieldFn<KV<String, Integer>>> config = + new HashMap<String, WriteToText.FieldFn<KV<String, Integer>>>(); + config.put("user", (c, w) -> c.element().getKey()); + config.put("total_score", (c, w) -> c.element().getValue()); + return config; } - /** * Run a batch pipeline. */ @@ -234,15 +222,13 @@ public class UserScore { .apply("ExtractUserScore", new ExtractAndSumScore("user")) .apply( "WriteUserScoreSums", - new WriteToBigQuery<KV<String, Integer>>( - options.as(GcpOptions.class).getProject(), - options.getDataset(), - options.getUserScoreTableName(), - configureBigQueryWrite())); + new WriteToText<KV<String, Integer>>( + options.getOutput(), + configureOutput(), + false)); // Run the batch pipeline. 
pipeline.run().waitUntilFinish(); } // [END DocInclude_USMain] - } http://git-wip-us.apache.org/repos/asf/beam/blob/758fee8e/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java new file mode 100644 index 0000000..e6c8ddb --- /dev/null +++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.examples.complete.game.utils; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Verify.verifyNotNull; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.TimeZone; +import java.util.stream.Collectors; +import org.apache.beam.sdk.io.FileBasedSink; +import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; +import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; +import org.joda.time.DateTimeZone; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; + +/** + * Formats each element as a line of {@code name: value} pairs joined by {@code ", "} and writes + * the lines with {@link TextIO}; when {@code windowed} is set, each window gets its own files.
+ */ +public class WriteToText<InputT> + extends PTransform<PCollection<InputT>, PDone> { + + private static final DateTimeFormatter formatter = + DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS") + .withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("PST"))); + + protected String filenamePrefix; + protected Map<String, FieldFn<InputT>> fieldFn; + protected boolean windowed; + + public WriteToText() { + } + + public WriteToText( + String filenamePrefix, + Map<String, FieldFn<InputT>> fieldFn, + boolean windowed) { + this.filenamePrefix = filenamePrefix; + this.fieldFn = fieldFn; + this.windowed = windowed; + } + + /** + * A {@link Serializable} function from a {@link DoFn.ProcessContext} and {@link BoundedWindow} + * to the value of one output field; the value is rendered via string concatenation. + */ + public interface FieldFn<InputT> extends Serializable { + Object apply(DoFn<InputT, String>.ProcessContext context, BoundedWindow window); + } + + /** Builds one row per element from {@code key: value} pairs, in fieldFn's iteration order. */ + protected class BuildRowFn extends DoFn<InputT, String> { + + @ProcessElement + public void processElement(ProcessContext c, BoundedWindow window) { + List<String> fields = new ArrayList<String>(); + for (Map.Entry<String, FieldFn<InputT>> entry : fieldFn.entrySet()) { + String key = entry.getKey(); + FieldFn<InputT> fcn = entry.getValue(); + fields.add(key + ": " + fcn.apply(c, window)); + } + String result = fields.stream().collect(Collectors.joining(", ")); + c.output(result); + } + } + + /** + * A {@link PTransform} that writes each window's elements to files whose names are derived from + * the window's bounds. The input must use {@link IntervalWindow}s; each window gets 3 shards.
+ */ + protected class WriteOneFilePerWindow extends PTransform<PCollection<String>, PDone> { + + private final String filenamePrefix; + + public WriteOneFilePerWindow(String filenamePrefix) { + this.filenamePrefix = filenamePrefix; + } + + @Override + public PDone expand(PCollection<String> input) { + // Only IntervalWindows are supported, because PerWindowFiles casts each window to one. + checkArgument( + input.getWindowingStrategy().getWindowFn().windowCoder() == IntervalWindow.getCoder()); + + // filenamePrefix may contain a directory and a filename component. Pull out only the filename + // component for PerWindowFiles; the files themselves go into the prefix's directory. + String prefix = ""; + ResourceId resource = FileBasedSink.convertToFileResourceIfPossible(filenamePrefix); + if (!resource.isDirectory()) { + prefix = verifyNotNull( + resource.getFilename(), + "A non-directory resource should have a non-null filename: %s", + resource); + } + + return input.apply( + TextIO.write() + .to(resource.getCurrentDirectory()) + .withFilenamePolicy(new PerWindowFiles(prefix)) + .withWindowedWrites() + .withNumShards(3)); + } + } + + /** + * A {@link FilenamePolicy} that names each output file after its window's start and end times + * (formatted with the PST-zoned {@code formatter}), the shard number, and the number of shards. + * Unwindowed writes are unsupported. NOTE(review): the pane index is not part of the name, so + * several trigger firings for the same window yield identical file names; confirm intended.
+ */ + protected static class PerWindowFiles extends FilenamePolicy { + + private final String prefix; + + public PerWindowFiles(String prefix) { + this.prefix = prefix; + } + + public String filenamePrefixForWindow(IntervalWindow window) { + return String.format("%s-%s-%s", + prefix, formatter.print(window.start()), formatter.print(window.end())); + } + + @Override + public ResourceId windowedFilename( + ResourceId outputDirectory, WindowedContext context, String extension) { + IntervalWindow window = (IntervalWindow) context.getWindow(); + String filename = String.format( + "%s-%s-of-%s%s", + filenamePrefixForWindow(window), context.getShardNumber(), context.getNumShards(), + extension); + return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE); + } + + @Override + public ResourceId unwindowedFilename( + ResourceId outputDirectory, Context context, String extension) { + throw new UnsupportedOperationException("Unsupported."); + } + } + + @Override + public PDone expand(PCollection<InputT> teamAndScore) { + if (windowed) { + teamAndScore + .apply("ConvertToRow", ParDo.of(new BuildRowFn())) + .apply(new WriteToText.WriteOneFilePerWindow(filenamePrefix)); + } else { + teamAndScore + .apply("ConvertToRow", ParDo.of(new BuildRowFn())) + .apply(TextIO.write().to(filenamePrefix)); + } + return PDone.in(teamAndScore.getPipeline()); + } +}
