arunpandianp commented on code in PR #38814:
URL: https://github.com/apache/beam/pull/38814#discussion_r3371048827
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java:
##########
@@ -306,22 +294,10 @@ private void processWork(
throw ExceptionUtils.safeWrapThrowableAsException(t2);
}
} finally {
- // Update total processing time counters. Updating in finally clause ensures that
- // work items causing exceptions are also accounted in time spent.
- long processingTimeMsecs =
- TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - processingStartTimeNanos);
- stageInfo.totalProcessingMsecs().addValue(processingTimeMsecs);
-
- // Attribute all the processing to timers if the work item contains any timers.
- // Tests show that work items rarely contain both timers and message bundles. It should
- // be a fairly close approximation.
- // Another option: Derive time split between messages and timers based on recent totals.
- // either here or in DFE.
- if (work.getWorkItem().hasTimers()) {
- stageInfo.timerProcessingMsecs().addValue(processingTimeMsecs);
- }
+ recordProcessingTime(stageInfo, worksToCleanup, work, processingStartTimeNanos);
Review Comment:
done.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java:
##########
@@ -354,27 +330,35 @@ private Windmill.WorkItemCommitRequest validateCommitRequestSize(
}
private void recordProcessingStats(
- Windmill.WorkItemCommitRequest.Builder outputBuilder,
- Windmill.WorkItem workItem,
- ExecuteWorkResult executeWorkResult) {
- // Compute shuffle and state byte statistics these will be flushed asynchronously.
- long stateBytesWritten =
- outputBuilder
- .clearOutputMessages()
- .clearPerWorkItemLatencyAttributions()
- .build()
- .getSerializedSize();
-
- streamingCounters.windmillShuffleBytesRead().addValue(computeShuffleBytesRead(workItem));
- streamingCounters.windmillStateBytesRead().addValue(executeWorkResult.stateBytesRead());
- streamingCounters.windmillStateBytesWritten().addValue(stateBytesWritten);
+ List<Work> workBatch,
+ List<Windmill.WorkItemCommitRequest.Builder> outputBuilders,
+ long totalStateBytesRead) {
+ long totalStateBytesWritten = 0;
+ long totalShuffleBytesRead = 0;
+ for (int i = 0; i < workBatch.size(); i++) {
+ Windmill.WorkItem workItem = workBatch.get(i).getWorkItem();
+ Windmill.WorkItemCommitRequest.Builder outputBuilder = outputBuilders.get(i);
Review Comment:
done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]