http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11bb9e0e/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/LeaderBoard.java ---------------------------------------------------------------------- diff --git a/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/LeaderBoard.java b/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/LeaderBoard.java deleted file mode 100644 index 4185376..0000000 --- a/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/LeaderBoard.java +++ /dev/null @@ -1,237 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. 
- */ - -package com.google.cloud.dataflow.examples.complete.game; - -import com.google.cloud.dataflow.examples.common.DataflowExampleOptions; -import com.google.cloud.dataflow.examples.common.DataflowExampleUtils; -import com.google.cloud.dataflow.examples.complete.game.utils.WriteToBigQuery; -import com.google.cloud.dataflow.examples.complete.game.utils.WriteWindowedToBigQuery; -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.PipelineResult; -import com.google.cloud.dataflow.sdk.io.PubsubIO; -import com.google.cloud.dataflow.sdk.options.Default; -import com.google.cloud.dataflow.sdk.options.Description; -import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; -import com.google.cloud.dataflow.sdk.options.Validation; -import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner; -import com.google.cloud.dataflow.sdk.transforms.ParDo; -import com.google.cloud.dataflow.sdk.transforms.windowing.AfterProcessingTime; -import com.google.cloud.dataflow.sdk.transforms.windowing.AfterWatermark; -import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows; -import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows; -import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow; -import com.google.cloud.dataflow.sdk.transforms.windowing.Repeatedly; -import com.google.cloud.dataflow.sdk.transforms.windowing.Window; -import com.google.cloud.dataflow.sdk.values.KV; -import com.google.cloud.dataflow.sdk.values.PCollection; - -import org.joda.time.DateTimeZone; -import org.joda.time.Duration; -import org.joda.time.Instant; -import org.joda.time.format.DateTimeFormat; -import org.joda.time.format.DateTimeFormatter; - -import java.util.HashMap; -import java.util.Map; -import java.util.TimeZone; - -/** - * This class is the third in a series of four pipelines that tell a story in a 'gaming' domain, - * following {@link UserScore} and {@link HourlyTeamScore}. 
Concepts include: processing unbounded - * data using fixed windows; use of custom timestamps and event-time processing; generation of - * early/speculative results; using .accumulatingFiredPanes() to do cumulative processing of late- - * arriving data. - * - * <p> This pipeline processes an unbounded stream of 'game events'. The calculation of the team - * scores uses fixed windowing based on event time (the time of the game play event), not - * processing time (the time that an event is processed by the pipeline). The pipeline calculates - * the sum of scores per team, for each window. By default, the team scores are calculated using - * one-hour windows. - * - * <p> In contrast-- to demo another windowing option-- the user scores are calculated using a - * global window, which periodically (every ten minutes) emits cumulative user score sums. - * - * <p> In contrast to the previous pipelines in the series, which used static, finite input data, - * here we're using an unbounded data source, which lets us provide speculative results, and allows - * handling of late data, at much lower latency. We can use the early/speculative results to keep a - * 'leaderboard' updated in near-realtime. Our handling of late data lets us generate correct - * results, e.g. for 'team prizes'. We're now outputing window results as they're - * calculated, giving us much lower latency than with the previous batch examples. - * - * <p> Run {@link injector.Injector} to generate pubsub data for this pipeline. The Injector - * documentation provides more detail on how to do this. - * - * <p> To execute this pipeline using the Dataflow service, specify the pipeline configuration - * like this: - * <pre>{@code - * --project=YOUR_PROJECT_ID - * --stagingLocation=gs://YOUR_STAGING_DIRECTORY - * --runner=BlockingDataflowPipelineRunner - * --dataset=YOUR-DATASET - * --topic=projects/YOUR-PROJECT/topics/YOUR-TOPIC - * } - * </pre> - * where the BigQuery dataset you specify must already exist. 
- * The PubSub topic you specify should be the same topic to which the Injector is publishing. - */ -public class LeaderBoard extends HourlyTeamScore { - - private static final String TIMESTAMP_ATTRIBUTE = "timestamp_ms"; - - private static DateTimeFormatter fmt = - DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS") - .withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("PST"))); - static final Duration FIVE_MINUTES = Duration.standardMinutes(5); - static final Duration TEN_MINUTES = Duration.standardMinutes(10); - - - /** - * Options supported by {@link LeaderBoard}. - */ - static interface Options extends HourlyTeamScore.Options, DataflowExampleOptions { - - @Description("Pub/Sub topic to read from") - @Validation.Required - String getTopic(); - void setTopic(String value); - - @Description("Numeric value of fixed window duration for team analysis, in minutes") - @Default.Integer(60) - Integer getTeamWindowDuration(); - void setTeamWindowDuration(Integer value); - - @Description("Numeric value of allowed data lateness, in minutes") - @Default.Integer(120) - Integer getAllowedLateness(); - void setAllowedLateness(Integer value); - - @Description("Prefix used for the BigQuery table names") - @Default.String("leaderboard") - String getTableName(); - void setTableName(String value); - } - - /** - * Create a map of information that describes how to write pipeline output to BigQuery. This map - * is used to write team score sums and includes event timing information. 
- */ - protected static Map<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>> - configureWindowedTableWrite() { - - Map<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>> tableConfigure = - new HashMap<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>>(); - tableConfigure.put("team", - new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>("STRING", - c -> c.element().getKey())); - tableConfigure.put("total_score", - new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>("INTEGER", - c -> c.element().getValue())); - tableConfigure.put("window_start", - new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>("STRING", - c -> { IntervalWindow w = (IntervalWindow) c.window(); - return fmt.print(w.start()); })); - tableConfigure.put("processing_time", - new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>( - "STRING", c -> fmt.print(Instant.now()))); - tableConfigure.put("timing", - new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>( - "STRING", c -> c.pane().getTiming().toString())); - return tableConfigure; - } - - /** - * Create a map of information that describes how to write pipeline output to BigQuery. This map - * is used to write user score sums. - */ - protected static Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>> - configureGlobalWindowBigQueryWrite() { - - Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>> tableConfigure = - configureBigQueryWrite(); - tableConfigure.put("processing_time", - new WriteToBigQuery.FieldInfo<KV<String, Integer>>( - "STRING", c -> fmt.print(Instant.now()))); - return tableConfigure; - } - - - public static void main(String[] args) throws Exception { - - Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); - // Enforce that this pipeline is always run in streaming mode. - options.setStreaming(true); - // For example purposes, allow the pipeline to be easily cancelled instead of running - // continuously. 
- options.setRunner(DataflowPipelineRunner.class); - DataflowExampleUtils dataflowUtils = new DataflowExampleUtils(options); - Pipeline pipeline = Pipeline.create(options); - - // Read game events from Pub/Sub using custom timestamps, which are extracted from the pubsub - // data elements, and parse the data. - PCollection<GameActionInfo> gameEvents = pipeline - .apply(PubsubIO.Read.timestampLabel(TIMESTAMP_ATTRIBUTE).topic(options.getTopic())) - .apply(ParDo.named("ParseGameEvent").of(new ParseEventFn())); - - // [START DocInclude_WindowAndTrigger] - // Extract team/score pairs from the event stream, using hour-long windows by default. - gameEvents - .apply(Window.named("LeaderboardTeamFixedWindows") - .<GameActionInfo>into(FixedWindows.of( - Duration.standardMinutes(options.getTeamWindowDuration()))) - // We will get early (speculative) results as well as cumulative - // processing of late data. - .triggering( - AfterWatermark.pastEndOfWindow() - .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane() - .plusDelayOf(FIVE_MINUTES)) - .withLateFirings(AfterProcessingTime.pastFirstElementInPane() - .plusDelayOf(TEN_MINUTES))) - .withAllowedLateness(Duration.standardMinutes(options.getAllowedLateness())) - .accumulatingFiredPanes()) - // Extract and sum teamname/score pairs from the event data. - .apply("ExtractTeamScore", new ExtractAndSumScore("team")) - // Write the results to BigQuery. - .apply("WriteTeamScoreSums", - new WriteWindowedToBigQuery<KV<String, Integer>>( - options.getTableName() + "_team", configureWindowedTableWrite())); - // [END DocInclude_WindowAndTrigger] - - // [START DocInclude_ProcTimeTrigger] - // Extract user/score pairs from the event stream using processing time, via global windowing. - // Get periodic updates on all users' running scores. - gameEvents - .apply(Window.named("LeaderboardUserGlobalWindow") - .<GameActionInfo>into(new GlobalWindows()) - // Get periodic results every ten minutes. 
- .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane() - .plusDelayOf(TEN_MINUTES))) - .accumulatingFiredPanes() - .withAllowedLateness(Duration.standardMinutes(options.getAllowedLateness()))) - // Extract and sum username/score pairs from the event data. - .apply("ExtractUserScore", new ExtractAndSumScore("user")) - // Write the results to BigQuery. - .apply("WriteUserScoreSums", - new WriteToBigQuery<KV<String, Integer>>( - options.getTableName() + "_user", configureGlobalWindowBigQueryWrite())); - // [END DocInclude_ProcTimeTrigger] - - // Run the pipeline and wait for the pipeline to finish; capture cancellation requests from the - // command line. - PipelineResult result = pipeline.run(); - dataflowUtils.waitToFinish(result); - } -}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11bb9e0e/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/README.md ---------------------------------------------------------------------- diff --git a/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/README.md b/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/README.md deleted file mode 100644 index 79b55ce..0000000 --- a/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/README.md +++ /dev/null @@ -1,113 +0,0 @@ - -# 'Gaming' examples - - -This directory holds a series of example Dataflow pipelines in a simple 'mobile -gaming' domain. They all require Java 8. Each pipeline successively introduces -new concepts, and gives some examples of using Java 8 syntax in constructing -Dataflow pipelines. Other than usage of Java 8 lambda expressions, the concepts -that are used apply equally well in Java 7. - -In the gaming scenario, many users play, as members of different teams, over -the course of a day, and their actions are logged for processing. Some of the -logged game events may be late-arriving, if users play on mobile devices and go -transiently offline for a period. - -The scenario includes not only "regular" users, but "robot users", which have a -higher click rate than the regular users, and may move from team to team. - -The first two pipelines in the series use pre-generated batch data samples. The -second two pipelines read from a [PubSub](https://cloud.google.com/pubsub/) -topic input. For these examples, you will also need to run the -`injector.Injector` program, which generates and publishes the gaming data to -PubSub. The javadocs for each pipeline have more detailed information on how to -run that pipeline. - -All of these pipelines write their results to BigQuery table(s). - - -## The pipelines in the 'gaming' series - -### UserScore - -The first pipeline in the series is `UserScore`. 
This pipeline does batch -processing of data collected from gaming events. It calculates the sum of -scores per user, over an entire batch of gaming data (collected, say, for each -day). The batch processing will not include any late data that arrives after -the day's cutoff point. - -### HourlyTeamScore - -The next pipeline in the series is `HourlyTeamScore`. This pipeline also -processes data collected from gaming events in batch. It builds on `UserScore`, -but uses [fixed windows](https://cloud.google.com/dataflow/model/windowing), by -default an hour in duration. It calculates the sum of scores per team, for each -window, optionally allowing specification of two timestamps before and after -which data is filtered out. This allows a model where late data collected after -the intended analysis window can be included in the analysis, and any late- -arriving data prior to the beginning of the analysis window can be removed as -well. - -By using windowing and adding element timestamps, we can do finer-grained -analysis than with the `UserScore` pipeline – we're now tracking scores for -each hour rather than over the course of a whole day. However, our batch -processing is high-latency, in that we don't get results from plays at the -beginning of the batch's time period until the complete batch is processed. - -### LeaderBoard - -The third pipeline in the series is `LeaderBoard`. This pipeline processes an -unbounded stream of 'game events' from a PubSub topic. The calculation of the -team scores uses fixed windowing based on event time (the time of the game play -event), not processing time (the time that an event is processed by the -pipeline). The pipeline calculates the sum of scores per team, for each window. -By default, the team scores are calculated using one-hour windows. - -In contrast – to demo another windowing option – the user scores are calculated -using a global window, which periodically (every ten minutes) emits cumulative -user score sums. 
- -In contrast to the previous pipelines in the series, which used static, finite -input data, here we're using an unbounded data source, which lets us provide -_speculative_ results, and allows handling of late data, at much lower latency. -E.g., we could use the early/speculative results to keep a 'leaderboard' -updated in near-realtime. Our handling of late data lets us generate correct -results, e.g. for 'team prizes'. We're now outputing window results as they're -calculated, giving us much lower latency than with the previous batch examples. - -### GameStats - -The fourth pipeline in the series is `GameStats`. This pipeline builds -on the `LeaderBoard` functionality – supporting output of speculative and late -data – and adds some "business intelligence" analysis: identifying abuse -detection. The pipeline derives the Mean user score sum for a window, and uses -that information to identify likely spammers/robots. (The injector is designed -so that the "robots" have a higher click rate than the "real" users). The robot -users are then filtered out when calculating the team scores. - -Additionally, user sessions are tracked: that is, we find bursts of user -activity using session windows. Then, the mean session duration information is -recorded in the context of subsequent fixed windowing. (This could be used to -tell us what games are giving us greater user retention). - -### Running the PubSub Injector - -The `LeaderBoard` and `GameStats` example pipelines read unbounded data -from a PubSub topic. - -Use the `injector.Injector` program to generate this data and publish to a -PubSub topic. See the `Injector`javadocs for more information on how to run the -injector. Set up the injector before you start one of these pipelines. Then, -when you start the pipeline, pass as an argument the name of that PubSub topic. -See the pipeline javadocs for the details. - -## Viewing the results in BigQuery - -All of the pipelines write their results to BigQuery. 
`UserScore` and -`HourlyTeamScore` each write one table, and `LeaderBoard` and -`GameStats` each write two. The pipelines have default table names that -you can override when you start up the pipeline if those tables already exist. - -Depending on the windowing intervals defined in a given pipeline, you may have -to wait for a while (more than an hour) before you start to see results written -to the BigQuery tables. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11bb9e0e/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/UserScore.java ---------------------------------------------------------------------- diff --git a/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/UserScore.java b/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/UserScore.java deleted file mode 100644 index de06ce3..0000000 --- a/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/UserScore.java +++ /dev/null @@ -1,239 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. 
- */ - -package com.google.cloud.dataflow.examples.complete.game; - -import com.google.cloud.dataflow.examples.complete.game.utils.WriteToBigQuery; -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.coders.AvroCoder; -import com.google.cloud.dataflow.sdk.coders.DefaultCoder; -import com.google.cloud.dataflow.sdk.io.TextIO; -import com.google.cloud.dataflow.sdk.options.Default; -import com.google.cloud.dataflow.sdk.options.Description; -import com.google.cloud.dataflow.sdk.options.PipelineOptions; -import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; -import com.google.cloud.dataflow.sdk.options.Validation; -import com.google.cloud.dataflow.sdk.transforms.Aggregator; -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.MapElements; -import com.google.cloud.dataflow.sdk.transforms.PTransform; -import com.google.cloud.dataflow.sdk.transforms.ParDo; -import com.google.cloud.dataflow.sdk.transforms.Sum; -import com.google.cloud.dataflow.sdk.values.KV; -import com.google.cloud.dataflow.sdk.values.PCollection; -import com.google.cloud.dataflow.sdk.values.TypeDescriptor; - -import org.apache.avro.reflect.Nullable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.HashMap; -import java.util.Map; - -/** - * This class is the first in a series of four pipelines that tell a story in a 'gaming' domain. - * Concepts: batch processing; reading input from Google Cloud Storage and writing output to - * BigQuery; using standalone DoFns; use of the sum by key transform; examples of - * Java 8 lambda syntax. - * - * <p> In this gaming scenario, many users play, as members of different teams, over the course of a - * day, and their actions are logged for processing. Some of the logged game events may be late- - * arriving, if users play on mobile devices and go transiently offline for a period. 
- * - * <p> This pipeline does batch processing of data collected from gaming events. It calculates the - * sum of scores per user, over an entire batch of gaming data (collected, say, for each day). The - * batch processing will not include any late data that arrives after the day's cutoff point. - * - * <p> To execute this pipeline using the Dataflow service and static example input data, specify - * the pipeline configuration like this: - * <pre>{@code - * --project=YOUR_PROJECT_ID - * --stagingLocation=gs://YOUR_STAGING_DIRECTORY - * --runner=BlockingDataflowPipelineRunner - * --dataset=YOUR-DATASET - * } - * </pre> - * where the BigQuery dataset you specify must already exist. - * - * <p> Optionally include the --input argument to specify a batch input file. - * See the --input default value for example batch data file, or use {@link injector.Injector} to - * generate your own batch data. - */ -public class UserScore { - - /** - * Class to hold info about a game event. - */ - @DefaultCoder(AvroCoder.class) - static class GameActionInfo { - @Nullable String user; - @Nullable String team; - @Nullable Integer score; - @Nullable Long timestamp; - - public GameActionInfo() {} - - public GameActionInfo(String user, String team, Integer score, Long timestamp) { - this.user = user; - this.team = team; - this.score = score; - this.timestamp = timestamp; - } - - public String getUser() { - return this.user; - } - public String getTeam() { - return this.team; - } - public Integer getScore() { - return this.score; - } - public String getKey(String keyname) { - if (keyname.equals("team")) { - return this.team; - } else { // return username as default - return this.user; - } - } - public Long getTimestamp() { - return this.timestamp; - } - } - - - /** - * Parses the raw game event info into GameActionInfo objects. 
Each event line has the following - * format: username,teamname,score,timestamp_in_ms,readable_time - * e.g.: - * user2_AsparagusPig,AsparagusPig,10,1445230923951,2015-11-02 09:09:28.224 - * The human-readable time string is not used here. - */ - static class ParseEventFn extends DoFn<String, GameActionInfo> { - - // Log and count parse errors. - private static final Logger LOG = LoggerFactory.getLogger(ParseEventFn.class); - private final Aggregator<Long, Long> numParseErrors = - createAggregator("ParseErrors", new Sum.SumLongFn()); - - @Override - public void processElement(ProcessContext c) { - String[] components = c.element().split(","); - try { - String user = components[0].trim(); - String team = components[1].trim(); - Integer score = Integer.parseInt(components[2].trim()); - Long timestamp = Long.parseLong(components[3].trim()); - GameActionInfo gInfo = new GameActionInfo(user, team, score, timestamp); - c.output(gInfo); - } catch (ArrayIndexOutOfBoundsException | NumberFormatException e) { - numParseErrors.addValue(1L); - LOG.info("Parse error on " + c.element() + ", " + e.getMessage()); - } - } - } - - /** - * A transform to extract key/score information from GameActionInfo, and sum the scores. The - * constructor arg determines whether 'team' or 'user' info is extracted. 
- */ - // [START DocInclude_USExtractXform] - public static class ExtractAndSumScore - extends PTransform<PCollection<GameActionInfo>, PCollection<KV<String, Integer>>> { - - private final String field; - - ExtractAndSumScore(String field) { - this.field = field; - } - - @Override - public PCollection<KV<String, Integer>> apply( - PCollection<GameActionInfo> gameInfo) { - - return gameInfo - .apply(MapElements - .via((GameActionInfo gInfo) -> KV.of(gInfo.getKey(field), gInfo.getScore())) - .withOutputType(new TypeDescriptor<KV<String, Integer>>() {})) - .apply(Sum.<String>integersPerKey()); - } - } - // [END DocInclude_USExtractXform] - - - /** - * Options supported by {@link UserScore}. - */ - public static interface Options extends PipelineOptions { - - @Description("Path to the data file(s) containing game data.") - // The default maps to two large Google Cloud Storage files (each ~12GB) holding two subsequent - // day's worth (roughly) of data. - @Default.String("gs://dataflow-samples/game/gaming_data*.csv") - String getInput(); - void setInput(String value); - - @Description("BigQuery Dataset to write tables to. Must already exist.") - @Validation.Required - String getDataset(); - void setDataset(String value); - - @Description("The BigQuery table name. Should not already exist.") - @Default.String("user_score") - String getTableName(); - void setTableName(String value); - } - - /** - * Create a map of information that describes how to write pipeline output to BigQuery. This map - * is passed to the {@link WriteToBigQuery} constructor to write user score sums. 
- */ - protected static Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>> - configureBigQueryWrite() { - Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>> tableConfigure = - new HashMap<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>>(); - tableConfigure.put("user", - new WriteToBigQuery.FieldInfo<KV<String, Integer>>("STRING", c -> c.element().getKey())); - tableConfigure.put("total_score", - new WriteToBigQuery.FieldInfo<KV<String, Integer>>("INTEGER", c -> c.element().getValue())); - return tableConfigure; - } - - - /** - * Run a batch pipeline. - */ - // [START DocInclude_USMain] - public static void main(String[] args) throws Exception { - // Begin constructing a pipeline configured by commandline flags. - Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); - Pipeline pipeline = Pipeline.create(options); - - // Read events from a text file and parse them. - pipeline.apply(TextIO.Read.from(options.getInput())) - .apply(ParDo.named("ParseGameEvent").of(new ParseEventFn())) - // Extract and sum username/score pairs from the event data. - .apply("ExtractUserScore", new ExtractAndSumScore("user")) - .apply("WriteUserScoreSums", - new WriteToBigQuery<KV<String, Integer>>(options.getTableName(), - configureBigQueryWrite())); - - // Run the batch pipeline. 
- pipeline.run(); - } - // [END DocInclude_USMain] - -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11bb9e0e/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/Injector.java ---------------------------------------------------------------------- diff --git a/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/Injector.java b/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/Injector.java deleted file mode 100644 index 1691c54..0000000 --- a/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/Injector.java +++ /dev/null @@ -1,415 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. 
- */ - -package com.google.cloud.dataflow.examples.complete.game.injector; - -import com.google.api.services.pubsub.Pubsub; -import com.google.api.services.pubsub.model.PublishRequest; -import com.google.api.services.pubsub.model.PubsubMessage; -import com.google.common.collect.ImmutableMap; - -import org.joda.time.DateTimeZone; -import org.joda.time.format.DateTimeFormat; -import org.joda.time.format.DateTimeFormatter; - -import java.io.BufferedOutputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.OutputStreamWriter; -import java.io.PrintWriter; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Random; -import java.util.TimeZone; - - -/** - * This is a generator that simulates usage data from a mobile game, and either publishes the data - * to a pubsub topic or writes it to a file. - * - * <p> The general model used by the generator is the following. There is a set of teams with team - * members. Each member is scoring points for their team. After some period, a team will dissolve - * and a new one will be created in its place. There is also a set of 'Robots', or spammer users. - * They hop from team to team. The robots are set to have a higher 'click rate' (generate more - * events) than the regular team members. - * - * <p> Each generated line of data has the following form: - * username,teamname,score,timestamp_in_ms,readable_time - * e.g.: - * user2_AsparagusPig,AsparagusPig,10,1445230923951,2015-11-02 09:09:28.224 - * - * <p> The Injector writes either to a PubSub topic, or a file. It will use the PubSub topic if - * specified. It takes the following arguments: - * {@code Injector project-name (topic-name|none) (filename|none)}. - * - * <p> To run the Injector in the mode where it publishes to PubSub, you will need to authenticate - * locally using project-based service account credentials to avoid running over PubSub - * quota. 
- * See https://developers.google.com/identity/protocols/application-default-credentials - * for more information on using service account credentials. Set the GOOGLE_APPLICATION_CREDENTIALS - * environment variable to point to your downloaded service account credentials before starting the - * program, e.g.: - * {@code export GOOGLE_APPLICATION_CREDENTIALS=/path/to/your/credentials-key.json}. - * If you do not do this, then your injector will only run for a few minutes on your - * 'user account' credentials before you will start to see quota error messages like: - * "Request throttled due to user QPS limit being reached", and see this exception: - * ".com.google.api.client.googleapis.json.GoogleJsonResponseException: 429 Too Many Requests". - * Once you've set up your credentials, run the Injector like this": - * <pre>{@code - * Injector <project-name> <topic-name> none - * } - * </pre> - * The pubsub topic will be created if it does not exist. - * - * <p> To run the injector in write-to-file-mode, set the topic name to "none" and specify the - * filename: - * <pre>{@code - * Injector <project-name> none <filename> - * } - * </pre> - */ -class Injector { - private static Pubsub pubsub; - private static Random random = new Random(); - private static String topic; - private static String project; - private static final String TIMESTAMP_ATTRIBUTE = "timestamp_ms"; - - // QPS ranges from 800 to 1000. - private static final int MIN_QPS = 800; - private static final int QPS_RANGE = 200; - // How long to sleep, in ms, between creation of the threads that make API requests to PubSub. - private static final int THREAD_SLEEP_MS = 500; - - // Lists used to generate random team names. 
- private static final ArrayList<String> COLORS = - new ArrayList<String>(Arrays.asList( - "Magenta", "AliceBlue", "Almond", "Amaranth", "Amber", - "Amethyst", "AndroidGreen", "AntiqueBrass", "Fuchsia", "Ruby", "AppleGreen", - "Apricot", "Aqua", "ArmyGreen", "Asparagus", "Auburn", "Azure", "Banana", - "Beige", "Bisque", "BarnRed", "BattleshipGrey")); - - private static final ArrayList<String> ANIMALS = - new ArrayList<String>(Arrays.asList( - "Echidna", "Koala", "Wombat", "Marmot", "Quokka", "Kangaroo", "Dingo", "Numbat", "Emu", - "Wallaby", "CaneToad", "Bilby", "Possum", "Cassowary", "Kookaburra", "Platypus", - "Bandicoot", "Cockatoo", "Antechinus")); - - // The list of live teams. - private static ArrayList<TeamInfo> liveTeams = new ArrayList<TeamInfo>(); - - private static DateTimeFormatter fmt = - DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS") - .withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("PST"))); - - - // The total number of robots in the system. - private static final int NUM_ROBOTS = 20; - // Determines the chance that a team will have a robot team member. - private static final int ROBOT_PROBABILITY = 3; - private static final int NUM_LIVE_TEAMS = 15; - private static final int BASE_MEMBERS_PER_TEAM = 5; - private static final int MEMBERS_PER_TEAM = 15; - private static final int MAX_SCORE = 20; - private static final int LATE_DATA_RATE = 5 * 60 * 2; // Every 10 minutes - private static final int BASE_DELAY_IN_MILLIS = 5 * 60 * 1000; // 5-10 minute delay - private static final int FUZZY_DELAY_IN_MILLIS = 5 * 60 * 1000; - - // The minimum time a 'team' can live. - private static final int BASE_TEAM_EXPIRATION_TIME_IN_MINS = 20; - private static final int TEAM_EXPIRATION_TIME_IN_MINS = 20; - - - /** - * A class for holding team info: the name of the team, when it started, - * and the current team members. Teams may but need not include one robot team member. 
- */ - private static class TeamInfo { - String teamName; - long startTimeInMillis; - int expirationPeriod; - // The team might but need not include 1 robot. Will be non-null if so. - String robot; - int numMembers; - - private TeamInfo(String teamName, long startTimeInMillis, String robot) { - this.teamName = teamName; - this.startTimeInMillis = startTimeInMillis; - // How long until this team is dissolved. - this.expirationPeriod = random.nextInt(TEAM_EXPIRATION_TIME_IN_MINS) + - BASE_TEAM_EXPIRATION_TIME_IN_MINS; - this.robot = robot; - // Determine the number of team members. - numMembers = random.nextInt(MEMBERS_PER_TEAM) + BASE_MEMBERS_PER_TEAM; - } - - String getTeamName() { - return teamName; - } - String getRobot() { - return robot; - } - - long getStartTimeInMillis() { - return startTimeInMillis; - } - long getEndTimeInMillis() { - return startTimeInMillis + (expirationPeriod * 60 * 1000); - } - String getRandomUser() { - int userNum = random.nextInt(numMembers); - return "user" + userNum + "_" + teamName; - } - - int numMembers() { - return numMembers; - } - - @Override - public String toString() { - return "(" + teamName + ", num members: " + numMembers() + ", starting at: " - + startTimeInMillis + ", expires in: " + expirationPeriod + ", robot: " + robot + ")"; - } - } - - /** Utility to grab a random element from an array of Strings. */ - private static String randomElement(ArrayList<String> list) { - int index = random.nextInt(list.size()); - return list.get(index); - } - - /** - * Get and return a random team. If the selected team is too old w.r.t its expiration, remove - * it, replacing it with a new team. - */ - private static TeamInfo randomTeam(ArrayList<TeamInfo> list) { - int index = random.nextInt(list.size()); - TeamInfo team = list.get(index); - // If the selected team is expired, remove it and return a new team. 
- long currTime = System.currentTimeMillis(); - if ((team.getEndTimeInMillis() < currTime) || team.numMembers() == 0) { - System.out.println("\nteam " + team + " is too old; replacing."); - System.out.println("start time: " + team.getStartTimeInMillis() + - ", end time: " + team.getEndTimeInMillis() + - ", current time:" + currTime); - removeTeam(index); - // Add a new team in its stead. - return (addLiveTeam()); - } else { - return team; - } - } - - /** - * Create and add a team. Possibly add a robot to the team. - */ - private static synchronized TeamInfo addLiveTeam() { - String teamName = randomElement(COLORS) + randomElement(ANIMALS); - String robot = null; - // Decide if we want to add a robot to the team. - if (random.nextInt(ROBOT_PROBABILITY) == 0) { - robot = "Robot-" + random.nextInt(NUM_ROBOTS); - } - // Create the new team. - TeamInfo newTeam = new TeamInfo(teamName, System.currentTimeMillis(), robot); - liveTeams.add(newTeam); - System.out.println("[+" + newTeam + "]"); - return newTeam; - } - - /** - * Remove a specific team. - */ - private static synchronized void removeTeam(int teamIndex) { - TeamInfo removedTeam = liveTeams.remove(teamIndex); - System.out.println("[-" + removedTeam + "]"); - } - - /** Generate a user gaming event. */ - private static String generateEvent(Long currTime, int delayInMillis) { - TeamInfo team = randomTeam(liveTeams); - String teamName = team.getTeamName(); - String user; - final int parseErrorRate = 900000; - - String robot = team.getRobot(); - // If the team has an associated robot team member... - if (robot != null) { - // Then use that robot for the message with some probability. - // Set this probability to higher than that used to select any of the 'regular' team - // members, so that if there is a robot on the team, it has a higher click rate. - if (random.nextInt(team.numMembers() / 2) == 0) { - user = robot; - } else { - user = team.getRandomUser(); - } - } else { // No robot. 
- user = team.getRandomUser(); - } - String event = user + "," + teamName + "," + random.nextInt(MAX_SCORE); - // Randomly introduce occasional parse errors. You can see a custom counter tracking the number - // of such errors in the Dataflow Monitoring UI, as the example pipeline runs. - if (random.nextInt(parseErrorRate) == 0) { - System.out.println("Introducing a parse error."); - event = "THIS LINE REPRESENTS CORRUPT DATA AND WILL CAUSE A PARSE ERROR"; - } - return addTimeInfoToEvent(event, currTime, delayInMillis); - } - - /** - * Add time info to a generated gaming event. - */ - private static String addTimeInfoToEvent(String message, Long currTime, int delayInMillis) { - String eventTimeString = - Long.toString((currTime - delayInMillis) / 1000 * 1000); - // Add a (redundant) 'human-readable' date string to make the data semantics more clear. - String dateString = fmt.print(currTime); - message = message + "," + eventTimeString + "," + dateString; - return message; - } - - /** - * Publish 'numMessages' arbitrary events from live users with the provided delay, to a - * PubSub topic. 
- */ - public static void publishData(int numMessages, int delayInMillis) - throws IOException { - List<PubsubMessage> pubsubMessages = new ArrayList<>(); - - for (int i = 0; i < Math.max(1, numMessages); i++) { - Long currTime = System.currentTimeMillis(); - String message = generateEvent(currTime, delayInMillis); - PubsubMessage pubsubMessage = new PubsubMessage() - .encodeData(message.getBytes("UTF-8")); - pubsubMessage.setAttributes( - ImmutableMap.of(TIMESTAMP_ATTRIBUTE, - Long.toString((currTime - delayInMillis) / 1000 * 1000))); - if (delayInMillis != 0) { - System.out.println(pubsubMessage.getAttributes()); - System.out.println("late data for: " + message); - } - pubsubMessages.add(pubsubMessage); - } - - PublishRequest publishRequest = new PublishRequest(); - publishRequest.setMessages(pubsubMessages); - pubsub.projects().topics().publish(topic, publishRequest).execute(); - } - - /** - * Publish generated events to a file. - */ - public static void publishDataToFile(String fileName, int numMessages, int delayInMillis) - throws IOException { - PrintWriter out = new PrintWriter(new OutputStreamWriter( - new BufferedOutputStream(new FileOutputStream(fileName, true)), "UTF-8")); - - try { - for (int i = 0; i < Math.max(1, numMessages); i++) { - Long currTime = System.currentTimeMillis(); - String message = generateEvent(currTime, delayInMillis); - out.println(message); - } - } catch (Exception e) { - e.printStackTrace(); - } finally { - if (out != null) { - out.flush(); - out.close(); - } - } - } - - - public static void main(String[] args) throws IOException, InterruptedException { - if (args.length < 3) { - System.out.println("Usage: Injector project-name (topic-name|none) (filename|none)"); - System.exit(1); - } - boolean writeToFile = false; - boolean writeToPubsub = true; - project = args[0]; - String topicName = args[1]; - String fileName = args[2]; - // The Injector writes either to a PubSub topic, or a file. 
It will use the PubSub topic if - // specified; otherwise, it will try to write to a file. - if (topicName.equalsIgnoreCase("none")) { - writeToFile = true; - writeToPubsub = false; - } - if (writeToPubsub) { - // Create the PubSub client. - pubsub = InjectorUtils.getClient(); - // Create the PubSub topic as necessary. - topic = InjectorUtils.getFullyQualifiedTopicName(project, topicName); - InjectorUtils.createTopic(pubsub, topic); - System.out.println("Injecting to topic: " + topic); - } else { - if (fileName.equalsIgnoreCase("none")) { - System.out.println("Filename not specified."); - System.exit(1); - } - System.out.println("Writing to file: " + fileName); - } - System.out.println("Starting Injector"); - - // Start off with some random live teams. - while (liveTeams.size() < NUM_LIVE_TEAMS) { - addLiveTeam(); - } - - // Publish messages at a rate determined by the QPS and Thread sleep settings. - for (int i = 0; true; i++) { - if (Thread.activeCount() > 10) { - System.err.println("I'm falling behind!"); - } - - // Decide if this should be a batch of late data. - final int numMessages; - final int delayInMillis; - if (i % LATE_DATA_RATE == 0) { - // Insert delayed data for one user (one message only) - delayInMillis = BASE_DELAY_IN_MILLIS + random.nextInt(FUZZY_DELAY_IN_MILLIS); - numMessages = 1; - System.out.println("DELAY(" + delayInMillis + ", " + numMessages + ")"); - } else { - System.out.print("."); - delayInMillis = 0; - numMessages = MIN_QPS + random.nextInt(QPS_RANGE); - } - - if (writeToFile) { // Won't use threading for the file write. - publishDataToFile(fileName, numMessages, delayInMillis); - } else { // Write to PubSub. - // Start a thread to inject some data. - new Thread(){ - @Override - public void run() { - try { - publishData(numMessages, delayInMillis); - } catch (IOException e) { - System.err.println(e); - } - } - }.start(); - } - - // Wait before creating another injector thread. 
- Thread.sleep(THREAD_SLEEP_MS); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11bb9e0e/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/InjectorUtils.java ---------------------------------------------------------------------- diff --git a/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/InjectorUtils.java b/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/InjectorUtils.java deleted file mode 100644 index 55982df..0000000 --- a/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/InjectorUtils.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. 
- */ - -package com.google.cloud.dataflow.examples.complete.game.injector; - - -import com.google.api.client.googleapis.auth.oauth2.GoogleCredential; -import com.google.api.client.googleapis.json.GoogleJsonResponseException; -import com.google.api.client.googleapis.util.Utils; -import com.google.api.client.http.HttpRequestInitializer; -import com.google.api.client.http.HttpStatusCodes; -import com.google.api.client.http.HttpTransport; -import com.google.api.client.json.JsonFactory; -import com.google.api.services.pubsub.Pubsub; -import com.google.api.services.pubsub.PubsubScopes; -import com.google.api.services.pubsub.model.Topic; - -import com.google.common.base.Preconditions; - -import java.io.IOException; - -class InjectorUtils { - - private static final String APP_NAME = "injector"; - - /** - * Builds a new Pubsub client and returns it. - */ - public static Pubsub getClient(final HttpTransport httpTransport, - final JsonFactory jsonFactory) - throws IOException { - Preconditions.checkNotNull(httpTransport); - Preconditions.checkNotNull(jsonFactory); - GoogleCredential credential = - GoogleCredential.getApplicationDefault(httpTransport, jsonFactory); - if (credential.createScopedRequired()) { - credential = credential.createScoped(PubsubScopes.all()); - } - if (credential.getClientAuthentication() != null) { - System.out.println("\n***Warning! 
You are not using service account credentials to " - + "authenticate.\nYou need to use service account credentials for this example," - + "\nsince user-level credentials do not have enough pubsub quota,\nand so you will run " - + "out of PubSub quota very quickly.\nSee " - + "https://developers.google.com/identity/protocols/application-default-credentials."); - System.exit(1); - } - HttpRequestInitializer initializer = - new RetryHttpInitializerWrapper(credential); - return new Pubsub.Builder(httpTransport, jsonFactory, initializer) - .setApplicationName(APP_NAME) - .build(); - } - - /** - * Builds a new Pubsub client with default HttpTransport and - * JsonFactory and returns it. - */ - public static Pubsub getClient() throws IOException { - return getClient(Utils.getDefaultTransport(), - Utils.getDefaultJsonFactory()); - } - - - /** - * Returns the fully qualified topic name for Pub/Sub. - */ - public static String getFullyQualifiedTopicName( - final String project, final String topic) { - return String.format("projects/%s/topics/%s", project, topic); - } - - /** - * Create a topic if it doesn't exist. 
- */ - public static void createTopic(Pubsub client, String fullTopicName) - throws IOException { - try { - client.projects().topics().get(fullTopicName).execute(); - } catch (GoogleJsonResponseException e) { - if (e.getStatusCode() == HttpStatusCodes.STATUS_CODE_NOT_FOUND) { - Topic topic = client.projects().topics() - .create(fullTopicName, new Topic()) - .execute(); - System.out.printf("Topic %s was created.\n", topic.getName()); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11bb9e0e/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/RetryHttpInitializerWrapper.java ---------------------------------------------------------------------- diff --git a/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/RetryHttpInitializerWrapper.java b/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/RetryHttpInitializerWrapper.java deleted file mode 100644 index 1437534..0000000 --- a/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/RetryHttpInitializerWrapper.java +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. 
- */ - -package com.google.cloud.dataflow.examples.complete.game.injector; - -import com.google.api.client.auth.oauth2.Credential; -import com.google.api.client.http.HttpBackOffIOExceptionHandler; -import com.google.api.client.http.HttpBackOffUnsuccessfulResponseHandler; -import com.google.api.client.http.HttpRequest; -import com.google.api.client.http.HttpRequestInitializer; -import com.google.api.client.http.HttpResponse; -import com.google.api.client.http.HttpUnsuccessfulResponseHandler; -import com.google.api.client.util.ExponentialBackOff; -import com.google.api.client.util.Sleeper; -import com.google.common.base.Preconditions; - -import java.io.IOException; -import java.util.logging.Logger; - -/** - * RetryHttpInitializerWrapper will automatically retry upon RPC - * failures, preserving the auto-refresh behavior of the Google - * Credentials. - */ -public class RetryHttpInitializerWrapper implements HttpRequestInitializer { - - /** - * A private logger. - */ - private static final Logger LOG = - Logger.getLogger(RetryHttpInitializerWrapper.class.getName()); - - /** - * One minutes in miliseconds. - */ - private static final int ONEMINITUES = 60000; - - /** - * Intercepts the request for filling in the "Authorization" - * header field, as well as recovering from certain unsuccessful - * error codes wherein the Credential must refresh its token for a - * retry. - */ - private final Credential wrappedCredential; - - /** - * A sleeper; you can replace it with a mock in your test. - */ - private final Sleeper sleeper; - - /** - * A constructor. - * - * @param wrappedCredential Credential which will be wrapped and - * used for providing auth header. - */ - public RetryHttpInitializerWrapper(final Credential wrappedCredential) { - this(wrappedCredential, Sleeper.DEFAULT); - } - - /** - * A protected constructor only for testing. - * - * @param wrappedCredential Credential which will be wrapped and - * used for providing auth header. 
- * @param sleeper Sleeper for easy testing. - */ - RetryHttpInitializerWrapper( - final Credential wrappedCredential, final Sleeper sleeper) { - this.wrappedCredential = Preconditions.checkNotNull(wrappedCredential); - this.sleeper = sleeper; - } - - /** - * Initializes the given request. - */ - @Override - public final void initialize(final HttpRequest request) { - request.setReadTimeout(2 * ONEMINITUES); // 2 minutes read timeout - final HttpUnsuccessfulResponseHandler backoffHandler = - new HttpBackOffUnsuccessfulResponseHandler( - new ExponentialBackOff()) - .setSleeper(sleeper); - request.setInterceptor(wrappedCredential); - request.setUnsuccessfulResponseHandler( - new HttpUnsuccessfulResponseHandler() { - @Override - public boolean handleResponse( - final HttpRequest request, - final HttpResponse response, - final boolean supportsRetry) throws IOException { - if (wrappedCredential.handleResponse( - request, response, supportsRetry)) { - // If credential decides it can handle it, - // the return code or message indicated - // something specific to authentication, - // and no backoff is desired. - return true; - } else if (backoffHandler.handleResponse( - request, response, supportsRetry)) { - // Otherwise, we defer to the judgement of - // our internal backoff handler. 
- LOG.info("Retrying " - + request.getUrl().toString()); - return true; - } else { - return false; - } - } - }); - request.setIOExceptionHandler( - new HttpBackOffIOExceptionHandler(new ExponentialBackOff()) - .setSleeper(sleeper)); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11bb9e0e/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/utils/WriteToBigQuery.java ---------------------------------------------------------------------- diff --git a/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/utils/WriteToBigQuery.java b/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/utils/WriteToBigQuery.java deleted file mode 100644 index 2cf719a..0000000 --- a/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/utils/WriteToBigQuery.java +++ /dev/null @@ -1,134 +0,0 @@ - /* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. 
- */ - -package com.google.cloud.dataflow.examples.complete.game.utils; - -import com.google.api.services.bigquery.model.TableFieldSchema; -import com.google.api.services.bigquery.model.TableReference; -import com.google.api.services.bigquery.model.TableRow; -import com.google.api.services.bigquery.model.TableSchema; -import com.google.cloud.dataflow.examples.complete.game.UserScore; -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.io.BigQueryIO; -import com.google.cloud.dataflow.sdk.io.BigQueryIO.Write.CreateDisposition; -import com.google.cloud.dataflow.sdk.io.BigQueryIO.Write.WriteDisposition; -import com.google.cloud.dataflow.sdk.options.GcpOptions; -import com.google.cloud.dataflow.sdk.options.PipelineOptions; -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.PTransform; -import com.google.cloud.dataflow.sdk.transforms.ParDo; -import com.google.cloud.dataflow.sdk.transforms.SerializableFunction; -import com.google.cloud.dataflow.sdk.values.PCollection; -import com.google.cloud.dataflow.sdk.values.PDone; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -/** - * Generate, format, and write BigQuery table row information. Use provided information about - * the field names and types, as well as lambda functions that describe how to generate their - * values. - */ -public class WriteToBigQuery<T> - extends PTransform<PCollection<T>, PDone> { - - protected String tableName; - protected Map<String, FieldInfo<T>> fieldInfo; - - public WriteToBigQuery() { - } - - public WriteToBigQuery(String tableName, - Map<String, FieldInfo<T>> fieldInfo) { - this.tableName = tableName; - this.fieldInfo = fieldInfo; - } - - /** Define a class to hold information about output table field definitions. 
*/ - public static class FieldInfo<T> implements Serializable { - // The BigQuery 'type' of the field - private String fieldType; - // A lambda function to generate the field value - private SerializableFunction<DoFn<T, TableRow>.ProcessContext, Object> fieldFn; - - public FieldInfo(String fieldType, - SerializableFunction<DoFn<T, TableRow>.ProcessContext, Object> fieldFn) { - this.fieldType = fieldType; - this.fieldFn = fieldFn; - } - - String getFieldType() { - return this.fieldType; - } - - SerializableFunction<DoFn<T, TableRow>.ProcessContext, Object> getFieldFn() { - return this.fieldFn; - } - } - /** Convert each key/score pair into a BigQuery TableRow as specified by fieldFn. */ - protected class BuildRowFn extends DoFn<T, TableRow> { - - @Override - public void processElement(ProcessContext c) { - - TableRow row = new TableRow(); - for (Map.Entry<String, FieldInfo<T>> entry : fieldInfo.entrySet()) { - String key = entry.getKey(); - FieldInfo<T> fcnInfo = entry.getValue(); - SerializableFunction<DoFn<T, TableRow>.ProcessContext, Object> fcn = - fcnInfo.getFieldFn(); - row.set(key, fcn.apply(c)); - } - c.output(row); - } - } - - /** Build the output table schema. 
*/ - protected TableSchema getSchema() { - List<TableFieldSchema> fields = new ArrayList<>(); - for (Map.Entry<String, FieldInfo<T>> entry : fieldInfo.entrySet()) { - String key = entry.getKey(); - FieldInfo<T> fcnInfo = entry.getValue(); - String bqType = fcnInfo.getFieldType(); - fields.add(new TableFieldSchema().setName(key).setType(bqType)); - } - return new TableSchema().setFields(fields); - } - - @Override - public PDone apply(PCollection<T> teamAndScore) { - return teamAndScore - .apply(ParDo.named("ConvertToRow").of(new BuildRowFn())) - .apply(BigQueryIO.Write - .to(getTable(teamAndScore.getPipeline(), - tableName)) - .withSchema(getSchema()) - .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) - .withWriteDisposition(WriteDisposition.WRITE_APPEND)); - } - - /** Utility to construct an output table reference. */ - static TableReference getTable(Pipeline pipeline, String tableName) { - PipelineOptions options = pipeline.getOptions(); - TableReference table = new TableReference(); - table.setDatasetId(options.as(UserScore.Options.class).getDataset()); - table.setProjectId(options.as(GcpOptions.class).getProject()); - table.setTableId(tableName); - return table; - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11bb9e0e/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/utils/WriteWindowedToBigQuery.java ---------------------------------------------------------------------- diff --git a/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/utils/WriteWindowedToBigQuery.java b/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/utils/WriteWindowedToBigQuery.java deleted file mode 100644 index 8433021..0000000 --- a/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/utils/WriteWindowedToBigQuery.java +++ /dev/null @@ -1,76 +0,0 @@ - /* - * Copyright (C) 2015 Google Inc. 
- * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package com.google.cloud.dataflow.examples.complete.game.utils; - -import com.google.api.services.bigquery.model.TableRow; -import com.google.cloud.dataflow.sdk.io.BigQueryIO; -import com.google.cloud.dataflow.sdk.io.BigQueryIO.Write.CreateDisposition; -import com.google.cloud.dataflow.sdk.io.BigQueryIO.Write.WriteDisposition; -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.DoFn.RequiresWindowAccess; -import com.google.cloud.dataflow.sdk.transforms.ParDo; -import com.google.cloud.dataflow.sdk.transforms.SerializableFunction; -import com.google.cloud.dataflow.sdk.values.PCollection; -import com.google.cloud.dataflow.sdk.values.PDone; - -import java.util.Map; - -/** - * Generate, format, and write BigQuery table row information. Subclasses {@link WriteToBigQuery} - * to require windowing; so this subclass may be used for writes that require access to the - * context's window information. - */ -public class WriteWindowedToBigQuery<T> - extends WriteToBigQuery<T> { - - public WriteWindowedToBigQuery(String tableName, - Map<String, FieldInfo<T>> fieldInfo) { - super(tableName, fieldInfo); - } - - /** Convert each key/score pair into a BigQuery TableRow. 
*/ - protected class BuildRowFn extends DoFn<T, TableRow> - implements RequiresWindowAccess { - - @Override - public void processElement(ProcessContext c) { - - TableRow row = new TableRow(); - for (Map.Entry<String, FieldInfo<T>> entry : fieldInfo.entrySet()) { - String key = entry.getKey(); - FieldInfo<T> fcnInfo = entry.getValue(); - SerializableFunction<DoFn<T, TableRow>.ProcessContext, Object> fcn = - fcnInfo.getFieldFn(); - row.set(key, fcn.apply(c)); - } - c.output(row); - } - } - - @Override - public PDone apply(PCollection<T> teamAndScore) { - return teamAndScore - .apply(ParDo.named("ConvertToRow").of(new BuildRowFn())) - .apply(BigQueryIO.Write - .to(getTable(teamAndScore.getPipeline(), - tableName)) - .withSchema(getSchema()) - .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) - .withWriteDisposition(WriteDisposition.WRITE_APPEND)); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11bb9e0e/java8examples/src/test/java/com/google/cloud/dataflow/examples/MinimalWordCountJava8Test.java ---------------------------------------------------------------------- diff --git a/java8examples/src/test/java/com/google/cloud/dataflow/examples/MinimalWordCountJava8Test.java b/java8examples/src/test/java/com/google/cloud/dataflow/examples/MinimalWordCountJava8Test.java deleted file mode 100644 index fcae41c..0000000 --- a/java8examples/src/test/java/com/google/cloud/dataflow/examples/MinimalWordCountJava8Test.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package com.google.cloud.dataflow.examples; - -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.io.TextIO; -import com.google.cloud.dataflow.sdk.options.GcsOptions; -import com.google.cloud.dataflow.sdk.testing.TestPipeline; -import com.google.cloud.dataflow.sdk.transforms.Count; -import com.google.cloud.dataflow.sdk.transforms.Filter; -import com.google.cloud.dataflow.sdk.transforms.FlatMapElements; -import com.google.cloud.dataflow.sdk.transforms.MapElements; -import com.google.cloud.dataflow.sdk.util.GcsUtil; -import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath; -import com.google.cloud.dataflow.sdk.values.KV; -import com.google.cloud.dataflow.sdk.values.TypeDescriptor; -import com.google.common.collect.ImmutableList; - -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -import java.io.IOException; -import java.io.Serializable; -import java.nio.channels.FileChannel; -import java.nio.channels.SeekableByteChannel; -import java.nio.file.Files; -import java.nio.file.StandardOpenOption; -import java.util.Arrays; -import java.util.List; - -/** - * To keep {@link MinimalWordCountJava8} simple, it is not factored or testable. This test - * file should be maintained with a copy of its code for a basic smoke test. - */ -@RunWith(JUnit4.class) -public class MinimalWordCountJava8Test implements Serializable { - - /** - * A basic smoke test that ensures there is no crash at pipeline construction time. 
- */ - @Test - public void testMinimalWordCountJava8() throws Exception { - Pipeline p = TestPipeline.create(); - p.getOptions().as(GcsOptions.class).setGcsUtil(buildMockGcsUtil()); - - p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*")) - .apply(FlatMapElements.via((String word) -> Arrays.asList(word.split("[^a-zA-Z']+"))) - .withOutputType(new TypeDescriptor<String>() {})) - .apply(Filter.byPredicate((String word) -> !word.isEmpty())) - .apply(Count.<String>perElement()) - .apply(MapElements - .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue()) - .withOutputType(new TypeDescriptor<String>() {})) - .apply(TextIO.Write.to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX")); - } - - private GcsUtil buildMockGcsUtil() throws IOException { - GcsUtil mockGcsUtil = Mockito.mock(GcsUtil.class); - - // Any request to open gets a new bogus channel - Mockito - .when(mockGcsUtil.open(Mockito.any(GcsPath.class))) - .then(new Answer<SeekableByteChannel>() { - @Override - public SeekableByteChannel answer(InvocationOnMock invocation) throws Throwable { - return FileChannel.open( - Files.createTempFile("channel-", ".tmp"), - StandardOpenOption.CREATE, StandardOpenOption.DELETE_ON_CLOSE); - } - }); - - // Any request for expansion returns a list containing the original GcsPath - // This is required to pass validation that occurs in TextIO during apply() - Mockito - .when(mockGcsUtil.expand(Mockito.any(GcsPath.class))) - .then(new Answer<List<GcsPath>>() { - @Override - public List<GcsPath> answer(InvocationOnMock invocation) throws Throwable { - return ImmutableList.of((GcsPath) invocation.getArguments()[0]); - } - }); - - return mockGcsUtil; - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11bb9e0e/java8examples/src/test/java/com/google/cloud/dataflow/examples/complete/game/GameStatsTest.java ---------------------------------------------------------------------- diff --git 
a/java8examples/src/test/java/com/google/cloud/dataflow/examples/complete/game/GameStatsTest.java b/java8examples/src/test/java/com/google/cloud/dataflow/examples/complete/game/GameStatsTest.java deleted file mode 100644 index f77d146..0000000 --- a/java8examples/src/test/java/com/google/cloud/dataflow/examples/complete/game/GameStatsTest.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package com.google.cloud.dataflow.examples.complete.game; - -import com.google.cloud.dataflow.examples.complete.game.GameStats.CalculateSpammyUsers; -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.testing.DataflowAssert; -import com.google.cloud.dataflow.sdk.testing.RunnableOnService; -import com.google.cloud.dataflow.sdk.testing.TestPipeline; -import com.google.cloud.dataflow.sdk.transforms.Create; -import com.google.cloud.dataflow.sdk.values.KV; -import com.google.cloud.dataflow.sdk.values.PCollection; - -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.io.Serializable; -import java.util.Arrays; -import java.util.List; - -/** - * Tests of GameStats. - * Because the pipeline was designed for easy readability and explanations, it lacks good - * modularity for testing. 
See our testing documentation for better ideas: - * https://cloud.google.com/dataflow/pipelines/testing-your-pipeline. - */ -@RunWith(JUnit4.class) -public class GameStatsTest implements Serializable { - - // User scores - static final List<KV<String, Integer>> USER_SCORES = Arrays.asList( - KV.of("Robot-2", 66), KV.of("Robot-1", 116), KV.of("user7_AndroidGreenKookaburra", 23), - KV.of("user7_AndroidGreenKookaburra", 1), - KV.of("user19_BisqueBilby", 14), KV.of("user13_ApricotQuokka", 15), - KV.of("user18_BananaEmu", 25), KV.of("user6_AmberEchidna", 8), - KV.of("user2_AmberQuokka", 6), KV.of("user0_MagentaKangaroo", 4), - KV.of("user0_MagentaKangaroo", 3), KV.of("user2_AmberCockatoo", 13), - KV.of("user7_AlmondWallaby", 15), KV.of("user6_AmberNumbat", 11), - KV.of("user6_AmberQuokka", 4)); - - // The expected list of 'spammers'. - static final List<KV<String, Integer>> SPAMMERS = Arrays.asList( - KV.of("Robot-2", 66), KV.of("Robot-1", 116)); - - /** Test the calculation of 'spammy users'. */ - @Test - @Category(RunnableOnService.class) - public void testCalculateSpammyUsers() throws Exception { - Pipeline p = TestPipeline.create(); - - PCollection<KV<String, Integer>> input = p.apply(Create.of(USER_SCORES)); - PCollection<KV<String, Integer>> output = input.apply(new CalculateSpammyUsers()); - - // Check the set of spammers. 
- DataflowAssert.that(output).containsInAnyOrder(SPAMMERS); - - p.run(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11bb9e0e/java8examples/src/test/java/com/google/cloud/dataflow/examples/complete/game/HourlyTeamScoreTest.java ---------------------------------------------------------------------- diff --git a/java8examples/src/test/java/com/google/cloud/dataflow/examples/complete/game/HourlyTeamScoreTest.java b/java8examples/src/test/java/com/google/cloud/dataflow/examples/complete/game/HourlyTeamScoreTest.java deleted file mode 100644 index f77a5d4..0000000 --- a/java8examples/src/test/java/com/google/cloud/dataflow/examples/complete/game/HourlyTeamScoreTest.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. 
- */ - -package com.google.cloud.dataflow.examples.complete.game; - -import com.google.cloud.dataflow.examples.complete.game.UserScore.GameActionInfo; -import com.google.cloud.dataflow.examples.complete.game.UserScore.ParseEventFn; -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; -import com.google.cloud.dataflow.sdk.testing.DataflowAssert; -import com.google.cloud.dataflow.sdk.testing.RunnableOnService; -import com.google.cloud.dataflow.sdk.testing.TestPipeline; -import com.google.cloud.dataflow.sdk.transforms.Create; -import com.google.cloud.dataflow.sdk.transforms.Filter; -import com.google.cloud.dataflow.sdk.transforms.MapElements; -import com.google.cloud.dataflow.sdk.transforms.ParDo; -import com.google.cloud.dataflow.sdk.values.KV; -import com.google.cloud.dataflow.sdk.values.PCollection; -import com.google.cloud.dataflow.sdk.values.TypeDescriptor; - -import org.joda.time.Instant; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.io.Serializable; -import java.util.Arrays; -import java.util.List; - -/** - * Tests of HourlyTeamScore. - * Because the pipeline was designed for easy readability and explanations, it lacks good - * modularity for testing. See our testing documentation for better ideas: - * https://cloud.google.com/dataflow/pipelines/testing-your-pipeline. 
- */ -@RunWith(JUnit4.class) -public class HourlyTeamScoreTest implements Serializable { - - static final String[] GAME_EVENTS_ARRAY = new String[] { - "user0_MagentaKangaroo,MagentaKangaroo,3,1447955630000,2015-11-19 09:53:53.444", - "user13_ApricotQuokka,ApricotQuokka,15,1447955630000,2015-11-19 09:53:53.444", - "user6_AmberNumbat,AmberNumbat,11,1447955630000,2015-11-19 09:53:53.444", - "user7_AlmondWallaby,AlmondWallaby,15,1447955630000,2015-11-19 09:53:53.444", - "user7_AndroidGreenKookaburra,AndroidGreenKookaburra,12,1447955630000,2015-11-19 09:53:53.444", - "user7_AndroidGreenKookaburra,AndroidGreenKookaburra,11,1447955630000,2015-11-19 09:53:53.444", - "user19_BisqueBilby,BisqueBilby,6,1447955630000,2015-11-19 09:53:53.444", - "user19_BisqueBilby,BisqueBilby,8,1447955630000,2015-11-19 09:53:53.444", - // time gap... - "user0_AndroidGreenEchidna,AndroidGreenEchidna,0,1447965690000,2015-11-19 12:41:31.053", - "user0_MagentaKangaroo,MagentaKangaroo,4,1447965690000,2015-11-19 12:41:31.053", - "user2_AmberCockatoo,AmberCockatoo,13,1447965690000,2015-11-19 12:41:31.053", - "user18_BananaEmu,BananaEmu,7,1447965690000,2015-11-19 12:41:31.053", - "user3_BananaEmu,BananaEmu,17,1447965690000,2015-11-19 12:41:31.053", - "user18_BananaEmu,BananaEmu,1,1447965690000,2015-11-19 12:41:31.053", - "user18_ApricotCaneToad,ApricotCaneToad,14,1447965690000,2015-11-19 12:41:31.053" - }; - - - static final List<String> GAME_EVENTS = Arrays.asList(GAME_EVENTS_ARRAY); - - - // Used to check the filtering. - static final KV[] FILTERED_EVENTS = new KV[] { - KV.of("user0_AndroidGreenEchidna", 0), KV.of("user0_MagentaKangaroo", 4), - KV.of("user2_AmberCockatoo", 13), - KV.of("user18_BananaEmu", 7), KV.of("user3_BananaEmu", 17), - KV.of("user18_BananaEmu", 1), KV.of("user18_ApricotCaneToad", 14) - }; - - - /** Test the filtering. 
*/ - @Test - @Category(RunnableOnService.class) - public void testUserScoresFilter() throws Exception { - Pipeline p = TestPipeline.create(); - - final Instant startMinTimestamp = new Instant(1447965680000L); - - PCollection<String> input = p.apply(Create.of(GAME_EVENTS).withCoder(StringUtf8Coder.of())); - - PCollection<KV<String, Integer>> output = input - .apply(ParDo.named("ParseGameEvent").of(new ParseEventFn())) - - .apply("FilterStartTime", Filter.byPredicate( - (GameActionInfo gInfo) - -> gInfo.getTimestamp() > startMinTimestamp.getMillis())) - // run a map to access the fields in the result. - .apply(MapElements - .via((GameActionInfo gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore())) - .withOutputType(new TypeDescriptor<KV<String, Integer>>() {})); - - DataflowAssert.that(output).containsInAnyOrder(FILTERED_EVENTS); - - p.run(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11bb9e0e/java8examples/src/test/java/com/google/cloud/dataflow/examples/complete/game/UserScoreTest.java ---------------------------------------------------------------------- diff --git a/java8examples/src/test/java/com/google/cloud/dataflow/examples/complete/game/UserScoreTest.java b/java8examples/src/test/java/com/google/cloud/dataflow/examples/complete/game/UserScoreTest.java deleted file mode 100644 index 641e2c3..0000000 --- a/java8examples/src/test/java/com/google/cloud/dataflow/examples/complete/game/UserScoreTest.java +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package com.google.cloud.dataflow.examples.complete.game; - -import com.google.cloud.dataflow.examples.complete.game.UserScore.ExtractAndSumScore; -import com.google.cloud.dataflow.examples.complete.game.UserScore.GameActionInfo; -import com.google.cloud.dataflow.examples.complete.game.UserScore.ParseEventFn; -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; -import com.google.cloud.dataflow.sdk.testing.DataflowAssert; -import com.google.cloud.dataflow.sdk.testing.RunnableOnService; -import com.google.cloud.dataflow.sdk.testing.TestPipeline; -import com.google.cloud.dataflow.sdk.transforms.Create; -import com.google.cloud.dataflow.sdk.transforms.DoFnTester; -import com.google.cloud.dataflow.sdk.transforms.MapElements; -import com.google.cloud.dataflow.sdk.transforms.ParDo; -import com.google.cloud.dataflow.sdk.values.KV; -import com.google.cloud.dataflow.sdk.values.PCollection; -import com.google.cloud.dataflow.sdk.values.TypeDescriptor; - -import org.junit.Assert; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.io.Serializable; -import java.util.Arrays; -import java.util.List; - -/** - * Tests of UserScore. 
- */ -@RunWith(JUnit4.class) -public class UserScoreTest implements Serializable { - - static final String[] GAME_EVENTS_ARRAY = new String[] { - "user0_MagentaKangaroo,MagentaKangaroo,3,1447955630000,2015-11-19 09:53:53.444", - "user13_ApricotQuokka,ApricotQuokka,15,1447955630000,2015-11-19 09:53:53.444", - "user6_AmberNumbat,AmberNumbat,11,1447955630000,2015-11-19 09:53:53.444", - "user7_AlmondWallaby,AlmondWallaby,15,1447955630000,2015-11-19 09:53:53.444", - "user7_AndroidGreenKookaburra,AndroidGreenKookaburra,12,1447955630000,2015-11-19 09:53:53.444", - "user6_AliceBlueDingo,AliceBlueDingo,4,xxxxxxx,2015-11-19 09:53:53.444", - "user7_AndroidGreenKookaburra,AndroidGreenKookaburra,11,1447955630000,2015-11-19 09:53:53.444", - "THIS IS A PARSE ERROR,2015-11-19 09:53:53.444", - "user19_BisqueBilby,BisqueBilby,6,1447955630000,2015-11-19 09:53:53.444", - "user19_BisqueBilby,BisqueBilby,8,1447955630000,2015-11-19 09:53:53.444" - }; - - static final String[] GAME_EVENTS_ARRAY2 = new String[] { - "user6_AliceBlueDingo,AliceBlueDingo,4,xxxxxxx,2015-11-19 09:53:53.444", - "THIS IS A PARSE ERROR,2015-11-19 09:53:53.444", - "user13_BisqueBilby,BisqueBilby,xxx,1447955630000,2015-11-19 09:53:53.444" - }; - - static final List<String> GAME_EVENTS = Arrays.asList(GAME_EVENTS_ARRAY); - static final List<String> GAME_EVENTS2 = Arrays.asList(GAME_EVENTS_ARRAY2); - - static final List<KV<String, Integer>> USER_SUMS = Arrays.asList( - KV.of("user0_MagentaKangaroo", 3), KV.of("user13_ApricotQuokka", 15), - KV.of("user6_AmberNumbat", 11), KV.of("user7_AlmondWallaby", 15), - KV.of("user7_AndroidGreenKookaburra", 23), - KV.of("user19_BisqueBilby", 14)); - - static final List<KV<String, Integer>> TEAM_SUMS = Arrays.asList( - KV.of("MagentaKangaroo", 3), KV.of("ApricotQuokka", 15), - KV.of("AmberNumbat", 11), KV.of("AlmondWallaby", 15), - KV.of("AndroidGreenKookaburra", 23), - KV.of("BisqueBilby", 14)); - - /** Test the ParseEventFn DoFn. 
*/ - @Test - public void testParseEventFn() { - DoFnTester<String, GameActionInfo> parseEventFn = - DoFnTester.of(new ParseEventFn()); - - List<GameActionInfo> results = parseEventFn.processBatch(GAME_EVENTS_ARRAY); - Assert.assertEquals(results.size(), 8); - Assert.assertEquals(results.get(0).getUser(), "user0_MagentaKangaroo"); - Assert.assertEquals(results.get(0).getTeam(), "MagentaKangaroo"); - Assert.assertEquals(results.get(0).getScore(), new Integer(3)); - } - - /** Tests ExtractAndSumScore("user"). */ - @Test - @Category(RunnableOnService.class) - public void testUserScoreSums() throws Exception { - Pipeline p = TestPipeline.create(); - - PCollection<String> input = p.apply(Create.of(GAME_EVENTS).withCoder(StringUtf8Coder.of())); - - PCollection<KV<String, Integer>> output = input - .apply(ParDo.of(new ParseEventFn())) - // Extract and sum username/score pairs from the event data. - .apply("ExtractUserScore", new ExtractAndSumScore("user")); - - // Check the user score sums. - DataflowAssert.that(output).containsInAnyOrder(USER_SUMS); - - p.run(); - } - - /** Tests ExtractAndSumScore("team"). */ - @Test - @Category(RunnableOnService.class) - public void testTeamScoreSums() throws Exception { - Pipeline p = TestPipeline.create(); - - PCollection<String> input = p.apply(Create.of(GAME_EVENTS).withCoder(StringUtf8Coder.of())); - - PCollection<KV<String, Integer>> output = input - .apply(ParDo.of(new ParseEventFn())) - // Extract and sum teamname/score pairs from the event data. - .apply("ExtractTeamScore", new ExtractAndSumScore("team")); - - // Check the team score sums. - DataflowAssert.that(output).containsInAnyOrder(TEAM_SUMS); - - p.run(); - } - - /** Test that bad input data is dropped appropriately. 
*/ - @Test - @Category(RunnableOnService.class) - public void testUserScoresBadInput() throws Exception { - Pipeline p = TestPipeline.create(); - - PCollection<String> input = p.apply(Create.of(GAME_EVENTS2).withCoder(StringUtf8Coder.of())); - - PCollection<KV<String, Integer>> extract = input - .apply(ParDo.of(new ParseEventFn())) - .apply( - MapElements.via((GameActionInfo gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore())) - .withOutputType(new TypeDescriptor<KV<String, Integer>>() {})); - - DataflowAssert.that(extract).empty(); - - p.run(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11bb9e0e/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 2d0a3e1..3803007 100644 --- a/pom.xml +++ b/pom.xml @@ -147,7 +147,7 @@ <jdk>[1.8,)</jdk> </activation> <modules> - <module>java8examples</module> + <module>examples/java8</module> </modules> </profile> <profile>
