This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 9de7281 Dataflow Worker: Fix to report millis for couple processing
times metrics.
9de7281 is described below
commit 9de728151c855ecdec17174a5ce27f090b008a50
Author: Raghu Angadi <[email protected]>
AuthorDate: Fri Nov 2 09:51:58 2018 -0700
Dataflow Worker: Fix to report millis for couple processing times metrics.
---
.../beam/runners/dataflow/worker/BatchModeExecutionContext.java | 2 +-
.../beam/runners/dataflow/worker/StreamingDataflowWorker.java | 8 ++++----
2 files changed, 5 insertions(+), 5 deletions(-)
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java
index 9979ec0..2f10a13 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java
@@ -508,7 +508,7 @@ public class BatchModeExecutionContext
CounterCell throttlingMsecs =
container.tryGetCounter(DataflowSystemMetrics.THROTTLING_MSECS_METRIC_NAME);
if (throttlingMsecs != null) {
- totalThrottleTime +=
TimeUnit.MICROSECONDS.toSeconds(throttlingMsecs.getCumulative());
+ totalThrottleTime +=
TimeUnit.MILLISECONDS.toSeconds(throttlingMsecs.getCumulative());
}
}
return totalThrottleTime;
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index c0f4b44..fbbc069 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -1311,9 +1311,9 @@ public class StreamingDataflowWorker {
} finally {
// Update total processing time counters. Updating in finally clause
ensures that
// work items causing exceptions are also accounted in time spent.
- long processingTimeMicros =
- TimeUnit.NANOSECONDS.toMicros(System.nanoTime() -
processingStartTimeNanos);
- stageInfo.totalProcessingMsecs.addValue(processingTimeMicros);
+ long processingTimeMsecs =
+ TimeUnit.NANOSECONDS.toMillis(System.nanoTime() -
processingStartTimeNanos);
+ stageInfo.totalProcessingMsecs.addValue(processingTimeMsecs);
// Attribute all the processing to timers if the work item contains any
timers.
// Tests show that work items rarely contain both timers and message
bundles. It should
@@ -1321,7 +1321,7 @@ public class StreamingDataflowWorker {
// Another option: Derive time split between messages and timers based
on recent totals.
// either here or in DFE.
if (work.getWorkItem().hasTimers()) {
- stageInfo.timerProcessingMsecs.addValue(processingTimeMicros);
+ stageInfo.timerProcessingMsecs.addValue(processingTimeMsecs);
}
DataflowWorkerLoggingMDC.setWorkId(null);