This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new a6f3ddf898c Improvements to GetWorkTimingInfosTracker when there is
clock skew between the worker and service. (#30990)
a6f3ddf898c is described below
commit a6f3ddf898cbdc7d3114d80ab6fff87762f8fb9e
Author: Sam Whittle <[email protected]>
AuthorDate: Tue Apr 16 18:30:09 2024 +0200
Improvements to GetWorkTimingInfosTracker when there is clock skew between
the worker and service. (#30990)
- remove spammy log
- use service provided timestamps when scaling
---
.../client/grpc/GetWorkTimingInfosTracker.java | 25 ++++++-----
.../client/grpc/GrpcWindmillServerTest.java | 51 +++++++++++++++++++---
2 files changed, 59 insertions(+), 17 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkTimingInfosTracker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkTimingInfosTracker.java
index dc3486d743a..8e70ef03158 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkTimingInfosTracker.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkTimingInfosTracker.java
@@ -109,7 +109,7 @@ final class GetWorkTimingInfosTracker {
Instant forwardedByDispatcherTiming =
getWorkStreamTimings.get(Event.GET_WORK_FORWARDED_BY_DISPATCHER);
Instant now = Instant.ofEpochMilli(clock.getMillis());
- if (forwardedByDispatcherTiming != null) {
+ if (forwardedByDispatcherTiming != null && now.isAfter(forwardedByDispatcherTiming)) {
Duration newDuration = new Duration(forwardedByDispatcherTiming, now);
aggregatedGetWorkStreamLatencies.compute(
State.GET_WORK_IN_TRANSIT_TO_USER_WORKER,
@@ -134,22 +134,23 @@ final class GetWorkTimingInfosTracker {
if (workItemCreationLatency != null) {
latencyAttributions.add(workItemCreationLatency);
}
- if (workItemCreationEndTime.isAfter(workItemLastChunkReceivedByWorkerTime)) {
- LOG.warn(
- "Work item creation time {} is after the work received time {}, "
- + "one or more GetWorkStream timing infos are missing.",
- workItemCreationEndTime,
- workItemLastChunkReceivedByWorkerTime);
- return latencyAttributions;
- }
- long totalTransmissionDurationElapsedTime =
- new Duration(workItemCreationEndTime, workItemLastChunkReceivedByWorkerTime).getMillis();
long totalSumDurationTimeMills = 0;
for (SumAndMaxDurations duration : aggregatedGetWorkStreamLatencies.values()) {
totalSumDurationTimeMills += duration.sum.getMillis();
}
final long finalTotalSumDurationTimeMills = totalSumDurationTimeMills;
-
+ long totalTransmissionDurationElapsedTime;
+ if (workItemCreationEndTime.isAfter(workItemLastChunkReceivedByWorkerTime)) {
+ LOG.debug(
+ "Work item creation time {} is after the work received time {}, "
+ + "one or more GetWorkStream timing infos are missing. Using raw times without scaling.",
+ workItemCreationEndTime,
+ workItemLastChunkReceivedByWorkerTime);
+ totalTransmissionDurationElapsedTime = finalTotalSumDurationTimeMills;
+ } else {
+ totalTransmissionDurationElapsedTime =
+ new Duration(workItemCreationEndTime, workItemLastChunkReceivedByWorkerTime).getMillis();
+ }
aggregatedGetWorkStreamLatencies.forEach(
(state, duration) -> {
long scaledDuration =
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java
index 454c616db41..fe0822a6067 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java
@@ -17,11 +17,7 @@
*/
package org.apache.beam.runners.dataflow.worker.windmill.client.grpc;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
import java.io.InputStream;
import java.io.SequenceInputStream;
@@ -1202,6 +1198,51 @@ public class GrpcWindmillServerTest {
latencies.get(State.GET_WORK_IN_TRANSIT_TO_USER_WORKER).getTotalDurationMillis());
}
+ @Test
+ public void testGetWorkTimingInfosTracker_ClockSkew() throws Exception {
+ int skewMicros = 50 * 1000;
+ GetWorkTimingInfosTracker tracker = new GetWorkTimingInfosTracker(() -> 50);
+ List<GetWorkStreamTimingInfo> infos = new ArrayList<>();
+ for (int i = 0; i <= 3; i++) {
+ infos.add(
+ GetWorkStreamTimingInfo.newBuilder()
+ .setEvent(Event.GET_WORK_CREATION_START)
+ .setTimestampUsec(skewMicros)
+ .build());
+ infos.add(
+ GetWorkStreamTimingInfo.newBuilder()
+ .setEvent(Event.GET_WORK_CREATION_END)
+ .setTimestampUsec(10000 + skewMicros)
+ .build());
+ infos.add(
+ GetWorkStreamTimingInfo.newBuilder()
+ .setEvent(Event.GET_WORK_RECEIVED_BY_DISPATCHER)
+ .setTimestampUsec((i + 11) * 1000 + skewMicros)
+ .build());
+ infos.add(
+ GetWorkStreamTimingInfo.newBuilder()
+ .setEvent(Event.GET_WORK_FORWARDED_BY_DISPATCHER)
+ .setTimestampUsec((i + 16) * 1000 + skewMicros)
+ .build());
+ tracker.addTimingInfo(infos);
+ infos.clear();
+ }
+ // durations for each chunk:
+ // GET_WORK_IN_WINDMILL_WORKER: 10, 10, 10, 10
+ // GET_WORK_IN_TRANSIT_TO_DISPATCHER: 1, 2, 3, 4 -> sum to 10
+ // GET_WORK_IN_TRANSIT_TO_USER_WORKER: not observed due to skew
+ Map<State, LatencyAttribution> latencies = new HashMap<>();
+ List<LatencyAttribution> attributions = tracker.getLatencyAttributions();
+ assertEquals(2, attributions.size());
+ for (LatencyAttribution attribution : attributions) {
+ latencies.put(attribution.getState(), attribution);
+ }
+ assertEquals(10L, latencies.get(State.GET_WORK_IN_WINDMILL_WORKER).getTotalDurationMillis());
+ assertEquals(
+ 4L, latencies.get(State.GET_WORK_IN_TRANSIT_TO_DISPATCHER).getTotalDurationMillis());
+ assertNull(latencies.get(State.GET_WORK_IN_TRANSIT_TO_USER_WORKER));
+ }
+
class ResponseErrorInjector<Stream extends StreamObserver> {
private final Stream stream;