scwhittle commented on code in PR #26085:
URL: https://github.com/apache/beam/pull/26085#discussion_r1178012079


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java:
##########
@@ -987,6 +998,84 @@ public void append(StreamingGetWorkResponseChunk chunk) {
 
         this.data = data.concat(chunk.getSerializedWorkItem());
         this.bufferedSize += chunk.getSerializedWorkItem().size();
+        for (GetWorkStreamTimingInfo info : chunk.getPerWorkItemTimingInfosList()) {
+          getWorkStreamTimings.compute(
+              info.getEvent(),
+              (event, recordedTime) -> {
+                Instant newTimingForEvent = Instant.ofEpochMilli(info.getTimestampUsec() / 1000);
+                if (recordedTime == null) {
+                  return newTimingForEvent;
+                }
+                switch (event) {
+                  case GET_WORK_CREATION_START:
+                    return recordedTime.isBefore(newTimingForEvent)
+                        ? recordedTime
+                        : newTimingForEvent;
+                  case GET_WORK_CREATION_END:
+                  case GET_WORK_RECEIVED_BY_DISPATCHER:
+                  case GET_WORK_FORWARDED_BY_DISPATCHER:
+                    return recordedTime.isAfter(newTimingForEvent)
+                        ? recordedTime
+                        : newTimingForEvent;
+                  default:
+                    LOG.error("Unknown GetWorkStreamTimingInfo type: " + event.name());
+                }
+                return recordedTime;
+              });
+          if (Instant.now().isAfter(workItemReceiveTime)) {

Review Comment:
   Call `Instant.now()` only once here; it could also be called just once for the whole iteration over all chunks.
   Nothing here is locked or async, so I don't think we lose anything by optimizing it.
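
A minimal sketch of reading the clock once and reusing the value inside the loop. The class name `SingleNowSketch`, the method `updateReceiveTime`, and the injected `Clock` are hypothetical, and the body of the `if` is not visible in the hunk, so the update shown here is an assumption. It also uses `java.time` types, which may differ from the `Instant` type the PR imports.

```java
import java.time.Clock;
import java.time.Instant;

public class SingleNowSketch {
  /**
   * Reads the clock once, then reuses that value for every timing info.
   * The "keep the later of the two instants" update is an assumption; the
   * real body of the `if` is not shown in the diff.
   */
  static Instant updateReceiveTime(Clock clock, Instant workItemReceiveTime, int timingInfoCount) {
    Instant now = clock.instant(); // single clock read, hoisted out of the loop
    Instant result = workItemReceiveTime;
    for (int i = 0; i < timingInfoCount; i++) {
      if (now.isAfter(result)) {
        result = now;
      }
    }
    return result;
  }
}
```

Passing a `Clock` is only there to make the sketch testable with a fixed time; hoisting a plain `Instant.now()` above the loop gives the same effect.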



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java:
##########
@@ -987,6 +998,84 @@ public void append(StreamingGetWorkResponseChunk chunk) {
 
         this.data = data.concat(chunk.getSerializedWorkItem());
         this.bufferedSize += chunk.getSerializedWorkItem().size();
+        for (GetWorkStreamTimingInfo info : chunk.getPerWorkItemTimingInfosList()) {

Review Comment:
   It might be nice to have a builder object that you just pass these timing infos to and then build/finalize to get the latencies.
   That would be easier to document and test independently, and we could experiment with tracking the timings differently.
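
One possible shape for such a builder, as a rough sketch only. The class `TimingTrackerSketch`, its `add`/`build` methods, and the local `Event` enum (a stand-in for the proto enum, with constant names taken from the diff) are all hypothetical. The min/max merge mirrors the lambda in the diff; computing latencies as gaps between consecutive events in declaration order is an assumption.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.EnumMap;
import java.util.Map;

public class TimingTrackerSketch {
  // Stand-in for the proto enum; assumed to be declared in chronological order.
  enum Event {
    GET_WORK_CREATION_START,
    GET_WORK_CREATION_END,
    GET_WORK_RECEIVED_BY_DISPATCHER,
    GET_WORK_FORWARDED_BY_DISPATCHER
  }

  private final Map<Event, Instant> timings = new EnumMap<>(Event.class);

  /** Records one timing info: earliest time wins for the start event, latest for the rest. */
  TimingTrackerSketch add(Event event, long timestampUsec) {
    Instant fresh = Instant.ofEpochMilli(timestampUsec / 1000);
    timings.merge(
        event,
        fresh,
        (old, candidate) ->
            event == Event.GET_WORK_CREATION_START
                ? (old.isBefore(candidate) ? old : candidate)
                : (old.isAfter(candidate) ? old : candidate));
    return this;
  }

  /** Returns the gap between each recorded event and the previous recorded one. */
  Map<Event, Duration> build() {
    Map<Event, Duration> latencies = new EnumMap<>(Event.class);
    Event previous = null;
    for (Event event : Event.values()) {
      if (!timings.containsKey(event)) {
        continue; // skip events that were never reported
      }
      if (previous != null) {
        latencies.put(event, Duration.between(timings.get(previous), timings.get(event)));
      }
      previous = event;
    }
    return latencies;
  }
}
```

With the merge logic isolated like this, `append` would only need to call `add` for each timing info, and a different tracking strategy could be swapped in behind `build` without touching the stream code.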



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java:
##########
@@ -987,6 +998,84 @@ public void append(StreamingGetWorkResponseChunk chunk) {
 
         this.data = data.concat(chunk.getSerializedWorkItem());
         this.bufferedSize += chunk.getSerializedWorkItem().size();
+        for (GetWorkStreamTimingInfo info : chunk.getPerWorkItemTimingInfosList()) {
+          getWorkStreamTimings.compute(
+              info.getEvent(),
+              (event, recordedTime) -> {
+                Instant newTimingForEvent = Instant.ofEpochMilli(info.getTimestampUsec() / 1000);

Review Comment:
   Add a comment describing the current strategy:
   - take the min across chunks for each event and use that to divide up the transit time
   
   As mentioned offline, this seems tricky because:
   - we want the total time across all chunks of the work item to match the real time elapsed, so that it aligns with the other times tracked for the work item.
   - chunks may be delayed due to downstream pushback.
   
   Another possible way to track this would be to identify the max elapsed time within each stage and scale.
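
A rough sketch of one reading of the "max elapsed time within each stage and scale" idea: take the per-stage maximum across chunks, then scale the stage durations so they sum to the real elapsed time. The class `StageScalingSketch`, the method `scaleStages`, and the array layout are hypothetical, and this interpretation is an assumption rather than something the PR implements.

```java
import java.util.Arrays;

public class StageScalingSketch {
  /**
   * stageMillisPerChunk[c][s] is the elapsed time of stage s observed for chunk c.
   * Returns one duration per stage: the max across chunks, scaled so that the
   * stage durations sum to totalElapsedMillis.
   */
  static double[] scaleStages(long[][] stageMillisPerChunk, long totalElapsedMillis) {
    if (stageMillisPerChunk.length == 0) {
      return new double[0];
    }
    int stages = stageMillisPerChunk[0].length;
    double[] scaled = new double[stages];
    for (long[] chunk : stageMillisPerChunk) {
      for (int s = 0; s < stages; s++) {
        scaled[s] = Math.max(scaled[s], chunk[s]); // per-stage max across chunks
      }
    }
    double sum = Arrays.stream(scaled).sum();
    if (sum == 0) {
      return scaled; // nothing to scale
    }
    double factor = totalElapsedMillis / sum;
    for (int s = 0; s < stages; s++) {
      scaled[s] *= factor;
    }
    return scaled;
  }
}
```

For example, two chunks with stage times `{10, 30}` and `{20, 10}` and a real elapsed time of 100 ms give per-stage maxima of 20 and 30, which scale to 40 and 60.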


