This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new a6f3ddf898c Improvements to GetWorkTimingInfosTracker when there is clock skew between the worker and service. (#30990)
a6f3ddf898c is described below

commit a6f3ddf898cbdc7d3114d80ab6fff87762f8fb9e
Author: Sam Whittle <[email protected]>
AuthorDate: Tue Apr 16 18:30:09 2024 +0200

    Improvements to GetWorkTimingInfosTracker when there is clock skew between the worker and service. (#30990)
    
    - remove spammy log
    - use service provided timestamps when scaling
---
 .../client/grpc/GetWorkTimingInfosTracker.java     | 25 ++++++-----
 .../client/grpc/GrpcWindmillServerTest.java        | 51 +++++++++++++++++++---
 2 files changed, 59 insertions(+), 17 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkTimingInfosTracker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkTimingInfosTracker.java
index dc3486d743a..8e70ef03158 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkTimingInfosTracker.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkTimingInfosTracker.java
@@ -109,7 +109,7 @@ final class GetWorkTimingInfosTracker {
     Instant forwardedByDispatcherTiming =
         getWorkStreamTimings.get(Event.GET_WORK_FORWARDED_BY_DISPATCHER);
     Instant now = Instant.ofEpochMilli(clock.getMillis());
-    if (forwardedByDispatcherTiming != null) {
+    if (forwardedByDispatcherTiming != null && now.isAfter(forwardedByDispatcherTiming)) {
       Duration newDuration = new Duration(forwardedByDispatcherTiming, now);
       aggregatedGetWorkStreamLatencies.compute(
           State.GET_WORK_IN_TRANSIT_TO_USER_WORKER,
@@ -134,22 +134,23 @@ final class GetWorkTimingInfosTracker {
     if (workItemCreationLatency != null) {
       latencyAttributions.add(workItemCreationLatency);
     }
-    if (workItemCreationEndTime.isAfter(workItemLastChunkReceivedByWorkerTime)) {
-      LOG.warn(
-          "Work item creation time {} is after the work received time {}, "
-              + "one or more GetWorkStream timing infos are missing.",
-          workItemCreationEndTime,
-          workItemLastChunkReceivedByWorkerTime);
-      return latencyAttributions;
-    }
-    long totalTransmissionDurationElapsedTime =
-        new Duration(workItemCreationEndTime, workItemLastChunkReceivedByWorkerTime).getMillis();
     long totalSumDurationTimeMills = 0;
     for (SumAndMaxDurations duration : aggregatedGetWorkStreamLatencies.values()) {
       totalSumDurationTimeMills += duration.sum.getMillis();
     }
     final long finalTotalSumDurationTimeMills = totalSumDurationTimeMills;
-
+    long totalTransmissionDurationElapsedTime;
+    if (workItemCreationEndTime.isAfter(workItemLastChunkReceivedByWorkerTime)) {
+      LOG.debug(
+          "Work item creation time {} is after the work received time {}, "
+              + "one or more GetWorkStream timing infos are missing. Using raw times without scaling.",
+          workItemCreationEndTime,
+          workItemLastChunkReceivedByWorkerTime);
+      totalTransmissionDurationElapsedTime = finalTotalSumDurationTimeMills;
+    } else {
+      totalTransmissionDurationElapsedTime =
+          new Duration(workItemCreationEndTime, workItemLastChunkReceivedByWorkerTime).getMillis();
+    }
     aggregatedGetWorkStreamLatencies.forEach(
         (state, duration) -> {
           long scaledDuration =
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java
index 454c616db41..fe0822a6067 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java
@@ -17,11 +17,7 @@
  */
 package org.apache.beam.runners.dataflow.worker.windmill.client.grpc;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
 
 import java.io.InputStream;
 import java.io.SequenceInputStream;
@@ -1202,6 +1198,51 @@ public class GrpcWindmillServerTest {
         
latencies.get(State.GET_WORK_IN_TRANSIT_TO_USER_WORKER).getTotalDurationMillis());
   }
 
+  @Test
+  public void testGetWorkTimingInfosTracker_ClockSkew() throws Exception {
+    int skewMicros = 50 * 1000;
+    GetWorkTimingInfosTracker tracker = new GetWorkTimingInfosTracker(() -> 50);
+    List<GetWorkStreamTimingInfo> infos = new ArrayList<>();
+    for (int i = 0; i <= 3; i++) {
+      infos.add(
+          GetWorkStreamTimingInfo.newBuilder()
+              .setEvent(Event.GET_WORK_CREATION_START)
+              .setTimestampUsec(skewMicros)
+              .build());
+      infos.add(
+          GetWorkStreamTimingInfo.newBuilder()
+              .setEvent(Event.GET_WORK_CREATION_END)
+              .setTimestampUsec(10000 + skewMicros)
+              .build());
+      infos.add(
+          GetWorkStreamTimingInfo.newBuilder()
+              .setEvent(Event.GET_WORK_RECEIVED_BY_DISPATCHER)
+              .setTimestampUsec((i + 11) * 1000 + skewMicros)
+              .build());
+      infos.add(
+          GetWorkStreamTimingInfo.newBuilder()
+              .setEvent(Event.GET_WORK_FORWARDED_BY_DISPATCHER)
+              .setTimestampUsec((i + 16) * 1000 + skewMicros)
+              .build());
+      tracker.addTimingInfo(infos);
+      infos.clear();
+    }
+    // durations for each chunk:
+    // GET_WORK_IN_WINDMILL_WORKER: 10, 10, 10, 10
+    // GET_WORK_IN_TRANSIT_TO_DISPATCHER: 1, 2, 3, 4 -> sum to 10
+    // GET_WORK_IN_TRANSIT_TO_USER_WORKER: not observed due to skew
+    Map<State, LatencyAttribution> latencies = new HashMap<>();
+    List<LatencyAttribution> attributions = tracker.getLatencyAttributions();
+    assertEquals(2, attributions.size());
+    for (LatencyAttribution attribution : attributions) {
+      latencies.put(attribution.getState(), attribution);
+    }
+    assertEquals(10L, latencies.get(State.GET_WORK_IN_WINDMILL_WORKER).getTotalDurationMillis());
+    assertEquals(
+        4L, latencies.get(State.GET_WORK_IN_TRANSIT_TO_DISPATCHER).getTotalDurationMillis());
+    assertNull(latencies.get(State.GET_WORK_IN_TRANSIT_TO_USER_WORKER));
+  }
+
   class ResponseErrorInjector<Stream extends StreamObserver> {
 
     private final Stream stream;

Reply via email to