scwhittle commented on code in PR #26085:
URL: https://github.com/apache/beam/pull/26085#discussion_r1221361193


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -1048,6 +1056,7 @@ public void run() {
                 this);
           }
         };
+    work.recordGetWorkStreamLatencies(getWorkStreamLatencies);

Review Comment:
   Since the implementation expects this to be called only once (i.e., it does not merge if called multiple times), how about modifying the Work constructor to take getWorkStreamLatencies instead?
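
   A minimal sketch of that suggestion (stand-in types only; the real Work constructor takes more parameters, and LatencyAttribution here is a placeholder for the Windmill proto):

   ```java
   import java.util.Collections;
   import java.util.List;

   // Stand-in for Windmill's LatencyAttribution proto; illustrative only.
   final class LatencyAttribution {}

   // Sketch: Work receives the GetWork stream latencies at construction, so they
   // are set exactly once and no merge-on-repeated-call path is needed.
   final class Work {
     private final List<LatencyAttribution> getWorkStreamLatencies;

     Work(List<LatencyAttribution> getWorkStreamLatencies) {
       this.getWorkStreamLatencies = Collections.unmodifiableList(getWorkStreamLatencies);
     }

     List<LatencyAttribution> getWorkStreamLatencies() {
       return getWorkStreamLatencies;
     }
   }
   ```

   Making the field final also documents the set-once expectation in the type itself.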



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java:
##########
@@ -862,6 +885,110 @@ public final Instant startTime() {
     }
   }
 
+  static class GetWorkTimingInfosTracker {
+    private final Map<State, Duration> getWorkStreamLatencies;
+    private Instant workItemCreationStartTime = Instant.ofEpochMilli(Long.MAX_VALUE);
+    private Instant workItemLastChunkReceivedByWorkerTime = Instant.EPOCH;
+
+    public GetWorkTimingInfosTracker() {
+      this.getWorkStreamLatencies = new EnumMap<>(State.class);
+    }
+
+    public void addTimingInfo(Collection<GetWorkStreamTimingInfo> infos) {
+      // We want to record the duration of each stage and also reflect the total
+      // work item processing time. This can be tricky because the timings of
+      // different StreamingGetWorkResponseChunks can be interleaved. The current
+      // strategy is to record the summed duration of each stage across chunks,
+      // then divide the total duration (from the creation of the first chunk in
+      // the windmill worker to the reception of the last chunk by the user
+      // worker) proportionally according to those per-stage sums. This should
+      // let us identify the slow stage while avoiding confusion when comparing a
+      // stage's duration to the total elapsed wall time.
+      Map<Event, Instant> getWorkStreamTimings = new HashMap<>();
+      for (GetWorkStreamTimingInfo info : infos) {
+        getWorkStreamTimings.putIfAbsent(
+            info.getEvent(), Instant.ofEpochMilli(info.getTimestampUsec() / 1000));
+      }
+
+      for (Cell<Event, Event, State> cell : EVENT_STATE_TABLE.cellSet()) {
+        Event start = cell.getRowKey();
+        Event end = cell.getColumnKey();
+        State state = cell.getValue();
+        Instant startTiming = getWorkStreamTimings.get(start);
+        Instant endTiming = getWorkStreamTimings.get(end);
+        if (startTiming != null && endTiming != null) {
+          getWorkStreamLatencies.compute(
+              state,
+              (state_key, duration) -> {
+                Duration newDuration = new Duration(startTiming, endTiming);
+                if (duration == null) {
+                  return newDuration;
+                }
+                return duration.plus(newDuration);
+              });
+        }
+      }
+      Instant getWorkCreationStartTime = getWorkStreamTimings.get(Event.GET_WORK_CREATION_START);
+      if (getWorkCreationStartTime != null
+          && getWorkCreationStartTime.isBefore(workItemCreationStartTime)) {
+        workItemCreationStartTime = getWorkCreationStartTime;
+      }
+      Instant receivedByDispatcherTiming =
+          getWorkStreamTimings.get(Event.GET_WORK_RECEIVED_BY_DISPATCHER);
+      Instant now = Instant.now();
+      if (receivedByDispatcherTiming != null) {
+        getWorkStreamLatencies.compute(
+            State.GET_WORK_IN_TRANSIT_TO_USER_WORKER,
+            (state_key, duration) -> {
+              Duration newDuration = new Duration(receivedByDispatcherTiming, now);
+              if (duration == null) {
+                return newDuration;
+              }
+              return duration.plus(newDuration);
+            });
+      }
+      workItemLastChunkReceivedByWorkerTime = now;
+    }
+
+    List<LatencyAttribution> getLatencyAttributions() {
+      if (getWorkStreamLatencies.size() == 0) {
+        return new ArrayList<>();

Review Comment:
   Collections.emptyList() for here and below?
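
   For reference, a small sketch of the difference (names are illustrative):

   ```java
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.List;

   class EmptyReturnDemo {
     // Allocates a fresh mutable list on every call.
     static List<String> allocating() {
       return new ArrayList<>();
     }

     // Returns the shared immutable empty list; no per-call allocation.
     static List<String> shared() {
       return Collections.emptyList();
     }
   }
   ```

   Since callers never mutate the early-return value here, the shared immutable instance is the cheaper choice.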



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java:
##########
@@ -862,6 +885,110 @@ public final Instant startTime() {
     }
   }
 
+  static class GetWorkTimingInfosTracker {

Review Comment:
   add a unit test of this class



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServerTest.java:
##########
@@ -957,4 +968,42 @@ public void onCompleted() {
     stream.close();
     assertTrue(stream.awaitTermination(30, TimeUnit.SECONDS));
   }
+
+  @Test
+  public void testGetWorkTimingInfosTracker() throws Exception {
+    GetWorkTimingInfosTracker tracker = new GetWorkTimingInfosTracker();
+    List<GetWorkStreamTimingInfo> infos = new ArrayList<>();
+    for (int i = 0; i <= 3; i++) {
+      infos.add(
+          GetWorkStreamTimingInfo.newBuilder()
+              .setEvent(Event.GET_WORK_CREATION_START)
+              .setTimestampUsec(i)
+              .build());
+      infos.add(
+          GetWorkStreamTimingInfo.newBuilder()
+              .setEvent(Event.GET_WORK_CREATION_END)
+              .setTimestampUsec(i + 1)
+              .build());
+      infos.add(
+          GetWorkStreamTimingInfo.newBuilder()
+              .setEvent(Event.GET_WORK_RECEIVED_BY_DISPATCHER)
+              .setTimestampUsec(i + 2)
+              .build());
+      infos.add(
+          GetWorkStreamTimingInfo.newBuilder()
+              .setEvent(Event.GET_WORK_FORWARDED_BY_DISPATCHER)
+              .setTimestampUsec(i + 3)
+              .build());
+    }
+    tracker.addTimingInfo(infos);
+    Set<State> states = new HashSet<>();
+    List<LatencyAttribution> attributions = tracker.getLatencyAttributions();
+    assertEquals(3, attributions.size());
+    for (LatencyAttribution attribution : attributions) {
+      states.add(attribution.getState());
+    }
+    assertTrue(states.contains(State.GET_WORK_IN_WINDMILL_WORKER));

Review Comment:
   can you inject a clock and verify the numbers calculated?
   In particular it would be good to verify the scaling logic
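
   A simplified sketch of the clock-injection idea (not the Beam classes, and java.time in place of Joda): a `Supplier<Instant>` stands in for the clock so a test can pin "now" and assert exact durations instead of racing wall time.

   ```java
   import java.time.Duration;
   import java.time.Instant;
   import java.util.function.Supplier;

   // Sketch: the tracker measures against an injected clock rather than
   // calling Instant.now() directly, which makes the computed durations
   // deterministic under test.
   final class TransitLatencyTracker {
     private final Supplier<Instant> clock;
     private Instant receivedByDispatcher;

     TransitLatencyTracker(Supplier<Instant> clock) {
       this.clock = clock;
     }

     void onReceivedByDispatcher(Instant t) {
       receivedByDispatcher = t;
     }

     // Transit latency to the user worker, read off the injected clock.
     Duration inTransitToUserWorker() {
       return Duration.between(receivedByDispatcher, clock.get());
     }
   }
   ```

   With the same pattern in GetWorkTimingInfosTracker, the test could fix the chunk timestamps and assert the exact scaled per-state durations.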



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java:
##########
@@ -862,6 +885,110 @@ public final Instant startTime() {
     }
   }
 
+  static class GetWorkTimingInfosTracker {
+    private final Map<State, Duration> getWorkStreamLatencies;
+    private Instant workItemCreationStartTime = Instant.ofEpochMilli(Long.MAX_VALUE);
+    private Instant workItemLastChunkReceivedByWorkerTime = Instant.EPOCH;
+
+    public GetWorkTimingInfosTracker() {
+      this.getWorkStreamLatencies = new EnumMap<>(State.class);
+    }
+
+    public void addTimingInfo(Collection<GetWorkStreamTimingInfo> infos) {
+      // We want to record the duration of each stage and also reflect the total
+      // work item processing time. This can be tricky because the timings of
+      // different StreamingGetWorkResponseChunks can be interleaved. The current
+      // strategy is to record the summed duration of each stage across chunks,
+      // then divide the total duration (from the creation of the first chunk in
+      // the windmill worker to the reception of the last chunk by the user
+      // worker) proportionally according to those per-stage sums. This should
+      // let us identify the slow stage while avoiding confusion when comparing a
+      // stage's duration to the total elapsed wall time.
+      Map<Event, Instant> getWorkStreamTimings = new HashMap<>();
+      for (GetWorkStreamTimingInfo info : infos) {
+        getWorkStreamTimings.putIfAbsent(
+            info.getEvent(), Instant.ofEpochMilli(info.getTimestampUsec() / 1000));
+      }
+
+      for (Cell<Event, Event, State> cell : EVENT_STATE_TABLE.cellSet()) {
+        Event start = cell.getRowKey();
+        Event end = cell.getColumnKey();
+        State state = cell.getValue();
+        Instant startTiming = getWorkStreamTimings.get(start);
+        Instant endTiming = getWorkStreamTimings.get(end);
+        if (startTiming != null && endTiming != null) {
+          getWorkStreamLatencies.compute(
+              state,
+              (state_key, duration) -> {
+                Duration newDuration = new Duration(startTiming, endTiming);
+                if (duration == null) {
+                  return newDuration;
+                }
+                return duration.plus(newDuration);
+              });
+        }
+      }
+      Instant getWorkCreationStartTime = getWorkStreamTimings.get(Event.GET_WORK_CREATION_START);
+      if (getWorkCreationStartTime != null
+          && getWorkCreationStartTime.isBefore(workItemCreationStartTime)) {
+        workItemCreationStartTime = getWorkCreationStartTime;
+      }
+      Instant receivedByDispatcherTiming =
+          getWorkStreamTimings.get(Event.GET_WORK_RECEIVED_BY_DISPATCHER);
+      Instant now = Instant.now();

Review Comment:
   use clock



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java:
##########
@@ -862,6 +885,110 @@ public final Instant startTime() {
     }
   }
 
+  static class GetWorkTimingInfosTracker {
+    private final Map<State, Duration> getWorkStreamLatencies;
+    private Instant workItemCreationStartTime = Instant.ofEpochMilli(Long.MAX_VALUE);
+    private Instant workItemLastChunkReceivedByWorkerTime = Instant.EPOCH;
+
+    public GetWorkTimingInfosTracker() {
+      this.getWorkStreamLatencies = new EnumMap<>(State.class);
+    }
+
+    public void addTimingInfo(Collection<GetWorkStreamTimingInfo> infos) {
+      // We want to record the duration of each stage and also reflect the total
+      // work item processing time. This can be tricky because the timings of
+      // different StreamingGetWorkResponseChunks can be interleaved. The current
+      // strategy is to record the summed duration of each stage across chunks,
+      // then divide the total duration (from the creation of the first chunk in
+      // the windmill worker to the reception of the last chunk by the user
+      // worker) proportionally according to those per-stage sums. This should
+      // let us identify the slow stage while avoiding confusion when comparing a
+      // stage's duration to the total elapsed wall time.
+      Map<Event, Instant> getWorkStreamTimings = new HashMap<>();
+      for (GetWorkStreamTimingInfo info : infos) {
+        getWorkStreamTimings.putIfAbsent(
+            info.getEvent(), Instant.ofEpochMilli(info.getTimestampUsec() / 1000));
+      }
+
+      for (Cell<Event, Event, State> cell : EVENT_STATE_TABLE.cellSet()) {
+        Event start = cell.getRowKey();
+        Event end = cell.getColumnKey();
+        State state = cell.getValue();
+        Instant startTiming = getWorkStreamTimings.get(start);
+        Instant endTiming = getWorkStreamTimings.get(end);
+        if (startTiming != null && endTiming != null) {
+          getWorkStreamLatencies.compute(
+              state,
+              (state_key, duration) -> {
+                Duration newDuration = new Duration(startTiming, endTiming);
+                if (duration == null) {
+                  return newDuration;
+                }
+                return duration.plus(newDuration);
+              });
+        }
+      }
+      Instant getWorkCreationStartTime = getWorkStreamTimings.get(Event.GET_WORK_CREATION_START);
+      if (getWorkCreationStartTime != null
+          && getWorkCreationStartTime.isBefore(workItemCreationStartTime)) {
+        workItemCreationStartTime = getWorkCreationStartTime;
+      }
+      Instant receivedByDispatcherTiming =
+          getWorkStreamTimings.get(Event.GET_WORK_RECEIVED_BY_DISPATCHER);
+      Instant now = Instant.now();
+      if (receivedByDispatcherTiming != null) {
+        getWorkStreamLatencies.compute(
+            State.GET_WORK_IN_TRANSIT_TO_USER_WORKER,
+            (state_key, duration) -> {
+              Duration newDuration = new Duration(receivedByDispatcherTiming, now);
+              if (duration == null) {
+                return newDuration;
+              }
+              return duration.plus(newDuration);
+            });
+      }
+      workItemLastChunkReceivedByWorkerTime = now;
+    }
+
+    List<LatencyAttribution> getLatencyAttributions() {
+      if (getWorkStreamLatencies.size() == 0) {
+        return new ArrayList<>();
+      }
+      if (workItemCreationStartTime.isAfter(workItemLastChunkReceivedByWorkerTime)) {
+        LOG.warn(
+            String.format(

Review Comment:
   LOG itself supports lazy formatting with {}
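
   With parameterized logging the message is only built when WARN is actually enabled. A tiny self-contained sketch of the {} substitution (SLF4J's `Logger.warn("... {} ...", a, b)` does this itself; the helper below is for illustration only):

   ```java
   // Illustrative only: SLF4J performs this placeholder substitution natively,
   // and skips it entirely when the log level is disabled.
   class LazyFormatDemo {
     static String substitute(String template, Object... args) {
       StringBuilder sb = new StringBuilder();
       int from = 0;
       for (Object arg : args) {
         int at = template.indexOf("{}", from);
         if (at < 0) {
           break;
         }
         // Copy the text before the placeholder, then the argument itself.
         sb.append(template, from, at).append(arg);
         from = at + 2;
       }
       return sb.append(template.substring(from)).toString();
     }
   }
   ```

   So the quoted call could drop String.format and pass the two instants directly as {} arguments.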



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java:
##########
@@ -862,6 +885,110 @@ public final Instant startTime() {
     }
   }
 
+  static class GetWorkTimingInfosTracker {
+    private final Map<State, Duration> getWorkStreamLatencies;
+    private Instant workItemCreationStartTime = Instant.ofEpochMilli(Long.MAX_VALUE);
+    private Instant workItemLastChunkReceivedByWorkerTime = Instant.EPOCH;
+
+    public GetWorkTimingInfosTracker() {
+      this.getWorkStreamLatencies = new EnumMap<>(State.class);
+    }
+
+    public void addTimingInfo(Collection<GetWorkStreamTimingInfo> infos) {
+      // We want to record the duration of each stage and also reflect the total
+      // work item processing time. This can be tricky because the timings of
+      // different StreamingGetWorkResponseChunks can be interleaved. The current
+      // strategy is to record the summed duration of each stage across chunks,
+      // then divide the total duration (from the creation of the first chunk in
+      // the windmill worker to the reception of the last chunk by the user
+      // worker) proportionally according to those per-stage sums. This should
+      // let us identify the slow stage while avoiding confusion when comparing a
+      // stage's duration to the total elapsed wall time.
+      Map<Event, Instant> getWorkStreamTimings = new HashMap<>();
+      for (GetWorkStreamTimingInfo info : infos) {
+        getWorkStreamTimings.putIfAbsent(
+            info.getEvent(), Instant.ofEpochMilli(info.getTimestampUsec() / 1000));
+      }
+
+      for (Cell<Event, Event, State> cell : EVENT_STATE_TABLE.cellSet()) {
+        Event start = cell.getRowKey();
+        Event end = cell.getColumnKey();
+        State state = cell.getValue();
+        Instant startTiming = getWorkStreamTimings.get(start);
+        Instant endTiming = getWorkStreamTimings.get(end);
+        if (startTiming != null && endTiming != null) {
+          getWorkStreamLatencies.compute(
+              state,
+              (state_key, duration) -> {
+                Duration newDuration = new Duration(startTiming, endTiming);
+                if (duration == null) {
+                  return newDuration;
+                }
+                return duration.plus(newDuration);
+              });
+        }
+      }
+      Instant getWorkCreationStartTime = getWorkStreamTimings.get(Event.GET_WORK_CREATION_START);
+      if (getWorkCreationStartTime != null
+          && getWorkCreationStartTime.isBefore(workItemCreationStartTime)) {
+        workItemCreationStartTime = getWorkCreationStartTime;
+      }
+      Instant receivedByDispatcherTiming =
+          getWorkStreamTimings.get(Event.GET_WORK_RECEIVED_BY_DISPATCHER);
+      Instant now = Instant.now();
+      if (receivedByDispatcherTiming != null) {
+        getWorkStreamLatencies.compute(
+            State.GET_WORK_IN_TRANSIT_TO_USER_WORKER,
+            (state_key, duration) -> {
+              Duration newDuration = new Duration(receivedByDispatcherTiming, now);
+              if (duration == null) {
+                return newDuration;
+              }
+              return duration.plus(newDuration);
+            });
+      }
+      workItemLastChunkReceivedByWorkerTime = now;
+    }
+
+    List<LatencyAttribution> getLatencyAttributions() {
+      if (getWorkStreamLatencies.size() == 0) {
+        return new ArrayList<>();
+      }
+      if (workItemCreationStartTime.isAfter(workItemLastChunkReceivedByWorkerTime)) {
+        LOG.warn(
+            String.format(
+                "Work item creation time %s is after the work received time %s, "
+                    + "one or more GetWorkStream timing infos are missing.",
+                workItemCreationStartTime, workItemLastChunkReceivedByWorkerTime));
+        return new ArrayList<>();
+      }
+      List<LatencyAttribution> latencyAttributions = new ArrayList<>(getWorkStreamLatencies.size());
+      long totalDurationWallTimeMills =
+          new Duration(workItemCreationStartTime, workItemLastChunkReceivedByWorkerTime)

Review Comment:
   I think that we can just take whatever workItemCreationStartTime to workItemCreationEndTime is as the work item creation latency. That happens only once and should be exact. If we send it per-chunk and not just with the first chunk that could change, but it should always be the same if repeatedly sent for the same work item.
   
   So I think you should only scale the other latency sums, so that they span workItemCreationEndTime to workItemLastChunkReceivedByWorkerTime.
   
   Then we'll have the total latency of workItemCreationStartTime to workItemLastChunkReceivedByWorkerTime given by:
   workItemCreationStart to workItemCreationEnd: reported entirely as GET_WORK_IN_WINDMILL_WORKER
   workItemCreationEnd to workItemLastChunkReceivedByWorkerTime: reported as GET_WORK_IN_TRANSIT_TO_DISPATCHER and GET_WORK_IN_TRANSIT_TO_USER_WORKER, where the sums of chunk latencies are scaled.
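
   A sketch of that split (simplified types, java.time in place of Joda, state names as plain strings): creation latency is reported unscaled, and only the per-chunk transit sums are scaled so they exactly fill creationEnd..lastChunkReceived.

   ```java
   import java.time.Duration;
   import java.time.Instant;
   import java.util.LinkedHashMap;
   import java.util.Map;

   // Sketch of the suggested attribution: creation happens once and is exact;
   // the interleaved per-chunk transit sums are scaled proportionally into the
   // remaining window.
   class ScaledAttributionSketch {
     static Map<String, Duration> attribute(
         Instant creationStart,
         Instant creationEnd,
         Instant lastChunkReceived,
         Map<String, Duration> transitSums) {
       Map<String, Duration> out = new LinkedHashMap<>();
       // Creation latency reported entirely, unscaled.
       out.put("GET_WORK_IN_WINDMILL_WORKER", Duration.between(creationStart, creationEnd));
       long windowMillis = Duration.between(creationEnd, lastChunkReceived).toMillis();
       long sumMillis = transitSums.values().stream().mapToLong(Duration::toMillis).sum();
       for (Map.Entry<String, Duration> e : transitSums.entrySet()) {
         // Each transit sum gets its proportional share of the window.
         long scaled = sumMillis == 0 ? 0 : e.getValue().toMillis() * windowMillis / sumMillis;
         out.put(e.getKey(), Duration.ofMillis(scaled));
       }
       return out;
     }
   }
   ```

   The scaled transit durations then add up to exactly the creationEnd-to-lastChunkReceived span, so the per-state latencies sum to the end-to-end wall time.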



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
