scwhittle commented on code in PR #26085:
URL: https://github.com/apache/beam/pull/26085#discussion_r1222595576


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java:
##########
@@ -886,60 +871,72 @@ public final Instant startTime() {
   }
 
   static class GetWorkTimingInfosTracker {
-    private final Map<State, Duration> getWorkStreamLatencies;
-    private Instant workItemCreationStartTime = Instant.ofEpochMilli(Long.MAX_VALUE);
+
+    private Instant workItemCreationEndTime = Instant.EPOCH;
     private Instant workItemLastChunkReceivedByWorkerTime = Instant.EPOCH;

-    public GetWorkTimingInfosTracker() {
-      this.getWorkStreamLatencies = new EnumMap<>(State.class);
+    private LatencyAttribution workItemCreationLatency = null;
+    private final Map<State, Duration> aggregatedGetWorkStreamLatencies;
+
+    private final MillisProvider clock;
+
+    public GetWorkTimingInfosTracker(MillisProvider clock) {
+      this.aggregatedGetWorkStreamLatencies = new EnumMap<>(State.class);
+      this.clock = clock;
     }

     public void addTimingInfo(Collection<GetWorkStreamTimingInfo> infos) {
       // We want to record duration for each stage and also be reflective on total work item
       // processing time. It can be tricky because timings of different
       // StreamingGetWorkResponseChunks can be interleaved. Current strategy is to record the
-      // sum duration in each stage across different chunks, then divide the total duration (start
-      // from the first chunk creation in the windmill worker to the end of last chunk reception by
-      // the user worker) proportionally according the sum duration values across the many stages.
-      // This should allow us to identify the slow stage meanwhile avoid confusions for comparing
-      // the stage duration to the total processing elapsed wall time.
+      // sum duration in each transmission stage across different chunks, then divide the total
+      // duration (start from the chunk creation end in the windmill worker to the end of last chunk
+      // reception by the user worker) proportionally according the sum duration values across the
+      // many stages. This should allow us to identify the slow stage meanwhile avoid confusions for
+      // comparing the stage duration to the total processing elapsed wall time.
       Map<Event, Instant> getWorkStreamTimings = new HashMap<>();
       for (GetWorkStreamTimingInfo info : infos) {
         getWorkStreamTimings.putIfAbsent(
             info.getEvent(), Instant.ofEpochMilli(info.getTimestampUsec() / 1000));
       }
-
-      for (Cell<Event, Event, State> cell : EVENT_STATE_TABLE.cellSet()) {
-        Event start = cell.getRowKey();
-        Event end = cell.getColumnKey();
-        State state = cell.getValue();
-        Instant startTiming = getWorkStreamTimings.get(start);
-        Instant endTiming = getWorkStreamTimings.get(end);
-        if (startTiming != null && endTiming != null) {
-          getWorkStreamLatencies.compute(
-              state,
-              (state_key, duration) -> {
-                Duration newDuration = new Duration(startTiming, endTiming);
-                if (duration == null) {
-                  return newDuration;
-                }
-                return duration.plus(newDuration);
-              });
-        }
+      Instant workItemCreationStart = getWorkStreamTimings.get(Event.GET_WORK_CREATION_START);
+      Instant workItemCreationEnd = getWorkStreamTimings.get(Event.GET_WORK_CREATION_END);
+      // Record the work item creation end time.
+      if (workItemCreationStart != null
+          && workItemCreationEnd != null
+          && workItemCreationLatency == null) {
+        workItemCreationLatency =
+            LatencyAttribution.newBuilder()
+                .setState(State.GET_WORK_IN_WINDMILL_WORKER)
+                .setTotalDurationMillis(
+                    new Duration(workItemCreationStart, workItemCreationEnd).getMillis())
+                .build();
       }
-      Instant getWorkCreationStartTime = getWorkStreamTimings.get(Event.GET_WORK_CREATION_START);
-      if (getWorkCreationStartTime != null
-          && getWorkCreationStartTime.isBefore(workItemCreationStartTime)) {
-        workItemCreationStartTime = getWorkCreationStartTime;
+      if (workItemCreationEnd != null && workItemCreationEnd.isAfter(workItemCreationEndTime)) {

Review Comment:
   do you need a null check for workItemCreationEndTime too?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java:
##########
@@ -886,60 +871,72 @@ public final Instant startTime() {
   }
 
   static class GetWorkTimingInfosTracker {
-    private final Map<State, Duration> getWorkStreamLatencies;
-    private Instant workItemCreationStartTime = Instant.ofEpochMilli(Long.MAX_VALUE);
+
+    private Instant workItemCreationEndTime = Instant.EPOCH;
     private Instant workItemLastChunkReceivedByWorkerTime = Instant.EPOCH;

-    public GetWorkTimingInfosTracker() {
-      this.getWorkStreamLatencies = new EnumMap<>(State.class);
+    private LatencyAttribution workItemCreationLatency = null;
+    private final Map<State, Duration> aggregatedGetWorkStreamLatencies;
+
+    private final MillisProvider clock;
+
+    public GetWorkTimingInfosTracker(MillisProvider clock) {
+      this.aggregatedGetWorkStreamLatencies = new EnumMap<>(State.class);
+      this.clock = clock;
     }

     public void addTimingInfo(Collection<GetWorkStreamTimingInfo> infos) {
       // We want to record duration for each stage and also be reflective on total work item
       // processing time. It can be tricky because timings of different
       // StreamingGetWorkResponseChunks can be interleaved. Current strategy is to record the
-      // sum duration in each stage across different chunks, then divide the total duration (start
-      // from the first chunk creation in the windmill worker to the end of last chunk reception by
-      // the user worker) proportionally according the sum duration values across the many stages.
-      // This should allow us to identify the slow stage meanwhile avoid confusions for comparing
-      // the stage duration to the total processing elapsed wall time.
+      // sum duration in each transmission stage across different chunks, then divide the total
+      // duration (start from the chunk creation end in the windmill worker to the end of last chunk
+      // reception by the user worker) proportionally according the sum duration values across the
+      // many stages. This should allow us to identify the slow stage meanwhile avoid confusions for
+      // comparing the stage duration to the total processing elapsed wall time.
       Map<Event, Instant> getWorkStreamTimings = new HashMap<>();
       for (GetWorkStreamTimingInfo info : infos) {
         getWorkStreamTimings.putIfAbsent(
             info.getEvent(), Instant.ofEpochMilli(info.getTimestampUsec() / 1000));
       }
-
-      for (Cell<Event, Event, State> cell : EVENT_STATE_TABLE.cellSet()) {
-        Event start = cell.getRowKey();
-        Event end = cell.getColumnKey();
-        State state = cell.getValue();
-        Instant startTiming = getWorkStreamTimings.get(start);
-        Instant endTiming = getWorkStreamTimings.get(end);
-        if (startTiming != null && endTiming != null) {
-          getWorkStreamLatencies.compute(
-              state,
-              (state_key, duration) -> {
-                Duration newDuration = new Duration(startTiming, endTiming);
-                if (duration == null) {
-                  return newDuration;
-                }
-                return duration.plus(newDuration);
-              });
-        }
+      Instant workItemCreationStart = getWorkStreamTimings.get(Event.GET_WORK_CREATION_START);
+      Instant workItemCreationEnd = getWorkStreamTimings.get(Event.GET_WORK_CREATION_END);
+      // Record the work item creation end time.
+      if (workItemCreationStart != null
+          && workItemCreationEnd != null
+          && workItemCreationLatency == null) {
+        workItemCreationLatency =
+            LatencyAttribution.newBuilder()
+                .setState(State.GET_WORK_IN_WINDMILL_WORKER)
+                .setTotalDurationMillis(
+                    new Duration(workItemCreationStart, workItemCreationEnd).getMillis())
+                .build();
       }
-      Instant getWorkCreationStartTime = getWorkStreamTimings.get(Event.GET_WORK_CREATION_START);
-      if (getWorkCreationStartTime != null
-          && getWorkCreationStartTime.isBefore(workItemCreationStartTime)) {
-        workItemCreationStartTime = getWorkCreationStartTime;
+      if (workItemCreationEnd != null && workItemCreationEnd.isAfter(workItemCreationEndTime)) {
+        workItemCreationEndTime = workItemCreationEnd;
       }
+
       Instant receivedByDispatcherTiming =
           getWorkStreamTimings.get(Event.GET_WORK_RECEIVED_BY_DISPATCHER);
-      Instant now = Instant.now();
-      if (receivedByDispatcherTiming != null) {
-        getWorkStreamLatencies.compute(
+      if (workItemCreationEnd != null && receivedByDispatcherTiming != null) {

Review Comment:
   // Record the latency of each chunk between send on worker and arrival on dispatcher.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java:
##########
@@ -886,60 +871,72 @@ public final Instant startTime() {
   }
 
   static class GetWorkTimingInfosTracker {
-    private final Map<State, Duration> getWorkStreamLatencies;
-    private Instant workItemCreationStartTime = Instant.ofEpochMilli(Long.MAX_VALUE);
+
+    private Instant workItemCreationEndTime = Instant.EPOCH;
     private Instant workItemLastChunkReceivedByWorkerTime = Instant.EPOCH;

-    public GetWorkTimingInfosTracker() {
-      this.getWorkStreamLatencies = new EnumMap<>(State.class);
+    private LatencyAttribution workItemCreationLatency = null;
+    private final Map<State, Duration> aggregatedGetWorkStreamLatencies;
+
+    private final MillisProvider clock;
+
+    public GetWorkTimingInfosTracker(MillisProvider clock) {
+      this.aggregatedGetWorkStreamLatencies = new EnumMap<>(State.class);
+      this.clock = clock;
     }

     public void addTimingInfo(Collection<GetWorkStreamTimingInfo> infos) {
       // We want to record duration for each stage and also be reflective on total work item
       // processing time. It can be tricky because timings of different
       // StreamingGetWorkResponseChunks can be interleaved. Current strategy is to record the
-      // sum duration in each stage across different chunks, then divide the total duration (start
-      // from the first chunk creation in the windmill worker to the end of last chunk reception by
-      // the user worker) proportionally according the sum duration values across the many stages.
-      // This should allow us to identify the slow stage meanwhile avoid confusions for comparing
-      // the stage duration to the total processing elapsed wall time.
+      // sum duration in each transmission stage across different chunks, then divide the total
+      // duration (start from the chunk creation end in the windmill worker to the end of last chunk
+      // reception by the user worker) proportionally according the sum duration values across the
+      // many stages. This should allow us to identify the slow stage meanwhile avoid confusions for
+      // comparing the stage duration to the total processing elapsed wall time.
       Map<Event, Instant> getWorkStreamTimings = new HashMap<>();
       for (GetWorkStreamTimingInfo info : infos) {
         getWorkStreamTimings.putIfAbsent(
             info.getEvent(), Instant.ofEpochMilli(info.getTimestampUsec() / 1000));
       }
-
-      for (Cell<Event, Event, State> cell : EVENT_STATE_TABLE.cellSet()) {
-        Event start = cell.getRowKey();
-        Event end = cell.getColumnKey();
-        State state = cell.getValue();
-        Instant startTiming = getWorkStreamTimings.get(start);
-        Instant endTiming = getWorkStreamTimings.get(end);
-        if (startTiming != null && endTiming != null) {
-          getWorkStreamLatencies.compute(
-              state,
-              (state_key, duration) -> {
-                Duration newDuration = new Duration(startTiming, endTiming);
-                if (duration == null) {
-                  return newDuration;
-                }
-                return duration.plus(newDuration);
-              });
-        }
+      Instant workItemCreationStart = getWorkStreamTimings.get(Event.GET_WORK_CREATION_START);
+      Instant workItemCreationEnd = getWorkStreamTimings.get(Event.GET_WORK_CREATION_END);
+      // Record the work item creation end time.

Review Comment:
   as the difference between starting to get work and the first chunk being sent.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java:
##########
@@ -886,60 +871,72 @@ public final Instant startTime() {
   }
 
   static class GetWorkTimingInfosTracker {
-    private final Map<State, Duration> getWorkStreamLatencies;
-    private Instant workItemCreationStartTime = Instant.ofEpochMilli(Long.MAX_VALUE);
+
+    private Instant workItemCreationEndTime = Instant.EPOCH;
     private Instant workItemLastChunkReceivedByWorkerTime = Instant.EPOCH;

-    public GetWorkTimingInfosTracker() {
-      this.getWorkStreamLatencies = new EnumMap<>(State.class);
+    private LatencyAttribution workItemCreationLatency = null;
+    private final Map<State, Duration> aggregatedGetWorkStreamLatencies;
+
+    private final MillisProvider clock;
+
+    public GetWorkTimingInfosTracker(MillisProvider clock) {
+      this.aggregatedGetWorkStreamLatencies = new EnumMap<>(State.class);
+      this.clock = clock;
     }

     public void addTimingInfo(Collection<GetWorkStreamTimingInfo> infos) {
       // We want to record duration for each stage and also be reflective on total work item
       // processing time. It can be tricky because timings of different
       // StreamingGetWorkResponseChunks can be interleaved. Current strategy is to record the
-      // sum duration in each stage across different chunks, then divide the total duration (start
-      // from the first chunk creation in the windmill worker to the end of last chunk reception by
-      // the user worker) proportionally according the sum duration values across the many stages.
-      // This should allow us to identify the slow stage meanwhile avoid confusions for comparing
-      // the stage duration to the total processing elapsed wall time.
+      // sum duration in each transmission stage across different chunks, then divide the total
+      // duration (start from the chunk creation end in the windmill worker to the end of last chunk
+      // reception by the user worker) proportionally according the sum duration values across the
+      // many stages. This should allow us to identify the slow stage meanwhile avoid confusions for
+      // comparing the stage duration to the total processing elapsed wall time.
       Map<Event, Instant> getWorkStreamTimings = new HashMap<>();
       for (GetWorkStreamTimingInfo info : infos) {
         getWorkStreamTimings.putIfAbsent(
             info.getEvent(), Instant.ofEpochMilli(info.getTimestampUsec() / 1000));
       }
-
-      for (Cell<Event, Event, State> cell : EVENT_STATE_TABLE.cellSet()) {
-        Event start = cell.getRowKey();
-        Event end = cell.getColumnKey();
-        State state = cell.getValue();
-        Instant startTiming = getWorkStreamTimings.get(start);
-        Instant endTiming = getWorkStreamTimings.get(end);
-        if (startTiming != null && endTiming != null) {
-          getWorkStreamLatencies.compute(
-              state,
-              (state_key, duration) -> {
-                Duration newDuration = new Duration(startTiming, endTiming);
-                if (duration == null) {
-                  return newDuration;
-                }
-                return duration.plus(newDuration);
-              });
-        }
+      Instant workItemCreationStart = getWorkStreamTimings.get(Event.GET_WORK_CREATION_START);
+      Instant workItemCreationEnd = getWorkStreamTimings.get(Event.GET_WORK_CREATION_END);
+      // Record the work item creation end time.
+      if (workItemCreationStart != null
+          && workItemCreationEnd != null
+          && workItemCreationLatency == null) {
+        workItemCreationLatency =
+            LatencyAttribution.newBuilder()
+                .setState(State.GET_WORK_IN_WINDMILL_WORKER)
+                .setTotalDurationMillis(
+                    new Duration(workItemCreationStart, workItemCreationEnd).getMillis())
+                .build();
       }
-      Instant getWorkCreationStartTime = getWorkStreamTimings.get(Event.GET_WORK_CREATION_START);
-      if (getWorkCreationStartTime != null
-          && getWorkCreationStartTime.isBefore(workItemCreationStartTime)) {
-        workItemCreationStartTime = getWorkCreationStartTime;
+      if (workItemCreationEnd != null && workItemCreationEnd.isAfter(workItemCreationEndTime)) {
+        workItemCreationEndTime = workItemCreationEnd;
       }
+
       Instant receivedByDispatcherTiming =
           getWorkStreamTimings.get(Event.GET_WORK_RECEIVED_BY_DISPATCHER);
-      Instant now = Instant.now();
-      if (receivedByDispatcherTiming != null) {
-        getWorkStreamLatencies.compute(
+      if (workItemCreationEnd != null && receivedByDispatcherTiming != null) {
+        aggregatedGetWorkStreamLatencies.compute(
+            State.GET_WORK_IN_TRANSIT_TO_DISPATCHER,
+            (state_key, duration) -> {
+              Duration newDuration = new Duration(workItemCreationEnd, receivedByDispatcherTiming);
+              if (duration == null) {
+                return newDuration;
+              }
+              return duration.plus(newDuration);
+            });
+      }
+      Instant forwardedByDispatcherTiming =
+          getWorkStreamTimings.get(Event.GET_WORK_FORWARDED_BY_DISPATCHER);
+      Instant now = Instant.ofEpochMilli(clock.getMillis());
+      if (forwardedByDispatcherTiming != null) {

Review Comment:
   // Record the latency of each chunk between send on dispatcher and arrival on worker.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java:
##########
@@ -950,42 +947,48 @@ public void addTimingInfo(Collection<GetWorkStreamTimingInfo> infos) {
     }

     List<LatencyAttribution> getLatencyAttributions() {
-      if (getWorkStreamLatencies.size() == 0) {
-        return new ArrayList<>();
+      if (workItemCreationLatency == null && aggregatedGetWorkStreamLatencies.size() == 0) {
+        return Collections.emptyList();
+      }
+      List<LatencyAttribution> latencyAttributions =
+          new ArrayList<>(aggregatedGetWorkStreamLatencies.size() + 1);
+      if (workItemCreationLatency != null) {
+        latencyAttributions.add(workItemCreationLatency);
       }
-      if (workItemCreationStartTime.isAfter(workItemLastChunkReceivedByWorkerTime)) {
+      if (workItemCreationEndTime.isAfter(workItemLastChunkReceivedByWorkerTime)) {
         LOG.warn(
-            String.format(
-                "Work item creation time %s is after the work received time %s, "
-                    + "one or more GetWorkStream timing infos are missing.",
-                workItemCreationStartTime, workItemLastChunkReceivedByWorkerTime));
-        return new ArrayList<>();
-      }
-      List<LatencyAttribution> latencyAttributions = new ArrayList<>(getWorkStreamLatencies.size());
-      long totalDurationWallTimeMills =
-          new Duration(workItemCreationStartTime, workItemLastChunkReceivedByWorkerTime)
-              .getMillis();
+            "Work item creation time {} is after the work received time {}, "
+                + "one or more GetWorkStream timing infos are missing.",
+            workItemCreationEndTime,
+            workItemLastChunkReceivedByWorkerTime);
+        return latencyAttributions;
+      }
+      long totalTransmissionDurationElapsedTime =
+          new Duration(workItemCreationEndTime, workItemLastChunkReceivedByWorkerTime).getMillis();
       long totalSumDurationTimeMills = 0;
-      for (Duration duration : getWorkStreamLatencies.values()) {
+      for (Duration duration : aggregatedGetWorkStreamLatencies.values()) {
         totalSumDurationTimeMills += duration.getMillis();
       }

-      for (Map.Entry<State, Duration> duration : getWorkStreamLatencies.entrySet()) {
+      for (Map.Entry<State, Duration> duration : aggregatedGetWorkStreamLatencies.entrySet()) {

Review Comment:
   nit: prefer forEach instead of iterating entrySet. You can give better names than getKey/getValue, and it avoids some object creation, which can add up in frequently executed loops (https://github.com/apache/beam/pull/25930/files)
   
   aggregatedGetWorkStreamLatencies.forEach(
     (state, duration) -> {
   
   });
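   
   For example, a fuller version of that sketch, reusing the names from this diff (just illustrating the shape, not verified against the PR), could be:
   
   aggregatedGetWorkStreamLatencies.forEach(
       (state, duration) ->
           latencyAttributions.add(
               LatencyAttribution.newBuilder()
                   .setState(state)
                   .setTotalDurationMillis(
                       (long)
                           (duration.getMillis()
                               / (double) totalSumDurationTimeMills
                               * totalTransmissionDurationElapsedTime))
                   .build()));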



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java:
##########
@@ -886,60 +871,72 @@ public final Instant startTime() {
   }
 
   static class GetWorkTimingInfosTracker {
-    private final Map<State, Duration> getWorkStreamLatencies;
-    private Instant workItemCreationStartTime = Instant.ofEpochMilli(Long.MAX_VALUE);
+
+    private Instant workItemCreationEndTime = Instant.EPOCH;
     private Instant workItemLastChunkReceivedByWorkerTime = Instant.EPOCH;

-    public GetWorkTimingInfosTracker() {
-      this.getWorkStreamLatencies = new EnumMap<>(State.class);
+    private LatencyAttribution workItemCreationLatency = null;
+    private final Map<State, Duration> aggregatedGetWorkStreamLatencies;
+
+    private final MillisProvider clock;
+
+    public GetWorkTimingInfosTracker(MillisProvider clock) {
+      this.aggregatedGetWorkStreamLatencies = new EnumMap<>(State.class);
+      this.clock = clock;
     }

     public void addTimingInfo(Collection<GetWorkStreamTimingInfo> infos) {
       // We want to record duration for each stage and also be reflective on total work item
       // processing time. It can be tricky because timings of different
       // StreamingGetWorkResponseChunks can be interleaved. Current strategy is to record the
-      // sum duration in each stage across different chunks, then divide the total duration (start
-      // from the first chunk creation in the windmill worker to the end of last chunk reception by
-      // the user worker) proportionally according the sum duration values across the many stages.
-      // This should allow us to identify the slow stage meanwhile avoid confusions for comparing
-      // the stage duration to the total processing elapsed wall time.
+      // sum duration in each transmission stage across different chunks, then divide the total
+      // duration (start from the chunk creation end in the windmill worker to the end of last chunk
+      // reception by the user worker) proportionally according the sum duration values across the
+      // many stages. This should allow us to identify the slow stage meanwhile avoid confusions for
+      // comparing the stage duration to the total processing elapsed wall time.
       Map<Event, Instant> getWorkStreamTimings = new HashMap<>();
       for (GetWorkStreamTimingInfo info : infos) {
         getWorkStreamTimings.putIfAbsent(
             info.getEvent(), Instant.ofEpochMilli(info.getTimestampUsec() / 1000));
       }
-
-      for (Cell<Event, Event, State> cell : EVENT_STATE_TABLE.cellSet()) {
-        Event start = cell.getRowKey();
-        Event end = cell.getColumnKey();
-        State state = cell.getValue();
-        Instant startTiming = getWorkStreamTimings.get(start);
-        Instant endTiming = getWorkStreamTimings.get(end);
-        if (startTiming != null && endTiming != null) {
-          getWorkStreamLatencies.compute(
-              state,
-              (state_key, duration) -> {
-                Duration newDuration = new Duration(startTiming, endTiming);
-                if (duration == null) {
-                  return newDuration;
-                }
-                return duration.plus(newDuration);
-              });
-        }
+      Instant workItemCreationStart = getWorkStreamTimings.get(Event.GET_WORK_CREATION_START);
+      Instant workItemCreationEnd = getWorkStreamTimings.get(Event.GET_WORK_CREATION_END);
+      // Record the work item creation end time.
+      if (workItemCreationStart != null
+          && workItemCreationEnd != null
+          && workItemCreationLatency == null) {
+        workItemCreationLatency =
+            LatencyAttribution.newBuilder()
+                .setState(State.GET_WORK_IN_WINDMILL_WORKER)
+                .setTotalDurationMillis(
+                    new Duration(workItemCreationStart, workItemCreationEnd).getMillis())
+                .build();
       }
-      Instant getWorkCreationStartTime = getWorkStreamTimings.get(Event.GET_WORK_CREATION_START);
-      if (getWorkCreationStartTime != null
-          && getWorkCreationStartTime.isBefore(workItemCreationStartTime)) {
-        workItemCreationStartTime = getWorkCreationStartTime;
+      if (workItemCreationEnd != null && workItemCreationEnd.isAfter(workItemCreationEndTime)) {

Review Comment:
   You could look into removing the nullness warning suppression at the top of the class to automate catching this, if needed. But that might require more fixes than we want to make in this PR.



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServerTest.java:
##########
@@ -971,39 +970,52 @@ public void onCompleted() {
 
   @Test
   public void testGetWorkTimingInfosTracker() throws Exception {
-    GetWorkTimingInfosTracker tracker = new GetWorkTimingInfosTracker();
+    GetWorkTimingInfosTracker tracker = new GetWorkTimingInfosTracker(() -> 50);
     List<GetWorkStreamTimingInfo> infos = new ArrayList<>();
     for (int i = 0; i <= 3; i++) {
       infos.add(
           GetWorkStreamTimingInfo.newBuilder()
               .setEvent(Event.GET_WORK_CREATION_START)
-              .setTimestampUsec(i)
+              .setTimestampUsec(0)
               .build());
       infos.add(
           GetWorkStreamTimingInfo.newBuilder()
               .setEvent(Event.GET_WORK_CREATION_END)
-              .setTimestampUsec(i + 1)
+              .setTimestampUsec(10000)
               .build());
       infos.add(
           GetWorkStreamTimingInfo.newBuilder()
               .setEvent(Event.GET_WORK_RECEIVED_BY_DISPATCHER)
-              .setTimestampUsec(i + 2)
+              .setTimestampUsec((i + 11) * 1000)
               .build());
       infos.add(
           GetWorkStreamTimingInfo.newBuilder()
               .setEvent(Event.GET_WORK_FORWARDED_BY_DISPATCHER)
-              .setTimestampUsec(i + 3)
+              .setTimestampUsec((i + 16) * 1000)
               .build());
+      tracker.addTimingInfo(infos);
+      infos.clear();
     }
-    tracker.addTimingInfo(infos);
-    Set<State> states = new HashSet<>();
+    // durations for each chunk:
+    // GET_WORK_IN_WINDMILL_WORKER: 10, 10, 10, 10
+    // GET_WORK_IN_TRANSIT_TO_DISPATCHER: 1, 2, 3, 4 -> sum to 10
+    // GET_WORK_IN_TRANSIT_TO_USER_WORKER: 34, 33, 32, 31 -> sum to 130
+    Map<State, LatencyAttribution> latencies = new HashMap<>();
     List<LatencyAttribution> attributions = tracker.getLatencyAttributions();
     assertEquals(3, attributions.size());
     for (LatencyAttribution attribution : attributions) {
-      states.add(attribution.getState());
+      latencies.put(attribution.getState(), attribution);
     }
-    assertTrue(states.contains(State.GET_WORK_IN_WINDMILL_WORKER));
-    assertTrue(states.contains(State.GET_WORK_IN_TRANSIT_TO_DISPATCHER));
-    assertTrue(states.contains(State.GET_WORK_IN_TRANSIT_TO_USER_WORKER));
+    assertEquals(10L, latencies.get(State.GET_WORK_IN_WINDMILL_WORKER).getTotalDurationMillis());
+    // elapsed time from 10 -> 50;
+    long elapsedTime = 40;
+    // sumDurations: 1 + 2 + 3 + 4 + 34 + 33 + 32 + 31;
+    long sumDurations = 140;
+    assertEquals(

Review Comment:
   hmm, doing the math here:
   130/140*40 = 37
   
   That's making me think we need to do something other than totaling per state and then scaling. In this case it is odd that the 37 is greater than any of the individual latencies 31, 32, 33, 34 for that stage.
   
   Other ideas (a rough sketch of the first follows below):
   - take the max of each type, scaled down if needed to not exceed the total latency
   - take the average of each type, scaled down if needed to not exceed the total latency
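   
   For example, the max-based idea could look roughly like this, assuming a hypothetical maxGetWorkStreamLatencies map maintained in addTimingInfo that keeps the longest duration seen per state (the other names come from this diff):
   
   long sumOfMaxesMillis =
       maxGetWorkStreamLatencies.values().stream().mapToLong(Duration::getMillis).sum();
   // Only scale down when the per-state maxes would exceed the observed elapsed time.
   double scale =
       sumOfMaxesMillis <= totalTransmissionDurationElapsedTime
           ? 1.0
           : (double) totalTransmissionDurationElapsedTime / sumOfMaxesMillis;
   maxGetWorkStreamLatencies.forEach(
       (state, duration) ->
           latencyAttributions.add(
               LatencyAttribution.newBuilder()
                   .setState(state)
                   .setTotalDurationMillis((long) (duration.getMillis() * scale))
                   .build()));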



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java:
##########
@@ -950,42 +947,48 @@ public void addTimingInfo(Collection<GetWorkStreamTimingInfo> infos) {
     }

     List<LatencyAttribution> getLatencyAttributions() {
-      if (getWorkStreamLatencies.size() == 0) {
-        return new ArrayList<>();
+      if (workItemCreationLatency == null && aggregatedGetWorkStreamLatencies.size() == 0) {

Review Comment:
   nit: use isEmpty() instead of size() == 0
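   
   i.e. the check could read:
   
   if (workItemCreationLatency == null && aggregatedGetWorkStreamLatencies.isEmpty()) {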



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java:
##########
@@ -950,42 +947,48 @@ public void addTimingInfo(Collection<GetWorkStreamTimingInfo> infos) {
     }

     List<LatencyAttribution> getLatencyAttributions() {
-      if (getWorkStreamLatencies.size() == 0) {
-        return new ArrayList<>();
+      if (workItemCreationLatency == null && aggregatedGetWorkStreamLatencies.size() == 0) {
+        return Collections.emptyList();
+      }
+      List<LatencyAttribution> latencyAttributions =
+          new ArrayList<>(aggregatedGetWorkStreamLatencies.size() + 1);
+      if (workItemCreationLatency != null) {
+        latencyAttributions.add(workItemCreationLatency);
       }
-      if (workItemCreationStartTime.isAfter(workItemLastChunkReceivedByWorkerTime)) {
+      if (workItemCreationEndTime.isAfter(workItemLastChunkReceivedByWorkerTime)) {
         LOG.warn(
-            String.format(
-                "Work item creation time %s is after the work received time %s, "
-                    + "one or more GetWorkStream timing infos are missing.",
-                workItemCreationStartTime, workItemLastChunkReceivedByWorkerTime));
-        return new ArrayList<>();
-      }
-      List<LatencyAttribution> latencyAttributions = new ArrayList<>(getWorkStreamLatencies.size());
-      long totalDurationWallTimeMills =
-          new Duration(workItemCreationStartTime, workItemLastChunkReceivedByWorkerTime)
-              .getMillis();
+            "Work item creation time {} is after the work received time {}, "
+                + "one or more GetWorkStream timing infos are missing.",
+            workItemCreationEndTime,
+            workItemLastChunkReceivedByWorkerTime);
+        return latencyAttributions;
+      }
+      long totalTransmissionDurationElapsedTime =
+          new Duration(workItemCreationEndTime, workItemLastChunkReceivedByWorkerTime).getMillis();
       long totalSumDurationTimeMills = 0;
-      for (Duration duration : getWorkStreamLatencies.values()) {
+      for (Duration duration : aggregatedGetWorkStreamLatencies.values()) {
         totalSumDurationTimeMills += duration.getMillis();
       }

-      for (Map.Entry<State, Duration> duration : getWorkStreamLatencies.entrySet()) {
+      for (Map.Entry<State, Duration> duration : aggregatedGetWorkStreamLatencies.entrySet()) {
         latencyAttributions.add(
             LatencyAttribution.newBuilder()
                 .setState(duration.getKey())
                 .setTotalDurationMillis(
-                    (duration.getValue().getMillis() / totalSumDurationTimeMills)
-                        * totalDurationWallTimeMills)
+                    (long)
+                        (((double) duration.getValue().getMillis()
+                                / (double) totalSumDurationTimeMills)

Review Comment:
   think you can remove one of the double casts; with a single double operand, double arithmetic will already apply
   is the cast to (long) not implicit with Java?
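   
   For illustration, with the names from this diff the scaled value could then be computed with a single cast per conversion:
   
   long scaledMillis =
       (long)
           (duration.getValue().getMillis()
               / (double) totalSumDurationTimeMills
               * totalTransmissionDurationElapsedTime);
   // The explicit (long) cast stays: Java does not narrow double to long implicitly.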



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServerTest.java:
##########
@@ -971,39 +970,52 @@ public void onCompleted() {
 
   @Test
   public void testGetWorkTimingInfosTracker() throws Exception {
-    GetWorkTimingInfosTracker tracker = new GetWorkTimingInfosTracker();
+    GetWorkTimingInfosTracker tracker = new GetWorkTimingInfosTracker(() -> 50);
     List<GetWorkStreamTimingInfo> infos = new ArrayList<>();
     for (int i = 0; i <= 3; i++) {
       infos.add(
           GetWorkStreamTimingInfo.newBuilder()
               .setEvent(Event.GET_WORK_CREATION_START)
-              .setTimestampUsec(i)
+              .setTimestampUsec(0)
               .build());
       infos.add(
           GetWorkStreamTimingInfo.newBuilder()
               .setEvent(Event.GET_WORK_CREATION_END)
-              .setTimestampUsec(i + 1)
+              .setTimestampUsec(10000)
               .build());
       infos.add(
           GetWorkStreamTimingInfo.newBuilder()
               .setEvent(Event.GET_WORK_RECEIVED_BY_DISPATCHER)
-              .setTimestampUsec(i + 2)
+              .setTimestampUsec((i + 11) * 1000)
               .build());
       infos.add(
           GetWorkStreamTimingInfo.newBuilder()
               .setEvent(Event.GET_WORK_FORWARDED_BY_DISPATCHER)
-              .setTimestampUsec(i + 3)
+              .setTimestampUsec((i + 16) * 1000)
               .build());
+      tracker.addTimingInfo(infos);
+      infos.clear();
     }
-    tracker.addTimingInfo(infos);
-    Set<State> states = new HashSet<>();
+    // durations for each chunk:
+    // GET_WORK_IN_WINDMILL_WORKER: 10, 10, 10, 10
+    // GET_WORK_IN_TRANSIT_TO_DISPATCHER: 1, 2, 3, 4 -> sum to 10
+    // GET_WORK_IN_TRANSIT_TO_USER_WORKER: 34, 33, 32, 31 -> sum to 130
+    Map<State, LatencyAttribution> latencies = new HashMap<>();
     List<LatencyAttribution> attributions = tracker.getLatencyAttributions();
     assertEquals(3, attributions.size());
     for (LatencyAttribution attribution : attributions) {
-      states.add(attribution.getState());
+      latencies.put(attribution.getState(), attribution);
     }
-    assertTrue(states.contains(State.GET_WORK_IN_WINDMILL_WORKER));
-    assertTrue(states.contains(State.GET_WORK_IN_TRANSIT_TO_DISPATCHER));
-    assertTrue(states.contains(State.GET_WORK_IN_TRANSIT_TO_USER_WORKER));
+    assertEquals(10L, latencies.get(State.GET_WORK_IN_WINDMILL_WORKER).getTotalDurationMillis());
+    // elapsed time from 10 -> 50;
+    long elapsedTime = 40;
+    // sumDurations: 1 + 2 + 3 + 4 + 34 + 33 + 32 + 31;
+    long sumDurations = 140;
+    assertEquals(
+        (long) (elapsedTime * ((double) 10 / (double) sumDurations)),

Review Comment:
   nit: 10.0 instead of (double) 10
   and think you can remove other double cast
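   
   i.e. something like:
   
   assertEquals(
       (long) (elapsedTime * (10.0 / sumDurations)),
       latencies.get(State.GET_WORK_IN_TRANSIT_TO_DISPATCHER).getTotalDurationMillis());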



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServerTest.java:
##########
@@ -971,39 +970,52 @@ public void onCompleted() {
 
   @Test
   public void testGetWorkTimingInfosTracker() throws Exception {
-    GetWorkTimingInfosTracker tracker = new GetWorkTimingInfosTracker();
+    GetWorkTimingInfosTracker tracker = new GetWorkTimingInfosTracker(() -> 50);
     List<GetWorkStreamTimingInfo> infos = new ArrayList<>();
     for (int i = 0; i <= 3; i++) {
       infos.add(
           GetWorkStreamTimingInfo.newBuilder()
               .setEvent(Event.GET_WORK_CREATION_START)
-              .setTimestampUsec(i)
+              .setTimestampUsec(0)
               .build());
       infos.add(
           GetWorkStreamTimingInfo.newBuilder()
               .setEvent(Event.GET_WORK_CREATION_END)
-              .setTimestampUsec(i + 1)
+              .setTimestampUsec(10000)
               .build());
       infos.add(
           GetWorkStreamTimingInfo.newBuilder()
               .setEvent(Event.GET_WORK_RECEIVED_BY_DISPATCHER)
-              .setTimestampUsec(i + 2)
+              .setTimestampUsec((i + 11) * 1000)
               .build());
       infos.add(
           GetWorkStreamTimingInfo.newBuilder()
               .setEvent(Event.GET_WORK_FORWARDED_BY_DISPATCHER)
-              .setTimestampUsec(i + 3)
+              .setTimestampUsec((i + 16) * 1000)
               .build());
+      tracker.addTimingInfo(infos);
+      infos.clear();
     }
-    tracker.addTimingInfo(infos);
-    Set<State> states = new HashSet<>();
+    // durations for each chunk:
+    // GET_WORK_IN_WINDMILL_WORKER: 10, 10, 10, 10
+    // GET_WORK_IN_TRANSIT_TO_DISPATCHER: 1, 2, 3, 4 -> sum to 10
+    // GET_WORK_IN_TRANSIT_TO_USER_WORKER: 34, 33, 32, 31 -> sum to 130
+    Map<State, LatencyAttribution> latencies = new HashMap<>();
     List<LatencyAttribution> attributions = tracker.getLatencyAttributions();
     assertEquals(3, attributions.size());
     for (LatencyAttribution attribution : attributions) {
-      states.add(attribution.getState());
+      latencies.put(attribution.getState(), attribution);
     }
-    assertTrue(states.contains(State.GET_WORK_IN_WINDMILL_WORKER));
-    assertTrue(states.contains(State.GET_WORK_IN_TRANSIT_TO_DISPATCHER));
-    assertTrue(states.contains(State.GET_WORK_IN_TRANSIT_TO_USER_WORKER));
+    assertEquals(10L, latencies.get(State.GET_WORK_IN_WINDMILL_WORKER).getTotalDurationMillis());
+    // elapsed time from 10 -> 50;
+    long elapsedTime = 40;
+    // sumDurations: 1 + 2 + 3 + 4 + 34 + 33 + 32 + 31;
+    long sumDurations = 140;
+    assertEquals(
+        (long) (elapsedTime * ((double) 10 / (double) sumDurations)),
+        latencies.get(State.GET_WORK_IN_TRANSIT_TO_DISPATCHER).getTotalDurationMillis());
+    assertEquals(
+        (long) (elapsedTime * ((double) 130 / (double) sumDurations)),

Review Comment:
   ditto


