This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new db585b7b462 replace clock.milliseconds with stopwatch (#30678)
db585b7b462 is described below

commit db585b7b4623a97e43efa9d4fe78a17fcb6b7d66
Author: clmccart <[email protected]>
AuthorDate: Tue Apr 16 07:51:01 2024 -0700

    replace clock.milliseconds with stopwatch (#30678)
    
    * replace clock.milliseconds with stopwatch
    
    * spotlessapply
    
    * correct stopwatch guava import
---
 .../beam/runners/dataflow/worker/ActiveMessageMetadata.java      | 7 ++++---
 .../beam/runners/dataflow/worker/DataflowExecutionContext.java   | 9 +++------
 .../org/apache/beam/runners/dataflow/worker/streaming/Work.java  | 2 +-
 .../runners/dataflow/worker/DataflowExecutionContextTest.java    | 7 +++++--
 .../dataflow/worker/DataflowExecutionStateSamplerTest.java       | 5 +++--
 5 files changed, 16 insertions(+), 14 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ActiveMessageMetadata.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ActiveMessageMetadata.java
index bc6b930a432..5c0d14334b7 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ActiveMessageMetadata.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ActiveMessageMetadata.java
@@ -18,15 +18,16 @@
 package org.apache.beam.runners.dataflow.worker;
 
 import com.google.auto.value.AutoValue;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Stopwatch;
 
 @AutoValue
 public abstract class ActiveMessageMetadata {
 
   public abstract String userStepName();
 
-  public abstract long startTime();
+  public abstract Stopwatch stopwatch();
 
-  static ActiveMessageMetadata create(String userStepName, Long startTime) {
-    return new AutoValue_ActiveMessageMetadata(userStepName, startTime);
+  static ActiveMessageMetadata create(String userStepName, Stopwatch stopWatch) {
+    return new AutoValue_ActiveMessageMetadata(userStepName, stopWatch);
   }
 }
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java
index 7c9d2fa1887..71c158b5da2 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java
@@ -50,10 +50,10 @@ import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Stopwatch;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.Closer;
 import org.checkerframework.checker.nullness.qual.Nullable;
-import org.joda.time.DateTimeUtils.MillisProvider;
 import org.joda.time.Instant;
 
 /** Execution context for the Dataflow worker. */
@@ -264,8 +264,6 @@ public abstract class DataflowExecutionContext<T extends DataflowStepContext> {
     @Nullable
     private ActiveMessageMetadata activeMessageMetadata = null;
 
-    private final MillisProvider clock = System::currentTimeMillis;
-
     @GuardedBy("this")
     private final Map<String, IntSummaryStatistics> processingTimesByStep = new HashMap<>();
 
@@ -333,7 +331,7 @@ public abstract class DataflowExecutionContext<T extends DataflowStepContext> {
             synchronized (this) {
               this.activeMessageMetadata =
                   ActiveMessageMetadata.create(
-                      newDFState.getStepName().userName(), clock.getMillis());
+                      newDFState.getStepName().userName(), Stopwatch.createStarted());
             }
           }
         }
@@ -382,8 +380,7 @@ public abstract class DataflowExecutionContext<T extends DataflowStepContext> {
       if (this.activeMessageMetadata == null) {
         return;
       }
-      int processingTime =
-          (int) (System.currentTimeMillis() - this.activeMessageMetadata.startTime());
+      int processingTime = (int) (this.activeMessageMetadata.stopwatch().elapsed().toMillis());
       this.processingTimesByStep.compute(
           this.activeMessageMetadata.userStepName(),
           (k, v) -> {
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java
index 99cdaad200e..64b0eaf5cc0 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java
@@ -171,7 +171,7 @@ public class Work implements Runnable {
       stepBuilder.setUserStepName(activeMessage.get().userStepName());
       ActiveElementMetadata.Builder activeElementBuilder = ActiveElementMetadata.newBuilder();
       activeElementBuilder.setProcessingTimeMillis(
-          System.currentTimeMillis() - activeMessage.get().startTime());
+          activeMessage.get().stopwatch().elapsed().toMillis());
       stepBuilder.setActiveMessageMetadata(activeElementBuilder);
       builder.addActiveLatencyBreakdown(stepBuilder.build());
       return builder;
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContextTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContextTest.java
index f4997cb92b2..24bf659511d 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContextTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContextTest.java
@@ -35,6 +35,7 @@ import org.apache.beam.runners.dataflow.worker.BatchModeExecutionContext.BatchMo
 import org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext.StreamingModeExecutionState;
 import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.NoopProfileScope;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Stopwatch;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -143,7 +144,8 @@ public class DataflowExecutionContextTest {
 
     // After entering a process state, we should have an active message tracked.
     ActiveMessageMetadata expectedMetadata =
-        ActiveMessageMetadata.create(NameContextsForTests.nameContextForTest().userName(), 1l);
+        ActiveMessageMetadata.create(
+            NameContextsForTests.nameContextForTest().userName(), Stopwatch.createStarted());
     assertTrue(tracker.getActiveMessageMetadata().isPresent());
     Assert.assertEquals(
         expectedMetadata.userStepName(), tracker.getActiveMessageMetadata().get().userStepName());
@@ -190,7 +192,8 @@ public class DataflowExecutionContextTest {
         new HashSet<>(Arrays.asList(NameContextsForTests.nameContextForTest().userName())),
         gotProcessingTimes.keySet());
     ActiveMessageMetadata expectedMetadata =
-        ActiveMessageMetadata.create(NameContextsForTests.nameContextForTest().userName(), 1l);
+        ActiveMessageMetadata.create(
+            NameContextsForTests.nameContextForTest().userName(), Stopwatch.createStarted());
     assertTrue(tracker.getActiveMessageMetadata().isPresent());
     Assert.assertEquals(
         expectedMetadata.userStepName(), tracker.getActiveMessageMetadata().get().userStepName());
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateSamplerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateSamplerTest.java
index b772952b74c..920e37d40ec 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateSamplerTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateSamplerTest.java
@@ -28,6 +28,7 @@ import java.util.Map;
 import java.util.Optional;
 import org.apache.beam.runners.dataflow.worker.DataflowExecutionContext.DataflowExecutionStateTracker;
 import org.apache.beam.runners.dataflow.worker.counters.NameContext;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Stopwatch;
 import org.joda.time.DateTimeUtils.MillisProvider;
 import org.junit.Assert;
 import org.junit.Before;
@@ -59,7 +60,7 @@ public class DataflowExecutionStateSamplerTest {
   public void testAddTrackerRemoveTrackerActiveMessageMetadataGetsUpdated() {
     String workId = "work-item-id1";
     ActiveMessageMetadata testMetadata =
-        ActiveMessageMetadata.create(step1act1.getStepName().userName(), clock.getMillis());
+        ActiveMessageMetadata.create(step1act1.getStepName().userName(), Stopwatch.createStarted());
     DataflowExecutionStateTracker trackerMock = createMockTracker(workId);
     when(trackerMock.getActiveMessageMetadata()).thenReturn(Optional.of(testMetadata));
 
@@ -95,7 +96,7 @@ public class DataflowExecutionStateSamplerTest {
     testSummaryStats.accept(5);
     testCompletedProcessingTimes.put("some-step", testSummaryStats);
     ActiveMessageMetadata testMetadata =
-        ActiveMessageMetadata.create(step1act1.getStepName().userName(), clock.getMillis());
+        ActiveMessageMetadata.create(step1act1.getStepName().userName(), Stopwatch.createStarted());
     DataflowExecutionStateTracker trackerMock = createMockTracker(workId);
     when(trackerMock.getActiveMessageMetadata()).thenReturn(Optional.of(testMetadata));
     when(trackerMock.getProcessingTimesByStepCopy()).thenReturn(testCompletedProcessingTimes);

Reply via email to