This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 6ab69f9 Fix IllegalArgumentException in Interval
new 8d5b7c7 Merge pull request #13993 from baeminbo/patch-1
6ab69f9 is described below
commit 6ab69f9e6cd6f876af325fbc13cc329fb42a068c
Author: Minbo Bae <[email protected]>
AuthorDate: Sun Feb 14 21:36:40 2021 -0800
Fix IllegalArgumentException in Interval
If a time correction happens, `endTime` can be before `startTime, which
cause `IllegalArgumentException` (the end instant must be greater than the
start instant).
Compute the elapsed time by `endTime.getMillis() - startTime.getMills()`
which may be a negative value but I think it's ok because it's only logging
purpose.
---
.../beam/runners/dataflow/worker/DataflowWorkUnitClient.java | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClient.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClient.java
index 618a451..1590225 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClient.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClient.java
@@ -48,7 +48,6 @@ import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditio
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.joda.time.DateTime;
import org.joda.time.Duration;
-import org.joda.time.Interval;
import org.slf4j.Logger;
/** A Dataflow WorkUnit client that fetches WorkItems from the Dataflow
service. */
@@ -215,14 +214,15 @@ class DataflowWorkUnitClient implements WorkUnitClient {
&& DataflowWorkerLoggingMDC.getStageName() != null) {
DateTime startTime = stageStartTime.get();
if (startTime != null) {
- // This thread should have been tagged with the stage start time
during getWorkItem(),
- Interval elapsed = new Interval(startTime, endTime);
+ // elapsed time can be negative by time correction
+ long elapsed = endTime.getMillis() - startTime.getMillis();
int numErrors = workItemStatus.getErrors() == null ? 0 :
workItemStatus.getErrors().size();
+ // This thread should have been tagged with the stage start time
during getWorkItem(),
logger.info(
"Finished processing stage {} with {} errors in {} seconds ",
DataflowWorkerLoggingMDC.getStageName(),
numErrors,
- (double) elapsed.toDurationMillis() / 1000);
+ (double) elapsed / 1000);
}
}
shortIdCache.shortenIdsIfAvailable(workItemStatus.getCounterUpdates());