arunpandianp commented on code in PR #38814:
URL: https://github.com/apache/beam/pull/38814#discussion_r3371043745


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java:
##########
@@ -354,27 +330,35 @@ private Windmill.WorkItemCommitRequest validateCommitRequestSize(
   }
 
   private void recordProcessingStats(
-      Windmill.WorkItemCommitRequest.Builder outputBuilder,
-      Windmill.WorkItem workItem,
-      ExecuteWorkResult executeWorkResult) {
-    // Compute shuffle and state byte statistics these will be flushed asynchronously.
-    long stateBytesWritten =
-        outputBuilder
-            .clearOutputMessages()
-            .clearPerWorkItemLatencyAttributions()
-            .build()
-            .getSerializedSize();
-
-    streamingCounters.windmillShuffleBytesRead().addValue(computeShuffleBytesRead(workItem));
-    streamingCounters.windmillStateBytesRead().addValue(executeWorkResult.stateBytesRead());
-    streamingCounters.windmillStateBytesWritten().addValue(stateBytesWritten);
+      List<Work> workBatch,
+      List<Windmill.WorkItemCommitRequest.Builder> outputBuilders,
+      long totalStateBytesRead) {
+    long totalStateBytesWritten = 0;
+    long totalShuffleBytesRead = 0;
+    for (int i = 0; i < workBatch.size(); i++) {
+      Windmill.WorkItem workItem = workBatch.get(i).getWorkItem();
+      Windmill.WorkItemCommitRequest.Builder outputBuilder = outputBuilders.get(i);
+      // Compute shuffle and state byte statistics these will be flushed asynchronously.
+      long stateBytesWritten =
+          outputBuilder
+              .clearOutputMessages()
+              .clearPerWorkItemLatencyAttributions()
+              .build()
+              .getSerializedSize();

Review Comment:
   Updated the context methods to return fully built Commits instead of
   builders. Agreed that this looks expensive; I'd like to defer the
   optimizations to separate PRs.
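   For illustration, here is a minimal sketch of the batched aggregation pattern in the new recordProcessingStats. It uses hypothetical stand-ins rather than Beam code: ItemSizes replaces the per-item Windmill WorkItem and commit-request sizes, and Counters models the streaming counters with java.util.concurrent.atomic.LongAdder. The hunk above is cut off before the totals are flushed, so the once-per-batch counter update here is an assumption. Note that batching only reduces counter updates; the per-item build().getSerializedSize() cost discussed above is unchanged.

```java
import java.util.List;
import java.util.concurrent.atomic.LongAdder;

public class BatchStatsSketch {

  // Hypothetical stand-in for the sizes derived from one WorkItem and its commit request.
  record ItemSizes(long shuffleBytesRead, long stateBytesWritten) {}

  // Hypothetical stand-in for the worker's streaming counters.
  static final class Counters {
    final LongAdder shuffleBytesRead = new LongAdder();
    final LongAdder stateBytesRead = new LongAdder();
    final LongAdder stateBytesWritten = new LongAdder();
  }

  // Sums per-item sizes across the batch, then updates each counter once (assumed behavior).
  static void recordProcessingStats(
      Counters counters, List<ItemSizes> workBatch, long totalStateBytesRead) {
    long totalStateBytesWritten = 0;
    long totalShuffleBytesRead = 0;
    for (ItemSizes item : workBatch) {
      totalStateBytesWritten += item.stateBytesWritten();
      totalShuffleBytesRead += item.shuffleBytesRead();
    }
    counters.shuffleBytesRead.add(totalShuffleBytesRead);
    counters.stateBytesRead.add(totalStateBytesRead);
    counters.stateBytesWritten.add(totalStateBytesWritten);
  }

  public static void main(String[] args) {
    Counters counters = new Counters();
    recordProcessingStats(
        counters, List.of(new ItemSizes(10, 100), new ItemSizes(20, 200)), 50);
    // Prints "30 50 300".
    System.out.println(
        counters.shuffleBytesRead.sum() + " " + counters.stateBytesRead.sum() + " "
            + counters.stateBytesWritten.sum());
  }
}
```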



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.