This is an automated email from the ASF dual-hosted git repository.

yichi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 30762dfc58a Populate getWorkStream latencies in dataflow streaming 
worker harness (#26085)
30762dfc58a is described below

commit 30762dfc58a5d30d254787d975ff6e51f35eb570
Author: Yichi Zhang <[email protected]>
AuthorDate: Fri Jun 30 08:51:16 2023 -0700

    Populate getWorkStream latencies in dataflow streaming worker harness 
(#26085)
---
 .../dataflow/worker/StreamingDataflowWorker.java   |  55 +++++--
 .../worker/windmill/GrpcWindmillServer.java        | 169 ++++++++++++++++++++-
 .../worker/windmill/WindmillServerStub.java        |   7 +-
 .../dataflow/worker/FakeWindmillServer.java        |  13 +-
 .../worker/StreamingDataflowWorkerTest.java        | 121 +++++++++++----
 .../worker/windmill/GrpcWindmillServerTest.java    |  63 +++++++-
 .../worker/windmill/src/main/proto/windmill.proto  |  42 ++++-
 7 files changed, 425 insertions(+), 45 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 92d0710bbe7..ea5065260a9 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -103,6 +103,7 @@ import 
org.apache.beam.runners.dataflow.worker.util.common.worker.ElementCounter
 import 
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter;
 import 
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.LatencyAttribution;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItemCommitRequest;
 import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub;
 import 
org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub.CommitWorkStream;
@@ -987,7 +988,11 @@ public class StreamingDataflowWorker {
                 computationWork.getDependentRealtimeInputWatermark());
         for (final Windmill.WorkItem workItem : computationWork.getWorkList()) 
{
           scheduleWorkItem(
-              computationState, inputDataWatermark, 
synchronizedProcessingTime, workItem);
+              computationState,
+              inputDataWatermark,
+              synchronizedProcessingTime,
+              workItem,
+              /*getWorkStreamLatencies=*/ Collections.emptyList());
         }
       }
     }
@@ -1005,13 +1010,15 @@ public class StreamingDataflowWorker {
               (String computation,
                   Instant inputDataWatermark,
                   Instant synchronizedProcessingTime,
-                  Windmill.WorkItem workItem) -> {
+                  Windmill.WorkItem workItem,
+                  Collection<LatencyAttribution> getWorkStreamLatencies) -> {
                 memoryMonitor.waitForResources("GetWork");
                 scheduleWorkItem(
                     getComputationState(computation),
                     inputDataWatermark,
                     synchronizedProcessingTime,
-                    workItem);
+                    workItem,
+                    getWorkStreamLatencies);
               });
       try {
         // Reconnect every now and again to enable better load balancing.
@@ -1030,7 +1037,8 @@ public class StreamingDataflowWorker {
       final ComputationState computationState,
       final Instant inputDataWatermark,
       final Instant synchronizedProcessingTime,
-      final Windmill.WorkItem workItem) {
+      final Windmill.WorkItem workItem,
+      final Collection<LatencyAttribution> getWorkStreamLatencies) {
     Preconditions.checkNotNull(inputDataWatermark);
     // May be null if output watermark not yet known.
     final @Nullable Instant outputDataWatermark =
@@ -1038,7 +1046,7 @@ public class StreamingDataflowWorker {
     Preconditions.checkState(
         outputDataWatermark == null || 
!outputDataWatermark.isAfter(inputDataWatermark));
     Work work =
-        new Work(workItem, clock) {
+        new Work(workItem, clock, getWorkStreamLatencies) {
           @Override
           public void run() {
             process(
@@ -1081,7 +1089,12 @@ public class StreamingDataflowWorker {
       PROCESSING(Windmill.LatencyAttribution.State.ACTIVE),
       READING(Windmill.LatencyAttribution.State.READING),
       COMMIT_QUEUED(Windmill.LatencyAttribution.State.COMMITTING),
-      COMMITTING(Windmill.LatencyAttribution.State.COMMITTING);
+      COMMITTING(Windmill.LatencyAttribution.State.COMMITTING),
+      
GET_WORK_IN_WINDMILL_WORKER(Windmill.LatencyAttribution.State.GET_WORK_IN_WINDMILL_WORKER),
+      GET_WORK_IN_TRANSIT_TO_DISPATCHER(
+          Windmill.LatencyAttribution.State.GET_WORK_IN_TRANSIT_TO_DISPATCHER),
+      GET_WORK_IN_TRANSIT_TO_USER_WORKER(
+          
Windmill.LatencyAttribution.State.GET_WORK_IN_TRANSIT_TO_USER_WORKER);
 
       private final Windmill.LatencyAttribution.State latencyAttributionState;
 
@@ -1099,14 +1112,18 @@ public class StreamingDataflowWorker {
     private final Instant startTime;
     private Instant stateStartTime;
     private State state;
-    private Map<Windmill.LatencyAttribution.State, Duration> 
totalDurationPerState;
+    private final Map<Windmill.LatencyAttribution.State, Duration> 
totalDurationPerState =
+        new EnumMap<>(Windmill.LatencyAttribution.State.class);
 
-    public Work(Windmill.WorkItem workItem, Supplier<Instant> clock) {
+    public Work(
+        Windmill.WorkItem workItem,
+        Supplier<Instant> clock,
+        Collection<LatencyAttribution> getWorkStreamLatencies) {
       this.workItem = workItem;
       this.clock = clock;
       this.startTime = this.stateStartTime = clock.get();
       this.state = State.QUEUED;
-      this.totalDurationPerState = new 
EnumMap<>(Windmill.LatencyAttribution.State.class);
+      recordGetWorkStreamLatencies(getWorkStreamLatencies);
     }
 
     public Windmill.WorkItem getWorkItem() {
@@ -1134,7 +1151,15 @@ public class StreamingDataflowWorker {
       return stateStartTime;
     }
 
-    public Iterable<Windmill.LatencyAttribution> getLatencyAttributionList() {
+    private void recordGetWorkStreamLatencies(
+        Collection<LatencyAttribution> getWorkStreamLatencies) {
+      for (LatencyAttribution latency : getWorkStreamLatencies) {
+        totalDurationPerState.put(
+            latency.getState(), 
Duration.millis(latency.getTotalDurationMillis()));
+      }
+    }
+
+    public Collection<Windmill.LatencyAttribution> getLatencyAttributions() {
       List<Windmill.LatencyAttribution> list = new ArrayList<>();
       for (Windmill.LatencyAttribution.State state : 
Windmill.LatencyAttribution.State.values()) {
         Duration duration = totalDurationPerState.getOrDefault(state, 
Duration.ZERO);
@@ -1431,6 +1456,7 @@ public class StreamingDataflowWorker {
 
       // Add the output to the commit queue.
       work.setState(State.COMMIT_QUEUED);
+      
outputBuilder.addAllPerWorkItemLatencyAttributions(work.getLatencyAttributions());
 
       WorkItemCommitRequest commitRequest = outputBuilder.build();
       int byteLimit = maxWorkItemCommitBytes;
@@ -1454,7 +1480,12 @@ public class StreamingDataflowWorker {
       commitQueue.put(new Commit(commitRequest, computationState, work));
 
       // Compute shuffle and state byte statistics these will be flushed 
asynchronously.
-      long stateBytesWritten = 
outputBuilder.clearOutputMessages().build().getSerializedSize();
+      long stateBytesWritten =
+          outputBuilder
+              .clearOutputMessages()
+              .clearPerWorkItemLatencyAttributions()
+              .build()
+              .getSerializedSize();
       long shuffleBytesRead = 0;
       for (Windmill.InputMessageBundle bundle : 
workItem.getMessageBundlesList()) {
         for (Windmill.Message message : bundle.getMessagesList()) {
@@ -2291,7 +2322,7 @@ public class StreamingDataflowWorker {
                       .setKey(shardedKey.key())
                       .setShardingKey(shardedKey.shardingKey())
                       .setWorkToken(work.getWorkItem().getWorkToken())
-                      
.addAllLatencyAttribution(work.getLatencyAttributionList())
+                      .addAllLatencyAttribution(work.getLatencyAttributions())
                       .build());
             }
           }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
index 7fe1a7b5440..9dcae93c8d1 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
@@ -24,8 +24,10 @@ import java.io.PrintWriter;
 import java.io.SequenceInputStream;
 import java.net.URI;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Deque;
+import java.util.EnumMap;
 import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -64,11 +66,15 @@ import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetDataRequest;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetDataResponse;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkRequest;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkResponse;
+import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkStreamTimingInfo;
+import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkStreamTimingInfo.Event;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GlobalData;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GlobalDataRequest;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill.JobHeader;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.KeyedGetDataRequest;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.KeyedGetDataResponse;
+import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.LatencyAttribution;
+import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.LatencyAttribution.State;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.ReportStatsRequest;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.ReportStatsResponse;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.StreamingCommitRequestChunk;
@@ -105,6 +111,7 @@ import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Immutabl
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.net.HostAndPort;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.DateTimeUtils.MillisProvider;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
@@ -133,6 +140,7 @@ public class GrpcWindmillServer extends WindmillServerStub {
   private static final int GET_DATA_STREAM_CHUNK_SIZE = 2 << 20;
 
   private static final long HEARTBEAT_REQUEST_ID = Long.MAX_VALUE;
+
   private static final AtomicLong nextId = new AtomicLong(0);
 
   private final StreamingDataflowWorkerOptions options;
@@ -862,6 +870,155 @@ public class GrpcWindmillServer extends 
WindmillServerStub {
     }
   }
 
+  static class GetWorkTimingInfosTracker {
+    private static class SumAndMaxDurations {
+      private Duration sum;
+      private Duration max;
+
+      public SumAndMaxDurations(Duration sum, Duration max) {
+        this.sum = sum;
+        this.max = max;
+      }
+    }
+
+    private Instant workItemCreationEndTime = Instant.EPOCH;
+    private Instant workItemLastChunkReceivedByWorkerTime = Instant.EPOCH;
+
+    private LatencyAttribution workItemCreationLatency = null;
+    private final Map<State, SumAndMaxDurations> 
aggregatedGetWorkStreamLatencies;
+
+    private final MillisProvider clock;
+
+    public GetWorkTimingInfosTracker(MillisProvider clock) {
+      this.aggregatedGetWorkStreamLatencies = new EnumMap<>(State.class);
+      this.clock = clock;
+    }
+
+    public void addTimingInfo(Collection<GetWorkStreamTimingInfo> infos) {
+      // We want to record duration for each stage and also be reflective of 
total work item
+      // processing time. It can be tricky because timings of different
+      // StreamingGetWorkResponseChunks can be interleaved. Current strategy 
is to record the
+      // sum duration in each transmission stage across different chunks, then 
divide the total
+      // duration (start from the chunk creation end in the windmill worker to 
the end of last chunk
+      // reception by the user worker) proportionally according to the sum 
duration values across the
+      // many stages, the final latency is also capped by the corresponding 
stage maximum latency
+      // seen across multiple chunks. This should allow us to identify the 
slow stage while
+      // avoiding confusion when comparing the stage duration to the total 
processing elapsed wall
+      // time.
+      Map<Event, Instant> getWorkStreamTimings = new HashMap<>();
+      for (GetWorkStreamTimingInfo info : infos) {
+        getWorkStreamTimings.putIfAbsent(
+            info.getEvent(), Instant.ofEpochMilli(info.getTimestampUsec() / 
1000));
+      }
+
+      // Record the difference between starting to get work and the first 
chunk being sent as the
+      // work creation time.
+      Instant workItemCreationStart = 
getWorkStreamTimings.get(Event.GET_WORK_CREATION_START);
+      Instant workItemCreationEnd = 
getWorkStreamTimings.get(Event.GET_WORK_CREATION_END);
+      if (workItemCreationStart != null
+          && workItemCreationEnd != null
+          && workItemCreationLatency == null) {
+        workItemCreationLatency =
+            LatencyAttribution.newBuilder()
+                .setState(State.GET_WORK_IN_WINDMILL_WORKER)
+                .setTotalDurationMillis(
+                    new Duration(workItemCreationStart, 
workItemCreationEnd).getMillis())
+                .build();
+      }
+      // Record the work item creation end time as the start of transmission 
stages.
+      if (workItemCreationEnd != null && 
workItemCreationEnd.isAfter(workItemCreationEndTime)) {
+        workItemCreationEndTime = workItemCreationEnd;
+      }
+
+      // Record the latency of each chunk between send on worker and arrival 
on dispatcher.
+      Instant receivedByDispatcherTiming =
+          getWorkStreamTimings.get(Event.GET_WORK_RECEIVED_BY_DISPATCHER);
+      if (workItemCreationEnd != null && receivedByDispatcherTiming != null) {
+        Duration newDuration = new Duration(workItemCreationEnd, 
receivedByDispatcherTiming);
+        aggregatedGetWorkStreamLatencies.compute(
+            State.GET_WORK_IN_TRANSIT_TO_DISPATCHER,
+            (stateKey, duration) -> {
+              if (duration == null) {
+                return new SumAndMaxDurations(newDuration, newDuration);
+              }
+              duration.max = newDuration.isLongerThan(duration.max) ? 
newDuration : duration.max;
+              duration.sum = duration.sum.plus(newDuration);
+              return duration;
+            });
+      }
+
+      // Record the latency of each chunk between send on dispatcher and 
arrival on worker.
+      Instant forwardedByDispatcherTiming =
+          getWorkStreamTimings.get(Event.GET_WORK_FORWARDED_BY_DISPATCHER);
+      Instant now = Instant.ofEpochMilli(clock.getMillis());
+      if (forwardedByDispatcherTiming != null) {
+        Duration newDuration = new Duration(forwardedByDispatcherTiming, now);
+        aggregatedGetWorkStreamLatencies.compute(
+            State.GET_WORK_IN_TRANSIT_TO_USER_WORKER,
+            (stateKey, duration) -> {
+              if (duration == null) {
+                return new SumAndMaxDurations(newDuration, newDuration);
+              }
+              duration.max = newDuration.isLongerThan(duration.max) ? 
newDuration : duration.max;
+              duration.sum = duration.sum.plus(newDuration);
+              return duration;
+            });
+      }
+      workItemLastChunkReceivedByWorkerTime = now;
+    }
+
+    List<LatencyAttribution> getLatencyAttributions() {
+      if (workItemCreationLatency == null && 
aggregatedGetWorkStreamLatencies.isEmpty()) {
+        return Collections.emptyList();
+      }
+      List<LatencyAttribution> latencyAttributions =
+          new ArrayList<>(aggregatedGetWorkStreamLatencies.size() + 1);
+      if (workItemCreationLatency != null) {
+        latencyAttributions.add(workItemCreationLatency);
+      }
+      if 
(workItemCreationEndTime.isAfter(workItemLastChunkReceivedByWorkerTime)) {
+        LOG.warn(
+            "Work item creation time {} is after the work received time {}, "
+                + "one or more GetWorkStream timing infos are missing.",
+            workItemCreationEndTime,
+            workItemLastChunkReceivedByWorkerTime);
+        return latencyAttributions;
+      }
+      long totalTransmissionDurationElapsedTime =
+          new Duration(workItemCreationEndTime, 
workItemLastChunkReceivedByWorkerTime).getMillis();
+      long totalSumDurationTimeMills = 0;
+      for (SumAndMaxDurations duration : 
aggregatedGetWorkStreamLatencies.values()) {
+        totalSumDurationTimeMills += duration.sum.getMillis();
+      }
+      final long finalTotalSumDurationTimeMills = totalSumDurationTimeMills;
+
+      aggregatedGetWorkStreamLatencies.forEach(
+          (state, duration) -> {
+            long scaledDuration =
+                (long)
+                    (((double) duration.sum.getMillis() / 
finalTotalSumDurationTimeMills)
+                        * totalTransmissionDurationElapsedTime);
+            // Cap final duration by the max state duration across different 
chunks. This ensures
+            // the sum of final durations does not exceed the total elapsed 
time and the duration
+            // for each stage does not exceed the stage maximum.
+            long durationMills = Math.min(duration.max.getMillis(), 
scaledDuration);
+            latencyAttributions.add(
+                LatencyAttribution.newBuilder()
+                    .setState(state)
+                    .setTotalDurationMillis(durationMills)
+                    .build());
+          });
+      return latencyAttributions;
+    }
+
+    public void reset() {
+      this.aggregatedGetWorkStreamLatencies.clear();
+      this.workItemCreationEndTime = Instant.EPOCH;
+      this.workItemLastChunkReceivedByWorkerTime = Instant.EPOCH;
+      this.workItemCreationLatency = null;
+    }
+  }
+
   private class GrpcGetWorkStream
       extends AbstractWindmillStream<StreamingGetWorkRequest, 
StreamingGetWorkResponseChunk>
       implements GetWorkStream {
@@ -965,12 +1122,16 @@ public class GrpcWindmillServer extends 
WindmillServerStub {
     }
 
     private class WorkItemBuffer {
+
       private String computation;
       private Instant inputDataWatermark;
       private Instant synchronizedProcessingTime;
       private ByteString data = ByteString.EMPTY;
       private long bufferedSize = 0;
 
+      private GetWorkTimingInfosTracker workTimingInfosTracker =
+          new GetWorkTimingInfosTracker(System::currentTimeMillis);
+
       private void setMetadata(Windmill.ComputationWorkItemMetadata metadata) {
         this.computation = metadata.getComputationId();
         this.inputDataWatermark =
@@ -987,6 +1148,7 @@ public class GrpcWindmillServer extends WindmillServerStub 
{
 
         this.data = data.concat(chunk.getSerializedWorkItem());
         this.bufferedSize += chunk.getSerializedWorkItem().size();
+        
workTimingInfosTracker.addTimingInfo(chunk.getPerWorkItemTimingInfosList());
       }
 
       public long bufferedSize() {
@@ -995,14 +1157,19 @@ public class GrpcWindmillServer extends 
WindmillServerStub {
 
       public void runAndReset() {
         try {
+          Windmill.WorkItem workItem = 
Windmill.WorkItem.parseFrom(data.newInput());
+          List<LatencyAttribution> getWorkStreamLatencies =
+              workTimingInfosTracker.getLatencyAttributions();
           receiver.receiveWork(
               computation,
               inputDataWatermark,
               synchronizedProcessingTime,
-              Windmill.WorkItem.parseFrom(data.newInput()));
+              workItem,
+              getWorkStreamLatencies);
         } catch (IOException e) {
           LOG.error("Failed to parse work item from stream: ", e);
         }
+        workTimingInfosTracker.reset();
         data = ByteString.EMPTY;
         bufferedSize = 0;
       }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java
index 238f22aa643..be6c365ee33 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.dataflow.worker.windmill;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -33,6 +34,7 @@ import 
org.apache.beam.runners.dataflow.worker.status.StatusDataProvider;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill.CommitStatus;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.KeyedGetDataRequest;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.KeyedGetDataResponse;
+import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.LatencyAttribution;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.net.HostAndPort;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Duration;
@@ -72,11 +74,13 @@ public abstract class WindmillServerStub implements 
StatusDataProvider {
   /** Functional interface for receiving WorkItems. */
   @FunctionalInterface
   public interface WorkItemReceiver {
+
     void receiveWork(
         String computation,
         @Nullable Instant inputDataWatermark,
         Instant synchronizedProcessingTime,
-        Windmill.WorkItem workItem);
+        Windmill.WorkItem workItem,
+        Collection<LatencyAttribution> getWorkStreamLatencies);
   }
 
   /**
@@ -133,6 +137,7 @@ public abstract class WindmillServerStub implements 
StatusDataProvider {
   /** Interface for streaming CommitWorkRequests to Windmill. */
   @ThreadSafe
   public interface CommitWorkStream extends WindmillStream {
+
     /**
      * Commits a work item and running onDone when the commit has been 
processed by the server.
      * Returns true if the request was accepted. If false is returned the 
stream should be flushed
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java
index b0a64c82d03..edf8ce4628c 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java
@@ -29,6 +29,7 @@ import static org.junit.Assert.assertFalse;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
@@ -48,6 +49,8 @@ import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.ComputationGetD
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetDataRequest;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetDataResponse;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.KeyedGetDataRequest;
+import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.LatencyAttribution;
+import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.LatencyAttribution.State;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItemCommitRequest;
 import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.net.HostAndPort;
@@ -273,7 +276,15 @@ class FakeWindmillServer extends WindmillServerStub {
                     computationWork.getInputDataWatermark());
             for (Windmill.WorkItem workItem : computationWork.getWorkList()) {
               receiver.receiveWork(
-                  computationWork.getComputationId(), inputDataWatermark, 
Instant.now(), workItem);
+                  computationWork.getComputationId(),
+                  inputDataWatermark,
+                  Instant.now(),
+                  workItem,
+                  Collections.singletonList(
+                      LatencyAttribution.newBuilder()
+                          .setState(State.GET_WORK_IN_TRANSIT_TO_USER_WORKER)
+                          .setTotalDurationMillis(1000)
+                          .build()));
             }
           }
         }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
index d52335698e5..0e53210c018 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
@@ -110,6 +110,7 @@ import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.KeyedGetDataReq
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.KeyedGetDataResponse;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.KeyedMessageBundle;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.LatencyAttribution;
+import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.LatencyAttribution.State;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer.Type;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WatermarkHold;
@@ -600,6 +601,11 @@ public class StreamingDataflowWorkerTest {
         parseCommitRequest(expectedCommitRequestBuilder.toString()));
   }
 
+  private WorkItemCommitRequest removeDynamicFields(WorkItemCommitRequest 
request) {
+    // Throw away per_work_item_latency_attributions because it is dynamic in tests.
+    return request.toBuilder().clearPerWorkItemLatencyAttributions().build();
+  }
+
   private WorkItemCommitRequest.Builder makeExpectedTruncationRequestOutput(
       int index, String key, long shardingKey, long estimatedSize) throws 
Exception {
     StringBuilder expectedCommitRequestBuilder =
@@ -759,7 +765,8 @@ public class StreamingDataflowWorkerTest {
     for (int i = 0; i < numIters; ++i) {
       assertTrue(result.containsKey((long) i));
       assertEquals(
-          makeExpectedOutput(i, TimeUnit.MILLISECONDS.toMicros(i)).build(), 
result.get((long) i));
+          makeExpectedOutput(i, TimeUnit.MILLISECONDS.toMicros(i)).build(),
+          removeDynamicFields(result.get((long) i)));
     }
 
     verify(hotKeyLogger, 
atLeastOnce()).logHotKeyDetection(nullable(String.class), any());
@@ -798,7 +805,8 @@ public class StreamingDataflowWorkerTest {
     for (int i = 0; i < numIters; ++i) {
       assertTrue(result.containsKey((long) i));
       assertEquals(
-          makeExpectedOutput(i, TimeUnit.MILLISECONDS.toMicros(i)).build(), 
result.get((long) i));
+          makeExpectedOutput(i, TimeUnit.MILLISECONDS.toMicros(i)).build(),
+          removeDynamicFields(result.get((long) i)));
     }
 
     verify(hotKeyLogger, 
atLeastOnce()).logHotKeyDetection(nullable(String.class), any());
@@ -979,7 +987,8 @@ public class StreamingDataflowWorkerTest {
     for (int i = 0; i < numIters; ++i) {
       assertTrue(result.containsKey((long) i));
       assertEquals(
-          makeExpectedOutput(i, TimeUnit.MILLISECONDS.toMicros(i)).build(), 
result.get((long) i));
+          makeExpectedOutput(i, TimeUnit.MILLISECONDS.toMicros(i)).build(),
+          removeDynamicFields(result.get((long) i)));
       assertTrue(result.containsKey((long) i + 1000));
       assertEquals(
           makeExpectedOutput(
@@ -989,7 +998,7 @@ public class StreamingDataflowWorkerTest {
                   DEFAULT_SHARDING_KEY + 1,
                   keyStringForIndex(i))
               .build(),
-          result.get((long) i + 1000));
+          removeDynamicFields(result.get((long) i + 1000)));
       assertTrue(result.containsKey((long) i + numIters));
       assertEquals(
           makeExpectedOutput(
@@ -999,7 +1008,7 @@ public class StreamingDataflowWorkerTest {
                   DEFAULT_SHARDING_KEY,
                   keyStringForIndex(i))
               .build(),
-          result.get((long) i + numIters));
+          removeDynamicFields(result.get((long) i + numIters)));
     }
 
     // Re-add the work, it should process due to the keys no longer being 
active.
@@ -1025,7 +1034,7 @@ public class StreamingDataflowWorkerTest {
                   DEFAULT_SHARDING_KEY,
                   keyStringForIndex(i))
               .build(),
-          result.get((long) i + numIters * 2));
+          removeDynamicFields(result.get((long) i + numIters * 2)));
     }
   }
 
@@ -1117,7 +1126,7 @@ public class StreamingDataflowWorkerTest {
     assertEquals(
         makeExpectedOutput(1, 0, DEFAULT_KEY_STRING, DEFAULT_SHARDING_KEY, 
DEFAULT_KEY_STRING)
             .build(),
-        result.get(1L));
+        removeDynamicFields(result.get(1L)));
     assertEquals(1, result.size());
   }
 
@@ -1165,7 +1174,8 @@ public class StreamingDataflowWorkerTest {
 
     assertEquals(2, result.size());
     assertEquals(
-        makeExpectedOutput(2, 0, "key", DEFAULT_SHARDING_KEY, "key").build(), 
result.get(2L));
+        makeExpectedOutput(2, 0, "key", DEFAULT_SHARDING_KEY, "key").build(),
+        removeDynamicFields(result.get(2L)));
 
     assertTrue(result.containsKey(1L));
     WorkItemCommitRequest largeCommit = result.get(1L);
@@ -1254,7 +1264,7 @@ public class StreamingDataflowWorkerTest {
                   DEFAULT_SHARDING_KEY,
                   keyStringForIndex(i) + "_data" + i)
               .build(),
-          result.get((long) i));
+          removeDynamicFields(result.get((long) i)));
       assertTrue(result.containsKey((long) i + 1000));
       assertEquals(
           makeExpectedOutput(
@@ -1264,7 +1274,7 @@ public class StreamingDataflowWorkerTest {
                   DEFAULT_SHARDING_KEY + i,
                   keyStringForIndex(i) + "_data" + (i + 1000))
               .build(),
-          result.get((long) i + 1000));
+          removeDynamicFields(result.get((long) i + 1000)));
     }
   }
 
@@ -1444,7 +1454,7 @@ public class StreamingDataflowWorkerTest {
     Map<Long, Windmill.WorkItemCommitRequest> result = 
server.waitForAndGetCommits(2);
 
     assertThat(
-        result.get((long) timestamp1),
+        removeDynamicFields(result.get((long) timestamp1)),
         equalTo(
             setMessagesMetadata(
                     PaneInfo.NO_FIRING,
@@ -1453,7 +1463,7 @@ public class StreamingDataflowWorkerTest {
                 .build()));
 
     assertThat(
-        result.get((long) timestamp2),
+        removeDynamicFields(result.get((long) timestamp2)),
         equalTo(
             setMessagesMetadata(
                     PaneInfo.NO_FIRING,
@@ -1654,6 +1664,7 @@ public class StreamingDataflowWorkerTest {
         Windmill.WorkItemCommitRequest.newBuilder(actualOutput)
             .clearCounterUpdates()
             .clearOutputMessages()
+            .clearPerWorkItemLatencyAttributions()
             .build()
             .getSerializedSize(),
         splitIntToLong(getCounter(counters, 
"WindmillStateBytesWritten").getInteger()));
@@ -1776,7 +1787,7 @@ public class StreamingDataflowWorkerTest {
         splitIntToLong(getCounter(counters, 
"WindmillStateBytesRead").getInteger()));
     // State updates to clear state
     assertEquals(
-        Windmill.WorkItemCommitRequest.newBuilder(actualOutput)
+        
Windmill.WorkItemCommitRequest.newBuilder(removeDynamicFields(actualOutput))
             .clearCounterUpdates()
             .clearOutputMessages()
             .build()
@@ -1949,7 +1960,7 @@ public class StreamingDataflowWorkerTest {
     assertEquals(0L, splitIntToLong(getCounter(counters, 
"WindmillStateBytesRead").getInteger()));
     // Timer + buffer + watermark hold
     assertEquals(
-        Windmill.WorkItemCommitRequest.newBuilder(actualOutput)
+        
Windmill.WorkItemCommitRequest.newBuilder(removeDynamicFields(actualOutput))
             .clearCounterUpdates()
             .clearOutputMessages()
             .build()
@@ -2075,7 +2086,7 @@ public class StreamingDataflowWorkerTest {
         splitIntToLong(getCounter(counters, 
"WindmillStateBytesRead").getInteger()));
     // State updates to clear state
     assertEquals(
-        Windmill.WorkItemCommitRequest.newBuilder(actualOutput)
+        
Windmill.WorkItemCommitRequest.newBuilder(removeDynamicFields(actualOutput))
             .clearCounterUpdates()
             .clearOutputMessages()
             .build()
@@ -2363,7 +2374,7 @@ public class StreamingDataflowWorkerTest {
         
UnsignedLong.fromLongBits(commit.getSourceStateUpdates().getFinalizeIds(0));
 
     assertThat(
-        commit,
+        removeDynamicFields(commit),
         equalTo(
             setMessagesMetadata(
                     PaneInfo.NO_FIRING,
@@ -2425,7 +2436,7 @@ public class StreamingDataflowWorkerTest {
     finalizeId = 
UnsignedLong.fromLongBits(commit.getSourceStateUpdates().getFinalizeIds(0));
 
     assertThat(
-        commit,
+        removeDynamicFields(commit),
         equalTo(
             parseCommitRequest(
                     "key: \"0000000000000001\" "
@@ -2473,7 +2484,7 @@ public class StreamingDataflowWorkerTest {
     finalizeId = 
UnsignedLong.fromLongBits(commit.getSourceStateUpdates().getFinalizeIds(0));
 
     assertThat(
-        commit,
+        removeDynamicFields(commit),
         equalTo(
             parseCommitRequest(
                     "key: \"0000000000000002\" "
@@ -2530,7 +2541,7 @@ public class StreamingDataflowWorkerTest {
         
UnsignedLong.fromLongBits(commit.getSourceStateUpdates().getFinalizeIds(0));
 
     assertThat(
-        commit,
+        removeDynamicFields(commit),
         equalTo(
             setMessagesMetadata(
                     PaneInfo.NO_FIRING,
@@ -2673,7 +2684,7 @@ public class StreamingDataflowWorkerTest {
                         + "source_watermark: 1000"))
             .build();
 
-    assertThat(commit, equalTo(expectedCommit));
+    assertThat(removeDynamicFields(commit), equalTo(expectedCommit));
 
     // Test retry of work item, it should return the same result and not start 
the reader from the
     // position it was left at.
@@ -2687,7 +2698,7 @@ public class StreamingDataflowWorkerTest {
         .getSourceStateUpdatesBuilder()
         .setFinalizeIds(0, commit.getSourceStateUpdates().getFinalizeIds(0));
     expectedCommit = commitBuilder.build();
-    assertThat(commit, equalTo(expectedCommit));
+    assertThat(removeDynamicFields(commit), equalTo(expectedCommit));
 
     // Continue with processing.
     server
@@ -2717,7 +2728,7 @@ public class StreamingDataflowWorkerTest {
     finalizeId = 
UnsignedLong.fromLongBits(commit.getSourceStateUpdates().getFinalizeIds(0));
 
     assertThat(
-        commit,
+        removeDynamicFields(commit),
         equalTo(
             parseCommitRequest(
                     "key: \"0000000000000001\" "
@@ -2742,7 +2753,8 @@ public class StreamingDataflowWorkerTest {
     public MockWork(long workToken) {
       super(
           
Windmill.WorkItem.newBuilder().setKey(ByteString.EMPTY).setWorkToken(workToken).build(),
-          Instant::now);
+          Instant::now,
+          Collections.emptyList());
     }
 
     @Override
@@ -3068,7 +3080,11 @@ public class StreamingDataflowWorkerTest {
       assertThat(
           // The commit will include a timer to clean up state - this timer is 
irrelevant
           // for the current test. Also remove source_bytes_processed because 
it's dynamic.
-          
setValuesTimestamps(commit.toBuilder().clearOutputTimers().clearSourceBytesProcessed())
+          setValuesTimestamps(
+                  removeDynamicFields(commit)
+                      .toBuilder()
+                      .clearOutputTimers()
+                      .clearSourceBytesProcessed())
               .build(),
           equalTo(
               setMessagesMetadata(
@@ -3328,7 +3344,7 @@ public class StreamingDataflowWorkerTest {
   public void testLatencyAttributionProtobufsPopulated() throws Exception {
     FakeClock clock = new FakeClock();
     StreamingDataflowWorker.Work work =
-        new StreamingDataflowWorker.Work(null, clock) {
+        new StreamingDataflowWorker.Work(null, clock, Collections.emptyList()) 
{
           @Override
           public void run() {}
         };
@@ -3345,7 +3361,7 @@ public class StreamingDataflowWorkerTest {
     work.setState(StreamingDataflowWorker.Work.State.COMMITTING);
     clock.sleep(Duration.millis(60));
 
-    Iterator<LatencyAttribution> it = 
work.getLatencyAttributionList().iterator();
+    Iterator<LatencyAttribution> it = work.getLatencyAttributions().iterator();
     assertTrue(it.hasNext());
     LatencyAttribution lat = it.next();
     assertTrue(lat.getState() == LatencyAttribution.State.QUEUED);
@@ -3604,6 +3620,57 @@ public class StreamingDataflowWorkerTest {
             .equals(Duration.millis(1000)));
   }
 
+  @Test
+  public void testLatencyAttributionPopulatedInCommitRequest() throws 
Exception {
+    final int workToken = 7272; // A unique id makes it easier to search logs.
+
+    long dofnWaitTimeMs = 1000;
+    FakeClock clock = new FakeClock();
+    List<ParallelInstruction> instructions =
+        Arrays.asList(
+            makeSourceInstruction(StringUtf8Coder.of()),
+            makeDoFnInstruction(
+                new FakeSlowDoFn(clock, Duration.millis(dofnWaitTimeMs)), 0, 
StringUtf8Coder.of()),
+            makeSinkInstruction(StringUtf8Coder.of(), 0));
+
+    FakeWindmillServer server = new FakeWindmillServer(errorCollector);
+    StreamingDataflowWorkerOptions options = 
createTestingPipelineOptions(server);
+    options.setActiveWorkRefreshPeriodMillis(100);
+    options.setNumberOfWorkerHarnessThreads(1);
+    StreamingDataflowWorker worker =
+        makeWorker(
+            instructions,
+            options,
+            false /* publishCounters */,
+            clock,
+            clock::newFakeScheduledExecutor);
+    worker.start();
+
+    ActiveWorkRefreshSink awrSink = new 
ActiveWorkRefreshSink(EMPTY_DATA_RESPONDER);
+    
server.whenGetDataCalled().answerByDefault(awrSink::getData).delayEachResponseBy(Duration.ZERO);
+    server.whenGetWorkCalled().thenReturn(makeInput(workToken, 1 /* timestamp 
*/));
+    Map<Long, WorkItemCommitRequest> workItemCommitRequest = 
server.waitForAndGetCommits(1);
+
+    worker.stop();
+
+    assertEquals(
+        workItemCommitRequest.get((long) 
workToken).getPerWorkItemLatencyAttributions(0),
+        LatencyAttribution.newBuilder()
+            .setState(State.ACTIVE)
+            .setTotalDurationMillis(dofnWaitTimeMs)
+            .build());
+    if (streamingEngine) {
+      // Initial fake latency provided to FakeWindmillServer when invoking
+      // receiveWork in GetWorkStream().
+      assertEquals(
+          workItemCommitRequest.get((long) 
workToken).getPerWorkItemLatencyAttributions(1),
+          LatencyAttribution.newBuilder()
+              .setState(State.GET_WORK_IN_TRANSIT_TO_USER_WORKER)
+              .setTotalDurationMillis(1000)
+              .build());
+    }
+  }
+
   /** For each input element, emits a large string. */
   private static class InflateDoFn extends DoFn<ValueWithRecordId<KV<Integer, 
Integer>>, String> {
 
@@ -3818,6 +3885,6 @@ public class StreamingDataflowWorkerTest {
         makeExpectedOutput(
                 1, TimeUnit.MILLISECONDS.toMicros(1), DEFAULT_KEY_STRING, 1, 
DEFAULT_KEY_STRING)
             .build(),
-        result.get(1L));
+        removeDynamicFields(result.get(1L)));
   }
 }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServerTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServerTest.java
index 182c05f68cc..c9459b7d71a 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServerTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServerTest.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.fail;
 import java.io.InputStream;
 import java.io.SequenceInputStream;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -38,16 +39,21 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import 
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1ImplBase;
+import 
org.apache.beam.runners.dataflow.worker.windmill.GrpcWindmillServer.GetWorkTimingInfosTracker;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill.CommitStatus;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.ComputationGetDataRequest;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.ComputationWorkItemMetadata;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkRequest;
+import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkStreamTimingInfo;
+import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkStreamTimingInfo.Event;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GlobalData;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GlobalDataId;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GlobalDataRequest;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill.JobHeader;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.KeyedGetDataRequest;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.KeyedGetDataResponse;
+import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.LatencyAttribution;
+import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.LatencyAttribution.State;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.StreamingCommitRequestChunk;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.StreamingCommitResponse;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.StreamingCommitWorkRequest;
@@ -268,7 +274,8 @@ public class GrpcWindmillServerTest {
             (String computation,
                 @Nullable Instant inputDataWatermark,
                 Instant synchronizedProcessingTime,
-                Windmill.WorkItem workItem) -> {
+                Windmill.WorkItem workItem,
+                Collection<LatencyAttribution> getWorkStreamLatencies) -> {
               latch.countDown();
               assertEquals(inputDataWatermark, new Instant(18));
               assertEquals(synchronizedProcessingTime, new Instant(17));
@@ -945,7 +952,8 @@ public class GrpcWindmillServerTest {
             (String computation,
                 @Nullable Instant inputDataWatermark,
                 Instant synchronizedProcessingTime,
-                Windmill.WorkItem workItem) -> {
+                Windmill.WorkItem workItem,
+                Collection<LatencyAttribution> getWorkStreamLatencies) -> {
               latch.countDown();
             });
     // Wait for 100 items or 30 seconds.
@@ -957,4 +965,55 @@ public class GrpcWindmillServerTest {
     stream.close();
     assertTrue(stream.awaitTermination(30, TimeUnit.SECONDS));
   }
+
+  @Test
+  public void testGetWorkTimingInfosTracker() throws Exception {
+    GetWorkTimingInfosTracker tracker = new GetWorkTimingInfosTracker(() -> 
50);
+    List<GetWorkStreamTimingInfo> infos = new ArrayList<>();
+    for (int i = 0; i <= 3; i++) {
+      infos.add(
+          GetWorkStreamTimingInfo.newBuilder()
+              .setEvent(Event.GET_WORK_CREATION_START)
+              .setTimestampUsec(0)
+              .build());
+      infos.add(
+          GetWorkStreamTimingInfo.newBuilder()
+              .setEvent(Event.GET_WORK_CREATION_END)
+              .setTimestampUsec(10000)
+              .build());
+      infos.add(
+          GetWorkStreamTimingInfo.newBuilder()
+              .setEvent(Event.GET_WORK_RECEIVED_BY_DISPATCHER)
+              .setTimestampUsec((i + 11) * 1000)
+              .build());
+      infos.add(
+          GetWorkStreamTimingInfo.newBuilder()
+              .setEvent(Event.GET_WORK_FORWARDED_BY_DISPATCHER)
+              .setTimestampUsec((i + 16) * 1000)
+              .build());
+      tracker.addTimingInfo(infos);
+      infos.clear();
+    }
+    // durations for each chunk:
+    // GET_WORK_IN_WINDMILL_WORKER: 10, 10, 10, 10
+    // GET_WORK_IN_TRANSIT_TO_DISPATCHER: 1, 2, 3, 4 -> sum to 10
+    // GET_WORK_IN_TRANSIT_TO_USER_WORKER: 34, 33, 32, 31 -> sum to 130
+    Map<State, LatencyAttribution> latencies = new HashMap<>();
+    List<LatencyAttribution> attributions = tracker.getLatencyAttributions();
+    assertEquals(3, attributions.size());
+    for (LatencyAttribution attribution : attributions) {
+      latencies.put(attribution.getState(), attribution);
+    }
+    assertEquals(10L, 
latencies.get(State.GET_WORK_IN_WINDMILL_WORKER).getTotalDurationMillis());
+    // elapsed time from 10 -> 50;
+    long elapsedTime = 40;
+    // sumDurations: 1 + 2 + 3 + 4 + 34 + 33 + 32 + 31;
+    long sumDurations = 140;
+    assertEquals(
+        Math.min(4, (long) (elapsedTime * (10.0 / sumDurations))),
+        
latencies.get(State.GET_WORK_IN_TRANSIT_TO_DISPATCHER).getTotalDurationMillis());
+    assertEquals(
+        Math.min(34, (long) (elapsedTime * (130.0 / sumDurations))),
+        
latencies.get(State.GET_WORK_IN_TRANSIT_TO_USER_WORKER).getTotalDurationMillis());
+  }
 }
diff --git 
a/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
 
b/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
index b0e4dba698b..0cd8b8ca099 100644
--- 
a/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
+++ 
b/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
@@ -65,11 +65,44 @@ message LatencyAttribution {
     ACTIVE = 2;
     READING = 3;
     COMMITTING = 4;
+    // State which starts with the Windmill Worker receiving the GetWorkRequest
+    // and ends with the Windmill Worker sending the GetWorkResponse to the
+    // Windmill Dispatcher.
+    GET_WORK_IN_WINDMILL_WORKER = 5;
+    // State which starts with the Windmill Worker sending the GetWorkResponse
+    // and ends with the Windmill Dispatcher receiving the GetWorkResponse.
+    GET_WORK_IN_TRANSIT_TO_DISPATCHER = 6;
+    // State which starts with the Windmill Dispatcher sending the
+    // GetWorkResponse and ends with the user worker receiving the
+    // GetWorkResponse.
+    GET_WORK_IN_TRANSIT_TO_USER_WORKER = 7;
   }
   optional State state = 1;
   optional int64 total_duration_millis = 2;
 }
 
+message GetWorkStreamTimingInfo {
+  enum Event {
+    UNKNOWN = 0;
+    // Work item creation started by the Windmill Worker.
+    GET_WORK_CREATION_START = 1;
+    // Work item creation finished by the Windmill Worker.
+    GET_WORK_CREATION_END = 2;
+    // The GetWorkResponse containing this work item is received by the 
Windmill
+    // Dispatcher.
+    GET_WORK_RECEIVED_BY_DISPATCHER = 3;
+    // The GetWorkResponse containing this work item is forwarded by the
+    // Windmill Dispatcher to the user worker.
+    GET_WORK_FORWARDED_BY_DISPATCHER = 4;
+  }
+
+  // Critical event of the work item processing.
+  optional Event event = 1;
+
+  // Timestamp of the event.
+  optional int64 timestamp_usec = 2;
+}
+
 message OutputMessageBundle {
   optional string destination_computation_id = 1;
   optional string destination_stream_id = 3;
@@ -377,7 +410,7 @@ message GlobalDataRequest {
   optional string state_family = 3;
 }
 
-// next id: 24
+// next id: 27
 message WorkItemCommitRequest {
   required bytes key = 1;
   required fixed64 work_token = 2;
@@ -404,6 +437,9 @@ message WorkItemCommitRequest {
 
   repeated WatermarkHold watermark_holds = 14;
 
+  // Collected work item processing state durations.
+  repeated LatencyAttribution per_work_item_latency_attributions = 26;
+
   // DEPRECATED
   repeated GlobalDataId global_data_id_requests = 9;
 
@@ -538,6 +574,10 @@ message StreamingGetWorkResponseChunk {
   // from other stream_ids may be interleaved on the physical stream.
   optional fixed64 stream_id = 4;
 
+  // Timing infos for the work item. Windmill Dispatcher and user worker should
+  // propagate critical event timings if the list is not empty.
+  repeated GetWorkStreamTimingInfo per_work_item_timing_infos = 8;
+
   // reserved field 5
 }
 

Reply via email to