fernando-wizeline commented on code in PR #17015:
URL: https://github.com/apache/beam/pull/17015#discussion_r1024278681


##########
examples/java/src/main/java/org/apache/beam/examples/complete/game/StatefulTeamScore.java:
##########
@@ -114,31 +115,14 @@ private static Map<String, FieldInfo<KV<String, Integer>>> configureCompleteWind
     return tableConfigure;
   }
 
-  public static void main(String[] args) throws Exception {
+  public static void applyStatefulTeamScore(Pipeline p, Options options) throws IOException {
 
-    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
-    // Enforce that this pipeline is always run in streaming mode.
-    options.setStreaming(true);
-    ExampleUtils exampleUtils = new ExampleUtils(options);
-    Pipeline pipeline = Pipeline.create(options);
+    PubsubIO.Read<String> records = readRecordsFromPubSub(options);
 
-    pipeline
-        // Read game events from Pub/Sub using custom timestamps, which are extracted from the
-        // pubsub data elements, and parse the data.
-        .apply(
-            PubsubIO.readStrings()
-                .withTimestampAttribute(GameConstants.TIMESTAMP_ATTRIBUTE)
-                .fromTopic(options.getTopic()))
-        .apply("ParseGameEvent", ParDo.of(new ParseEventFn()))
-        // Create <team, GameActionInfo> mapping. UpdateTeamScore uses team name as key.
-        .apply(
-            "MapTeamAsKey",
-            MapElements.into(
-                    TypeDescriptors.kvs(
-                        TypeDescriptors.strings(), TypeDescriptor.of(GameActionInfo.class)))
-                .via((GameActionInfo gInfo) -> KV.of(gInfo.team, gInfo)))
-        // Outputs a team's score every time it passes a new multiple of the threshold.
-        .apply("UpdateTeamScore", ParDo.of(new UpdateTeamScoreFn(options.getThresholdScore())))
+    p.apply(records)
+        // Create <team, GameActionInfo> mapping & Outputs a team's score every time it passes a new
+        // multiple of the threshold
+        .apply(new TeamScore(options))
         // Write the results to BigQuery.
         .apply(

Review Comment:
   Hi Kiley!
   I can add it to the transform. I left it as a separate step to show the three stages of the pipeline: the E, the T, and the L (extract, transform, load).
   Let me know if it makes more sense to move the BigQuery step into the TeamScore transform.
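
To illustrate the separation described above, here is a minimal plain-Java sketch. It does not use Beam classes and is not taken from this PR: `EtlStagesSketch`, `extract`, `transform`, and `load` are hypothetical names, the `"team,score"` input format is invented, and the threshold rule (emit a team's total whenever it crosses a new multiple of the threshold) is an assumption based on the comments in the diff. The point is only that keeping the write as its own step leaves the three stages individually visible at the call site.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class EtlStagesSketch {

  // E: hypothetical stand-in for the Pub/Sub read; parses "team,score" lines.
  static List<Map.Entry<String, Integer>> extract(List<String> rawEvents) {
    List<Map.Entry<String, Integer>> events = new ArrayList<>();
    for (String line : rawEvents) {
      String[] parts = line.split(",");
      events.add(Map.entry(parts[0].trim(), Integer.parseInt(parts[1].trim())));
    }
    return events;
  }

  // T: hypothetical stand-in for the TeamScore transform. Keeps a running
  // total per team and emits "team:total" each time the total passes a new
  // multiple of the threshold (assumed rule, see lead-in).
  static List<String> transform(List<Map.Entry<String, Integer>> events, int threshold) {
    Map<String, Integer> totals = new LinkedHashMap<>();
    List<String> output = new ArrayList<>();
    for (Map.Entry<String, Integer> event : events) {
      int oldTotal = totals.getOrDefault(event.getKey(), 0);
      int newTotal = oldTotal + event.getValue();
      totals.put(event.getKey(), newTotal);
      if (oldTotal / threshold < newTotal / threshold) {
        output.add(event.getKey() + ":" + newTotal);
      }
    }
    return output;
  }

  // L: hypothetical stand-in for the BigQuery write; appends rows to a sink.
  static void load(List<String> rows, List<String> sink) {
    sink.addAll(rows);
  }

  public static void main(String[] args) {
    List<String> sink = new ArrayList<>();
    // The three stages stay visible as separate steps at the call site.
    List<Map.Entry<String, Integer>> events =
        extract(List.of("red,60", "blue,30", "red,50", "blue,80"));
    List<String> rows = transform(events, 100);
    load(rows, sink);
    System.out.println(sink);
  }
}
```

Folding `load` into `transform` would shorten the call site but hide the L stage inside the T stage, which is the trade-off raised in this thread.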



