This is an automated email from the ASF dual-hosted git repository.
damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new db585b7b462 replace clock.milliseconds with stopwatch (#30678)
db585b7b462 is described below
commit db585b7b4623a97e43efa9d4fe78a17fcb6b7d66
Author: clmccart <[email protected]>
AuthorDate: Tue Apr 16 07:51:01 2024 -0700
replace clock.milliseconds with stopwatch (#30678)
* replace clock.milliseconds with stopwatch
* spotlessapply
* correct stopwatch guava import
---
.../beam/runners/dataflow/worker/ActiveMessageMetadata.java | 7 ++++---
.../beam/runners/dataflow/worker/DataflowExecutionContext.java | 9 +++------
.../org/apache/beam/runners/dataflow/worker/streaming/Work.java | 2 +-
.../runners/dataflow/worker/DataflowExecutionContextTest.java | 7 +++++--
.../dataflow/worker/DataflowExecutionStateSamplerTest.java | 5 +++--
5 files changed, 16 insertions(+), 14 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ActiveMessageMetadata.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ActiveMessageMetadata.java
index bc6b930a432..5c0d14334b7 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ActiveMessageMetadata.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ActiveMessageMetadata.java
@@ -18,15 +18,16 @@
package org.apache.beam.runners.dataflow.worker;
import com.google.auto.value.AutoValue;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Stopwatch;
@AutoValue
public abstract class ActiveMessageMetadata {
public abstract String userStepName();
- public abstract long startTime();
+ public abstract Stopwatch stopwatch();
- static ActiveMessageMetadata create(String userStepName, Long startTime) {
- return new AutoValue_ActiveMessageMetadata(userStepName, startTime);
+ static ActiveMessageMetadata create(String userStepName, Stopwatch stopWatch) {
+ return new AutoValue_ActiveMessageMetadata(userStepName, stopWatch);
}
}
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java
index 7c9d2fa1887..71c158b5da2 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java
@@ -50,10 +50,10 @@ import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Stopwatch;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.Closer;
import org.checkerframework.checker.nullness.qual.Nullable;
-import org.joda.time.DateTimeUtils.MillisProvider;
import org.joda.time.Instant;
/** Execution context for the Dataflow worker. */
@@ -264,8 +264,6 @@ public abstract class DataflowExecutionContext<T extends DataflowStepContext> {
@Nullable
private ActiveMessageMetadata activeMessageMetadata = null;
- private final MillisProvider clock = System::currentTimeMillis;
-
@GuardedBy("this")
private final Map<String, IntSummaryStatistics> processingTimesByStep =
new HashMap<>();
@@ -333,7 +331,7 @@ public abstract class DataflowExecutionContext<T extends DataflowStepContext> {
synchronized (this) {
this.activeMessageMetadata =
ActiveMessageMetadata.create(
- newDFState.getStepName().userName(), clock.getMillis());
+ newDFState.getStepName().userName(), Stopwatch.createStarted());
}
}
}
@@ -382,8 +380,7 @@ public abstract class DataflowExecutionContext<T extends DataflowStepContext> {
if (this.activeMessageMetadata == null) {
return;
}
- int processingTime =
- (int) (System.currentTimeMillis() - this.activeMessageMetadata.startTime());
+ int processingTime = (int) (this.activeMessageMetadata.stopwatch().elapsed().toMillis());
this.processingTimesByStep.compute(
this.activeMessageMetadata.userStepName(),
(k, v) -> {
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java
index 99cdaad200e..64b0eaf5cc0 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java
@@ -171,7 +171,7 @@ public class Work implements Runnable {
stepBuilder.setUserStepName(activeMessage.get().userStepName());
ActiveElementMetadata.Builder activeElementBuilder =
ActiveElementMetadata.newBuilder();
activeElementBuilder.setProcessingTimeMillis(
- System.currentTimeMillis() - activeMessage.get().startTime());
+ activeMessage.get().stopwatch().elapsed().toMillis());
stepBuilder.setActiveMessageMetadata(activeElementBuilder);
builder.addActiveLatencyBreakdown(stepBuilder.build());
return builder;
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContextTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContextTest.java
index f4997cb92b2..24bf659511d 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContextTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContextTest.java
@@ -35,6 +35,7 @@ import org.apache.beam.runners.dataflow.worker.BatchModeExecutionContext.BatchMo
import org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext.StreamingModeExecutionState;
import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.NoopProfileScope;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Stopwatch;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -143,7 +144,8 @@ public class DataflowExecutionContextTest {
// After entering a process state, we should have an active message tracked.
ActiveMessageMetadata expectedMetadata =
- ActiveMessageMetadata.create(NameContextsForTests.nameContextForTest().userName(), 1l);
+ ActiveMessageMetadata.create(
+ NameContextsForTests.nameContextForTest().userName(), Stopwatch.createStarted());
assertTrue(tracker.getActiveMessageMetadata().isPresent());
Assert.assertEquals(
expectedMetadata.userStepName(),
tracker.getActiveMessageMetadata().get().userStepName());
@@ -190,7 +192,8 @@ public class DataflowExecutionContextTest {
new HashSet<>(Arrays.asList(NameContextsForTests.nameContextForTest().userName())),
gotProcessingTimes.keySet());
ActiveMessageMetadata expectedMetadata =
- ActiveMessageMetadata.create(NameContextsForTests.nameContextForTest().userName(), 1l);
+ ActiveMessageMetadata.create(
+ NameContextsForTests.nameContextForTest().userName(), Stopwatch.createStarted());
assertTrue(tracker.getActiveMessageMetadata().isPresent());
Assert.assertEquals(
expectedMetadata.userStepName(),
tracker.getActiveMessageMetadata().get().userStepName());
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateSamplerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateSamplerTest.java
index b772952b74c..920e37d40ec 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateSamplerTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateSamplerTest.java
@@ -28,6 +28,7 @@ import java.util.Map;
import java.util.Optional;
import org.apache.beam.runners.dataflow.worker.DataflowExecutionContext.DataflowExecutionStateTracker;
import org.apache.beam.runners.dataflow.worker.counters.NameContext;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Stopwatch;
import org.joda.time.DateTimeUtils.MillisProvider;
import org.junit.Assert;
import org.junit.Before;
@@ -59,7 +60,7 @@ public class DataflowExecutionStateSamplerTest {
public void testAddTrackerRemoveTrackerActiveMessageMetadataGetsUpdated() {
String workId = "work-item-id1";
ActiveMessageMetadata testMetadata =
- ActiveMessageMetadata.create(step1act1.getStepName().userName(), clock.getMillis());
+ ActiveMessageMetadata.create(step1act1.getStepName().userName(), Stopwatch.createStarted());
DataflowExecutionStateTracker trackerMock = createMockTracker(workId);
when(trackerMock.getActiveMessageMetadata()).thenReturn(Optional.of(testMetadata));
@@ -95,7 +96,7 @@ public class DataflowExecutionStateSamplerTest {
testSummaryStats.accept(5);
testCompletedProcessingTimes.put("some-step", testSummaryStats);
ActiveMessageMetadata testMetadata =
- ActiveMessageMetadata.create(step1act1.getStepName().userName(), clock.getMillis());
+ ActiveMessageMetadata.create(step1act1.getStepName().userName(), Stopwatch.createStarted());
DataflowExecutionStateTracker trackerMock = createMockTracker(workId);
when(trackerMock.getActiveMessageMetadata()).thenReturn(Optional.of(testMetadata));
when(trackerMock.getProcessingTimesByStepCopy()).thenReturn(testCompletedProcessingTimes);