fernando-wizeline commented on code in PR #17015:
URL: https://github.com/apache/beam/pull/17015#discussion_r1024278681
##########
examples/java/src/main/java/org/apache/beam/examples/complete/game/StatefulTeamScore.java:
##########
@@ -114,31 +115,14 @@ private static Map<String, FieldInfo<KV<String,
Integer>>> configureCompleteWind
return tableConfigure;
}
- public static void main(String[] args) throws Exception {
+ public static void applyStatefulTeamScore(Pipeline p, Options options)
throws IOException {
- Options options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
- // Enforce that this pipeline is always run in streaming mode.
- options.setStreaming(true);
- ExampleUtils exampleUtils = new ExampleUtils(options);
- Pipeline pipeline = Pipeline.create(options);
+ PubsubIO.Read<String> records = readRecordsFromPubSub(options);
- pipeline
- // Read game events from Pub/Sub using custom timestamps, which are
extracted from the
- // pubsub data elements, and parse the data.
- .apply(
- PubsubIO.readStrings()
- .withTimestampAttribute(GameConstants.TIMESTAMP_ATTRIBUTE)
- .fromTopic(options.getTopic()))
- .apply("ParseGameEvent", ParDo.of(new ParseEventFn()))
- // Create <team, GameActionInfo> mapping. UpdateTeamScore uses team
name as key.
- .apply(
- "MapTeamAsKey",
- MapElements.into(
- TypeDescriptors.kvs(
- TypeDescriptors.strings(),
TypeDescriptor.of(GameActionInfo.class)))
- .via((GameActionInfo gInfo) -> KV.of(gInfo.team, gInfo)))
- // Outputs a team's score every time it passes a new multiple of the
threshold.
- .apply("UpdateTeamScore", ParDo.of(new
UpdateTeamScoreFn(options.getThresholdScore())))
+ p.apply(records)
+ // Create <team, GameActionInfo> mapping & Outputs a team's score
every time it passes a new
+ // multiple of the threshold
+ .apply(new TeamScore(options))
// Write the results to BigQuery.
.apply(
Review Comment:
Hi Kiley!
I can add it to the transform; the reason I wanted to leave it as a separate
step was to show the three different steps of the pipeline, the E, the T and
the L.
Let me know if it makes more sense to add the BigQuery step to the TeamScore
transform.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]