arunpandianp commented on code in PR #38814:
URL: https://github.com/apache/beam/pull/38814#discussion_r3371048827


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java:
##########
@@ -306,22 +294,10 @@ private void processWork(
         throw ExceptionUtils.safeWrapThrowableAsException(t2);
       }
     } finally {
-      // Update total processing time counters. Updating in finally clause 
ensures that
-      // work items causing exceptions are also accounted in time spent.
-      long processingTimeMsecs =
-          TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
processingStartTimeNanos);
-      stageInfo.totalProcessingMsecs().addValue(processingTimeMsecs);
-
-      // Attribute all the processing to timers if the work item contains any 
timers.
-      // Tests show that work items rarely contain both timers and message 
bundles. It should
-      // be a fairly close approximation.
-      // Another option: Derive time split between messages and timers based 
on recent totals.
-      // either here or in DFE.
-      if (work.getWorkItem().hasTimers()) {
-        stageInfo.timerProcessingMsecs().addValue(processingTimeMsecs);
-      }
+      recordProcessingTime(stageInfo, worksToCleanup, work, 
processingStartTimeNanos);

Review Comment:
   done.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java:
##########
@@ -354,27 +330,35 @@ private Windmill.WorkItemCommitRequest 
validateCommitRequestSize(
   }
 
   private void recordProcessingStats(
-      Windmill.WorkItemCommitRequest.Builder outputBuilder,
-      Windmill.WorkItem workItem,
-      ExecuteWorkResult executeWorkResult) {
-    // Compute shuffle and state byte statistics these will be flushed 
asynchronously.
-    long stateBytesWritten =
-        outputBuilder
-            .clearOutputMessages()
-            .clearPerWorkItemLatencyAttributions()
-            .build()
-            .getSerializedSize();
-
-    
streamingCounters.windmillShuffleBytesRead().addValue(computeShuffleBytesRead(workItem));
-    
streamingCounters.windmillStateBytesRead().addValue(executeWorkResult.stateBytesRead());
-    streamingCounters.windmillStateBytesWritten().addValue(stateBytesWritten);
+      List<Work> workBatch,
+      List<Windmill.WorkItemCommitRequest.Builder> outputBuilders,
+      long totalStateBytesRead) {
+    long totalStateBytesWritten = 0;
+    long totalShuffleBytesRead = 0;
+    for (int i = 0; i < workBatch.size(); i++) {
+      Windmill.WorkItem workItem = workBatch.get(i).getWorkItem();
+      Windmill.WorkItemCommitRequest.Builder outputBuilder = 
outputBuilders.get(i);

Review Comment:
   done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to