This is an automated email from the ASF dual-hosted git repository.
yichi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 30762dfc58a Populate getWorkStream latencies in dataflow streaming
worker harness (#26085)
30762dfc58a is described below
commit 30762dfc58a5d30d254787d975ff6e51f35eb570
Author: Yichi Zhang <[email protected]>
AuthorDate: Fri Jun 30 08:51:16 2023 -0700
Populate getWorkStream latencies in dataflow streaming worker harness
(#26085)
---
.../dataflow/worker/StreamingDataflowWorker.java | 55 +++++--
.../worker/windmill/GrpcWindmillServer.java | 169 ++++++++++++++++++++-
.../worker/windmill/WindmillServerStub.java | 7 +-
.../dataflow/worker/FakeWindmillServer.java | 13 +-
.../worker/StreamingDataflowWorkerTest.java | 121 +++++++++++----
.../worker/windmill/GrpcWindmillServerTest.java | 63 +++++++-
.../worker/windmill/src/main/proto/windmill.proto | 42 ++++-
7 files changed, 425 insertions(+), 45 deletions(-)
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 92d0710bbe7..ea5065260a9 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -103,6 +103,7 @@ import
org.apache.beam.runners.dataflow.worker.util.common.worker.ElementCounter
import
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter;
import
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.LatencyAttribution;
import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItemCommitRequest;
import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub;
import
org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub.CommitWorkStream;
@@ -987,7 +988,11 @@ public class StreamingDataflowWorker {
computationWork.getDependentRealtimeInputWatermark());
for (final Windmill.WorkItem workItem : computationWork.getWorkList())
{
scheduleWorkItem(
- computationState, inputDataWatermark,
synchronizedProcessingTime, workItem);
+ computationState,
+ inputDataWatermark,
+ synchronizedProcessingTime,
+ workItem,
+ /*getWorkStreamLatencies=*/ Collections.emptyList());
}
}
}
@@ -1005,13 +1010,15 @@ public class StreamingDataflowWorker {
(String computation,
Instant inputDataWatermark,
Instant synchronizedProcessingTime,
- Windmill.WorkItem workItem) -> {
+ Windmill.WorkItem workItem,
+ Collection<LatencyAttribution> getWorkStreamLatencies) -> {
memoryMonitor.waitForResources("GetWork");
scheduleWorkItem(
getComputationState(computation),
inputDataWatermark,
synchronizedProcessingTime,
- workItem);
+ workItem,
+ getWorkStreamLatencies);
});
try {
// Reconnect every now and again to enable better load balancing.
@@ -1030,7 +1037,8 @@ public class StreamingDataflowWorker {
final ComputationState computationState,
final Instant inputDataWatermark,
final Instant synchronizedProcessingTime,
- final Windmill.WorkItem workItem) {
+ final Windmill.WorkItem workItem,
+ final Collection<LatencyAttribution> getWorkStreamLatencies) {
Preconditions.checkNotNull(inputDataWatermark);
// May be null if output watermark not yet known.
final @Nullable Instant outputDataWatermark =
@@ -1038,7 +1046,7 @@ public class StreamingDataflowWorker {
Preconditions.checkState(
outputDataWatermark == null ||
!outputDataWatermark.isAfter(inputDataWatermark));
Work work =
- new Work(workItem, clock) {
+ new Work(workItem, clock, getWorkStreamLatencies) {
@Override
public void run() {
process(
@@ -1081,7 +1089,12 @@ public class StreamingDataflowWorker {
PROCESSING(Windmill.LatencyAttribution.State.ACTIVE),
READING(Windmill.LatencyAttribution.State.READING),
COMMIT_QUEUED(Windmill.LatencyAttribution.State.COMMITTING),
- COMMITTING(Windmill.LatencyAttribution.State.COMMITTING);
+ COMMITTING(Windmill.LatencyAttribution.State.COMMITTING),
+
GET_WORK_IN_WINDMILL_WORKER(Windmill.LatencyAttribution.State.GET_WORK_IN_WINDMILL_WORKER),
+ GET_WORK_IN_TRANSIT_TO_DISPATCHER(
+ Windmill.LatencyAttribution.State.GET_WORK_IN_TRANSIT_TO_DISPATCHER),
+ GET_WORK_IN_TRANSIT_TO_USER_WORKER(
+
Windmill.LatencyAttribution.State.GET_WORK_IN_TRANSIT_TO_USER_WORKER);
private final Windmill.LatencyAttribution.State latencyAttributionState;
@@ -1099,14 +1112,18 @@ public class StreamingDataflowWorker {
private final Instant startTime;
private Instant stateStartTime;
private State state;
- private Map<Windmill.LatencyAttribution.State, Duration>
totalDurationPerState;
+ private final Map<Windmill.LatencyAttribution.State, Duration>
totalDurationPerState =
+ new EnumMap<>(Windmill.LatencyAttribution.State.class);
- public Work(Windmill.WorkItem workItem, Supplier<Instant> clock) {
+ public Work(
+ Windmill.WorkItem workItem,
+ Supplier<Instant> clock,
+ Collection<LatencyAttribution> getWorkStreamLatencies) {
this.workItem = workItem;
this.clock = clock;
this.startTime = this.stateStartTime = clock.get();
this.state = State.QUEUED;
- this.totalDurationPerState = new
EnumMap<>(Windmill.LatencyAttribution.State.class);
+ recordGetWorkStreamLatencies(getWorkStreamLatencies);
}
public Windmill.WorkItem getWorkItem() {
@@ -1134,7 +1151,15 @@ public class StreamingDataflowWorker {
return stateStartTime;
}
- public Iterable<Windmill.LatencyAttribution> getLatencyAttributionList() {
+ private void recordGetWorkStreamLatencies(
+ Collection<LatencyAttribution> getWorkStreamLatencies) {
+ for (LatencyAttribution latency : getWorkStreamLatencies) {
+ totalDurationPerState.put(
+ latency.getState(),
Duration.millis(latency.getTotalDurationMillis()));
+ }
+ }
+
+ public Collection<Windmill.LatencyAttribution> getLatencyAttributions() {
List<Windmill.LatencyAttribution> list = new ArrayList<>();
for (Windmill.LatencyAttribution.State state :
Windmill.LatencyAttribution.State.values()) {
Duration duration = totalDurationPerState.getOrDefault(state,
Duration.ZERO);
@@ -1431,6 +1456,7 @@ public class StreamingDataflowWorker {
// Add the output to the commit queue.
work.setState(State.COMMIT_QUEUED);
+
outputBuilder.addAllPerWorkItemLatencyAttributions(work.getLatencyAttributions());
WorkItemCommitRequest commitRequest = outputBuilder.build();
int byteLimit = maxWorkItemCommitBytes;
@@ -1454,7 +1480,12 @@ public class StreamingDataflowWorker {
commitQueue.put(new Commit(commitRequest, computationState, work));
// Compute shuffle and state byte statistics; these will be flushed
asynchronously.
- long stateBytesWritten =
outputBuilder.clearOutputMessages().build().getSerializedSize();
+ long stateBytesWritten =
+ outputBuilder
+ .clearOutputMessages()
+ .clearPerWorkItemLatencyAttributions()
+ .build()
+ .getSerializedSize();
long shuffleBytesRead = 0;
for (Windmill.InputMessageBundle bundle :
workItem.getMessageBundlesList()) {
for (Windmill.Message message : bundle.getMessagesList()) {
@@ -2291,7 +2322,7 @@ public class StreamingDataflowWorker {
.setKey(shardedKey.key())
.setShardingKey(shardedKey.shardingKey())
.setWorkToken(work.getWorkItem().getWorkToken())
-
.addAllLatencyAttribution(work.getLatencyAttributionList())
+ .addAllLatencyAttribution(work.getLatencyAttributions())
.build());
}
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
index 7fe1a7b5440..9dcae93c8d1 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
@@ -24,8 +24,10 @@ import java.io.PrintWriter;
import java.io.SequenceInputStream;
import java.net.URI;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
+import java.util.EnumMap;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
@@ -64,11 +66,15 @@ import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetDataRequest;
import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetDataResponse;
import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkRequest;
import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkResponse;
+import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkStreamTimingInfo;
+import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkStreamTimingInfo.Event;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GlobalData;
import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GlobalDataRequest;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.JobHeader;
import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.KeyedGetDataRequest;
import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.KeyedGetDataResponse;
+import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.LatencyAttribution;
+import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.LatencyAttribution.State;
import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.ReportStatsRequest;
import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.ReportStatsResponse;
import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.StreamingCommitRequestChunk;
@@ -105,6 +111,7 @@ import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Immutabl
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.net.HostAndPort;
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.DateTimeUtils.MillisProvider;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
@@ -133,6 +140,7 @@ public class GrpcWindmillServer extends WindmillServerStub {
private static final int GET_DATA_STREAM_CHUNK_SIZE = 2 << 20;
private static final long HEARTBEAT_REQUEST_ID = Long.MAX_VALUE;
+
private static final AtomicLong nextId = new AtomicLong(0);
private final StreamingDataflowWorkerOptions options;
@@ -862,6 +870,155 @@ public class GrpcWindmillServer extends
WindmillServerStub {
}
}
+ static class GetWorkTimingInfosTracker {
+ private static class SumAndMaxDurations {
+ private Duration sum;
+ private Duration max;
+
+ public SumAndMaxDurations(Duration sum, Duration max) {
+ this.sum = sum;
+ this.max = max;
+ }
+ }
+
+ private Instant workItemCreationEndTime = Instant.EPOCH;
+ private Instant workItemLastChunkReceivedByWorkerTime = Instant.EPOCH;
+
+ private LatencyAttribution workItemCreationLatency = null;
+ private final Map<State, SumAndMaxDurations>
aggregatedGetWorkStreamLatencies;
+
+ private final MillisProvider clock;
+
+ public GetWorkTimingInfosTracker(MillisProvider clock) {
+ this.aggregatedGetWorkStreamLatencies = new EnumMap<>(State.class);
+ this.clock = clock;
+ }
+
+ public void addTimingInfo(Collection<GetWorkStreamTimingInfo> infos) {
+      // We want to record duration for each stage and also be reflective of
total work item
+ // processing time. It can be tricky because timings of different
+ // StreamingGetWorkResponseChunks can be interleaved. Current strategy
is to record the
+ // sum duration in each transmission stage across different chunks, then
divide the total
+ // duration (start from the chunk creation end in the windmill worker to
the end of last chunk
+      // reception by the user worker) proportionally according to the sum
duration values across the
+ // many stages, the final latency is also capped by the corresponding
stage maximum latency
+      // seen across multiple chunks. This should allow us to identify the
slow stage while
+      // avoiding confusion when comparing the stage duration to the total
processing elapsed wall
+ // time.
+ Map<Event, Instant> getWorkStreamTimings = new HashMap<>();
+ for (GetWorkStreamTimingInfo info : infos) {
+ getWorkStreamTimings.putIfAbsent(
+ info.getEvent(), Instant.ofEpochMilli(info.getTimestampUsec() /
1000));
+ }
+
+ // Record the difference between starting to get work and the first
chunk being sent as the
+ // work creation time.
+ Instant workItemCreationStart =
getWorkStreamTimings.get(Event.GET_WORK_CREATION_START);
+ Instant workItemCreationEnd =
getWorkStreamTimings.get(Event.GET_WORK_CREATION_END);
+ if (workItemCreationStart != null
+ && workItemCreationEnd != null
+ && workItemCreationLatency == null) {
+ workItemCreationLatency =
+ LatencyAttribution.newBuilder()
+ .setState(State.GET_WORK_IN_WINDMILL_WORKER)
+ .setTotalDurationMillis(
+ new Duration(workItemCreationStart,
workItemCreationEnd).getMillis())
+ .build();
+ }
+ // Record the work item creation end time as the start of transmission
stages.
+ if (workItemCreationEnd != null &&
workItemCreationEnd.isAfter(workItemCreationEndTime)) {
+ workItemCreationEndTime = workItemCreationEnd;
+ }
+
+ // Record the latency of each chunk between send on worker and arrival
on dispatcher.
+ Instant receivedByDispatcherTiming =
+ getWorkStreamTimings.get(Event.GET_WORK_RECEIVED_BY_DISPATCHER);
+ if (workItemCreationEnd != null && receivedByDispatcherTiming != null) {
+ Duration newDuration = new Duration(workItemCreationEnd,
receivedByDispatcherTiming);
+ aggregatedGetWorkStreamLatencies.compute(
+ State.GET_WORK_IN_TRANSIT_TO_DISPATCHER,
+ (stateKey, duration) -> {
+ if (duration == null) {
+ return new SumAndMaxDurations(newDuration, newDuration);
+ }
+ duration.max = newDuration.isLongerThan(duration.max) ?
newDuration : duration.max;
+ duration.sum = duration.sum.plus(newDuration);
+ return duration;
+ });
+ }
+
+ // Record the latency of each chunk between send on dispatcher and
arrival on worker.
+ Instant forwardedByDispatcherTiming =
+ getWorkStreamTimings.get(Event.GET_WORK_FORWARDED_BY_DISPATCHER);
+ Instant now = Instant.ofEpochMilli(clock.getMillis());
+ if (forwardedByDispatcherTiming != null) {
+ Duration newDuration = new Duration(forwardedByDispatcherTiming, now);
+ aggregatedGetWorkStreamLatencies.compute(
+ State.GET_WORK_IN_TRANSIT_TO_USER_WORKER,
+ (stateKey, duration) -> {
+ if (duration == null) {
+ return new SumAndMaxDurations(newDuration, newDuration);
+ }
+ duration.max = newDuration.isLongerThan(duration.max) ?
newDuration : duration.max;
+ duration.sum = duration.sum.plus(newDuration);
+ return duration;
+ });
+ }
+ workItemLastChunkReceivedByWorkerTime = now;
+ }
+
+ List<LatencyAttribution> getLatencyAttributions() {
+ if (workItemCreationLatency == null &&
aggregatedGetWorkStreamLatencies.isEmpty()) {
+ return Collections.emptyList();
+ }
+ List<LatencyAttribution> latencyAttributions =
+ new ArrayList<>(aggregatedGetWorkStreamLatencies.size() + 1);
+ if (workItemCreationLatency != null) {
+ latencyAttributions.add(workItemCreationLatency);
+ }
+ if
(workItemCreationEndTime.isAfter(workItemLastChunkReceivedByWorkerTime)) {
+ LOG.warn(
+ "Work item creation time {} is after the work received time {}, "
+ + "one or more GetWorkStream timing infos are missing.",
+ workItemCreationEndTime,
+ workItemLastChunkReceivedByWorkerTime);
+ return latencyAttributions;
+ }
+ long totalTransmissionDurationElapsedTime =
+ new Duration(workItemCreationEndTime,
workItemLastChunkReceivedByWorkerTime).getMillis();
+ long totalSumDurationTimeMills = 0;
+ for (SumAndMaxDurations duration :
aggregatedGetWorkStreamLatencies.values()) {
+ totalSumDurationTimeMills += duration.sum.getMillis();
+ }
+ final long finalTotalSumDurationTimeMills = totalSumDurationTimeMills;
+
+ aggregatedGetWorkStreamLatencies.forEach(
+ (state, duration) -> {
+ long scaledDuration =
+ (long)
+ (((double) duration.sum.getMillis() /
finalTotalSumDurationTimeMills)
+ * totalTransmissionDurationElapsedTime);
+ // Cap final duration by the max state duration across different
chunks. This ensures
+ // the sum of final durations does not exceed the total elapsed
time and the duration
+ // for each stage does not exceed the stage maximum.
+ long durationMills = Math.min(duration.max.getMillis(),
scaledDuration);
+ latencyAttributions.add(
+ LatencyAttribution.newBuilder()
+ .setState(state)
+ .setTotalDurationMillis(durationMills)
+ .build());
+ });
+ return latencyAttributions;
+ }
+
+ public void reset() {
+ this.aggregatedGetWorkStreamLatencies.clear();
+ this.workItemCreationEndTime = Instant.EPOCH;
+ this.workItemLastChunkReceivedByWorkerTime = Instant.EPOCH;
+ this.workItemCreationLatency = null;
+ }
+ }
+
private class GrpcGetWorkStream
extends AbstractWindmillStream<StreamingGetWorkRequest,
StreamingGetWorkResponseChunk>
implements GetWorkStream {
@@ -965,12 +1122,16 @@ public class GrpcWindmillServer extends
WindmillServerStub {
}
private class WorkItemBuffer {
+
private String computation;
private Instant inputDataWatermark;
private Instant synchronizedProcessingTime;
private ByteString data = ByteString.EMPTY;
private long bufferedSize = 0;
+ private GetWorkTimingInfosTracker workTimingInfosTracker =
+ new GetWorkTimingInfosTracker(System::currentTimeMillis);
+
private void setMetadata(Windmill.ComputationWorkItemMetadata metadata) {
this.computation = metadata.getComputationId();
this.inputDataWatermark =
@@ -987,6 +1148,7 @@ public class GrpcWindmillServer extends WindmillServerStub
{
this.data = data.concat(chunk.getSerializedWorkItem());
this.bufferedSize += chunk.getSerializedWorkItem().size();
+
workTimingInfosTracker.addTimingInfo(chunk.getPerWorkItemTimingInfosList());
}
public long bufferedSize() {
@@ -995,14 +1157,19 @@ public class GrpcWindmillServer extends
WindmillServerStub {
public void runAndReset() {
try {
+ Windmill.WorkItem workItem =
Windmill.WorkItem.parseFrom(data.newInput());
+ List<LatencyAttribution> getWorkStreamLatencies =
+ workTimingInfosTracker.getLatencyAttributions();
receiver.receiveWork(
computation,
inputDataWatermark,
synchronizedProcessingTime,
- Windmill.WorkItem.parseFrom(data.newInput()));
+ workItem,
+ getWorkStreamLatencies);
} catch (IOException e) {
LOG.error("Failed to parse work item from stream: ", e);
}
+ workTimingInfosTracker.reset();
data = ByteString.EMPTY;
bufferedSize = 0;
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java
index 238f22aa643..be6c365ee33 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.dataflow.worker.windmill;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -33,6 +34,7 @@ import
org.apache.beam.runners.dataflow.worker.status.StatusDataProvider;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.CommitStatus;
import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.KeyedGetDataRequest;
import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.KeyedGetDataResponse;
+import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.LatencyAttribution;
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.net.HostAndPort;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
@@ -72,11 +74,13 @@ public abstract class WindmillServerStub implements
StatusDataProvider {
/** Functional interface for receiving WorkItems. */
@FunctionalInterface
public interface WorkItemReceiver {
+
void receiveWork(
String computation,
@Nullable Instant inputDataWatermark,
Instant synchronizedProcessingTime,
- Windmill.WorkItem workItem);
+ Windmill.WorkItem workItem,
+ Collection<LatencyAttribution> getWorkStreamLatencies);
}
/**
@@ -133,6 +137,7 @@ public abstract class WindmillServerStub implements
StatusDataProvider {
/** Interface for streaming CommitWorkRequests to Windmill. */
@ThreadSafe
public interface CommitWorkStream extends WindmillStream {
+
/**
* Commits a work item and running onDone when the commit has been
processed by the server.
* Returns true if the request was accepted. If false is returned the
stream should be flushed
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java
index b0a64c82d03..edf8ce4628c 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java
@@ -29,6 +29,7 @@ import static org.junit.Assert.assertFalse;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Queue;
@@ -48,6 +49,8 @@ import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.ComputationGetD
import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetDataRequest;
import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetDataResponse;
import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.KeyedGetDataRequest;
+import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.LatencyAttribution;
+import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.LatencyAttribution.State;
import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItemCommitRequest;
import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub;
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.net.HostAndPort;
@@ -273,7 +276,15 @@ class FakeWindmillServer extends WindmillServerStub {
computationWork.getInputDataWatermark());
for (Windmill.WorkItem workItem : computationWork.getWorkList()) {
receiver.receiveWork(
- computationWork.getComputationId(), inputDataWatermark,
Instant.now(), workItem);
+ computationWork.getComputationId(),
+ inputDataWatermark,
+ Instant.now(),
+ workItem,
+ Collections.singletonList(
+ LatencyAttribution.newBuilder()
+ .setState(State.GET_WORK_IN_TRANSIT_TO_USER_WORKER)
+ .setTotalDurationMillis(1000)
+ .build()));
}
}
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
index d52335698e5..0e53210c018 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
@@ -110,6 +110,7 @@ import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.KeyedGetDataReq
import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.KeyedGetDataResponse;
import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.KeyedMessageBundle;
import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.LatencyAttribution;
+import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.LatencyAttribution.State;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer.Type;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WatermarkHold;
@@ -600,6 +601,11 @@ public class StreamingDataflowWorkerTest {
parseCommitRequest(expectedCommitRequestBuilder.toString()));
}
+ private WorkItemCommitRequest removeDynamicFields(WorkItemCommitRequest
request) {
+    // Throw away per_work_item_latency_attributions because it is dynamic in tests.
+ return request.toBuilder().clearPerWorkItemLatencyAttributions().build();
+ }
+
private WorkItemCommitRequest.Builder makeExpectedTruncationRequestOutput(
int index, String key, long shardingKey, long estimatedSize) throws
Exception {
StringBuilder expectedCommitRequestBuilder =
@@ -759,7 +765,8 @@ public class StreamingDataflowWorkerTest {
for (int i = 0; i < numIters; ++i) {
assertTrue(result.containsKey((long) i));
assertEquals(
- makeExpectedOutput(i, TimeUnit.MILLISECONDS.toMicros(i)).build(),
result.get((long) i));
+ makeExpectedOutput(i, TimeUnit.MILLISECONDS.toMicros(i)).build(),
+ removeDynamicFields(result.get((long) i)));
}
verify(hotKeyLogger,
atLeastOnce()).logHotKeyDetection(nullable(String.class), any());
@@ -798,7 +805,8 @@ public class StreamingDataflowWorkerTest {
for (int i = 0; i < numIters; ++i) {
assertTrue(result.containsKey((long) i));
assertEquals(
- makeExpectedOutput(i, TimeUnit.MILLISECONDS.toMicros(i)).build(),
result.get((long) i));
+ makeExpectedOutput(i, TimeUnit.MILLISECONDS.toMicros(i)).build(),
+ removeDynamicFields(result.get((long) i)));
}
verify(hotKeyLogger,
atLeastOnce()).logHotKeyDetection(nullable(String.class), any());
@@ -979,7 +987,8 @@ public class StreamingDataflowWorkerTest {
for (int i = 0; i < numIters; ++i) {
assertTrue(result.containsKey((long) i));
assertEquals(
- makeExpectedOutput(i, TimeUnit.MILLISECONDS.toMicros(i)).build(),
result.get((long) i));
+ makeExpectedOutput(i, TimeUnit.MILLISECONDS.toMicros(i)).build(),
+ removeDynamicFields(result.get((long) i)));
assertTrue(result.containsKey((long) i + 1000));
assertEquals(
makeExpectedOutput(
@@ -989,7 +998,7 @@ public class StreamingDataflowWorkerTest {
DEFAULT_SHARDING_KEY + 1,
keyStringForIndex(i))
.build(),
- result.get((long) i + 1000));
+ removeDynamicFields(result.get((long) i + 1000)));
assertTrue(result.containsKey((long) i + numIters));
assertEquals(
makeExpectedOutput(
@@ -999,7 +1008,7 @@ public class StreamingDataflowWorkerTest {
DEFAULT_SHARDING_KEY,
keyStringForIndex(i))
.build(),
- result.get((long) i + numIters));
+ removeDynamicFields(result.get((long) i + numIters)));
}
// Re-add the work, it should process due to the keys no longer being
active.
@@ -1025,7 +1034,7 @@ public class StreamingDataflowWorkerTest {
DEFAULT_SHARDING_KEY,
keyStringForIndex(i))
.build(),
- result.get((long) i + numIters * 2));
+ removeDynamicFields(result.get((long) i + numIters * 2)));
}
}
@@ -1117,7 +1126,7 @@ public class StreamingDataflowWorkerTest {
assertEquals(
makeExpectedOutput(1, 0, DEFAULT_KEY_STRING, DEFAULT_SHARDING_KEY,
DEFAULT_KEY_STRING)
.build(),
- result.get(1L));
+ removeDynamicFields(result.get(1L)));
assertEquals(1, result.size());
}
@@ -1165,7 +1174,8 @@ public class StreamingDataflowWorkerTest {
assertEquals(2, result.size());
assertEquals(
- makeExpectedOutput(2, 0, "key", DEFAULT_SHARDING_KEY, "key").build(),
result.get(2L));
+ makeExpectedOutput(2, 0, "key", DEFAULT_SHARDING_KEY, "key").build(),
+ removeDynamicFields(result.get(2L)));
assertTrue(result.containsKey(1L));
WorkItemCommitRequest largeCommit = result.get(1L);
@@ -1254,7 +1264,7 @@ public class StreamingDataflowWorkerTest {
DEFAULT_SHARDING_KEY,
keyStringForIndex(i) + "_data" + i)
.build(),
- result.get((long) i));
+ removeDynamicFields(result.get((long) i)));
assertTrue(result.containsKey((long) i + 1000));
assertEquals(
makeExpectedOutput(
@@ -1264,7 +1274,7 @@ public class StreamingDataflowWorkerTest {
DEFAULT_SHARDING_KEY + i,
keyStringForIndex(i) + "_data" + (i + 1000))
.build(),
- result.get((long) i + 1000));
+ removeDynamicFields(result.get((long) i + 1000)));
}
}
@@ -1444,7 +1454,7 @@ public class StreamingDataflowWorkerTest {
Map<Long, Windmill.WorkItemCommitRequest> result =
server.waitForAndGetCommits(2);
assertThat(
- result.get((long) timestamp1),
+ removeDynamicFields(result.get((long) timestamp1)),
equalTo(
setMessagesMetadata(
PaneInfo.NO_FIRING,
@@ -1453,7 +1463,7 @@ public class StreamingDataflowWorkerTest {
.build()));
assertThat(
- result.get((long) timestamp2),
+ removeDynamicFields(result.get((long) timestamp2)),
equalTo(
setMessagesMetadata(
PaneInfo.NO_FIRING,
@@ -1654,6 +1664,7 @@ public class StreamingDataflowWorkerTest {
Windmill.WorkItemCommitRequest.newBuilder(actualOutput)
.clearCounterUpdates()
.clearOutputMessages()
+ .clearPerWorkItemLatencyAttributions()
.build()
.getSerializedSize(),
splitIntToLong(getCounter(counters,
"WindmillStateBytesWritten").getInteger()));
@@ -1776,7 +1787,7 @@ public class StreamingDataflowWorkerTest {
splitIntToLong(getCounter(counters,
"WindmillStateBytesRead").getInteger()));
// State updates to clear state
assertEquals(
- Windmill.WorkItemCommitRequest.newBuilder(actualOutput)
+
Windmill.WorkItemCommitRequest.newBuilder(removeDynamicFields(actualOutput))
.clearCounterUpdates()
.clearOutputMessages()
.build()
@@ -1949,7 +1960,7 @@ public class StreamingDataflowWorkerTest {
assertEquals(0L, splitIntToLong(getCounter(counters,
"WindmillStateBytesRead").getInteger()));
// Timer + buffer + watermark hold
assertEquals(
- Windmill.WorkItemCommitRequest.newBuilder(actualOutput)
+
Windmill.WorkItemCommitRequest.newBuilder(removeDynamicFields(actualOutput))
.clearCounterUpdates()
.clearOutputMessages()
.build()
@@ -2075,7 +2086,7 @@ public class StreamingDataflowWorkerTest {
splitIntToLong(getCounter(counters,
"WindmillStateBytesRead").getInteger()));
// State updates to clear state
assertEquals(
- Windmill.WorkItemCommitRequest.newBuilder(actualOutput)
+
Windmill.WorkItemCommitRequest.newBuilder(removeDynamicFields(actualOutput))
.clearCounterUpdates()
.clearOutputMessages()
.build()
@@ -2363,7 +2374,7 @@ public class StreamingDataflowWorkerTest {
UnsignedLong.fromLongBits(commit.getSourceStateUpdates().getFinalizeIds(0));
assertThat(
- commit,
+ removeDynamicFields(commit),
equalTo(
setMessagesMetadata(
PaneInfo.NO_FIRING,
@@ -2425,7 +2436,7 @@ public class StreamingDataflowWorkerTest {
finalizeId =
UnsignedLong.fromLongBits(commit.getSourceStateUpdates().getFinalizeIds(0));
assertThat(
- commit,
+ removeDynamicFields(commit),
equalTo(
parseCommitRequest(
"key: \"0000000000000001\" "
@@ -2473,7 +2484,7 @@ public class StreamingDataflowWorkerTest {
finalizeId =
UnsignedLong.fromLongBits(commit.getSourceStateUpdates().getFinalizeIds(0));
assertThat(
- commit,
+ removeDynamicFields(commit),
equalTo(
parseCommitRequest(
"key: \"0000000000000002\" "
@@ -2530,7 +2541,7 @@ public class StreamingDataflowWorkerTest {
UnsignedLong.fromLongBits(commit.getSourceStateUpdates().getFinalizeIds(0));
assertThat(
- commit,
+ removeDynamicFields(commit),
equalTo(
setMessagesMetadata(
PaneInfo.NO_FIRING,
@@ -2673,7 +2684,7 @@ public class StreamingDataflowWorkerTest {
+ "source_watermark: 1000"))
.build();
- assertThat(commit, equalTo(expectedCommit));
+ assertThat(removeDynamicFields(commit), equalTo(expectedCommit));
// Test retry of work item, it should return the same result and not start
the reader from the
// position it was left at.
@@ -2687,7 +2698,7 @@ public class StreamingDataflowWorkerTest {
.getSourceStateUpdatesBuilder()
.setFinalizeIds(0, commit.getSourceStateUpdates().getFinalizeIds(0));
expectedCommit = commitBuilder.build();
- assertThat(commit, equalTo(expectedCommit));
+ assertThat(removeDynamicFields(commit), equalTo(expectedCommit));
// Continue with processing.
server
@@ -2717,7 +2728,7 @@ public class StreamingDataflowWorkerTest {
finalizeId =
UnsignedLong.fromLongBits(commit.getSourceStateUpdates().getFinalizeIds(0));
assertThat(
- commit,
+ removeDynamicFields(commit),
equalTo(
parseCommitRequest(
"key: \"0000000000000001\" "
@@ -2742,7 +2753,8 @@ public class StreamingDataflowWorkerTest {
public MockWork(long workToken) {
super(
Windmill.WorkItem.newBuilder().setKey(ByteString.EMPTY).setWorkToken(workToken).build(),
- Instant::now);
+ Instant::now,
+ Collections.emptyList());
}
@Override
@@ -3068,7 +3080,11 @@ public class StreamingDataflowWorkerTest {
assertThat(
// The commit will include a timer to clean up state - this timer is
irrelevant
// for the current test. Also remove source_bytes_processed because
it's dynamic.
-
setValuesTimestamps(commit.toBuilder().clearOutputTimers().clearSourceBytesProcessed())
+ setValuesTimestamps(
+ removeDynamicFields(commit)
+ .toBuilder()
+ .clearOutputTimers()
+ .clearSourceBytesProcessed())
.build(),
equalTo(
setMessagesMetadata(
@@ -3328,7 +3344,7 @@ public class StreamingDataflowWorkerTest {
public void testLatencyAttributionProtobufsPopulated() throws Exception {
FakeClock clock = new FakeClock();
StreamingDataflowWorker.Work work =
- new StreamingDataflowWorker.Work(null, clock) {
+ new StreamingDataflowWorker.Work(null, clock, Collections.emptyList())
{
@Override
public void run() {}
};
@@ -3345,7 +3361,7 @@ public class StreamingDataflowWorkerTest {
work.setState(StreamingDataflowWorker.Work.State.COMMITTING);
clock.sleep(Duration.millis(60));
- Iterator<LatencyAttribution> it =
work.getLatencyAttributionList().iterator();
+ Iterator<LatencyAttribution> it = work.getLatencyAttributions().iterator();
assertTrue(it.hasNext());
LatencyAttribution lat = it.next();
assertTrue(lat.getState() == LatencyAttribution.State.QUEUED);
@@ -3604,6 +3620,57 @@ public class StreamingDataflowWorkerTest {
.equals(Duration.millis(1000)));
}
+ @Test
+ public void testLatencyAttributionPopulatedInCommitRequest() throws
Exception {
+ final int workToken = 7272; // A unique id makes it easier to search logs.
+
+ long dofnWaitTimeMs = 1000;
+ FakeClock clock = new FakeClock();
+ List<ParallelInstruction> instructions =
+ Arrays.asList(
+ makeSourceInstruction(StringUtf8Coder.of()),
+ makeDoFnInstruction(
+ new FakeSlowDoFn(clock, Duration.millis(dofnWaitTimeMs)), 0,
StringUtf8Coder.of()),
+ makeSinkInstruction(StringUtf8Coder.of(), 0));
+
+ FakeWindmillServer server = new FakeWindmillServer(errorCollector);
+ StreamingDataflowWorkerOptions options =
createTestingPipelineOptions(server);
+ options.setActiveWorkRefreshPeriodMillis(100);
+ options.setNumberOfWorkerHarnessThreads(1);
+ StreamingDataflowWorker worker =
+ makeWorker(
+ instructions,
+ options,
+ false /* publishCounters */,
+ clock,
+ clock::newFakeScheduledExecutor);
+ worker.start();
+
+ ActiveWorkRefreshSink awrSink = new
ActiveWorkRefreshSink(EMPTY_DATA_RESPONDER);
+
server.whenGetDataCalled().answerByDefault(awrSink::getData).delayEachResponseBy(Duration.ZERO);
+ server.whenGetWorkCalled().thenReturn(makeInput(workToken, 1 /* timestamp
*/));
+ Map<Long, WorkItemCommitRequest> workItemCommitRequest =
server.waitForAndGetCommits(1);
+
+ worker.stop();
+
+ assertEquals(
+ workItemCommitRequest.get((long)
workToken).getPerWorkItemLatencyAttributions(0),
+ LatencyAttribution.newBuilder()
+ .setState(State.ACTIVE)
+ .setTotalDurationMillis(dofnWaitTimeMs)
+ .build());
+ if (streamingEngine) {
+ // Initial fake latency provided to FakeWindmillServer when invoke
receiveWork in
+ // GetWorkStream().
+ assertEquals(
+ workItemCommitRequest.get((long)
workToken).getPerWorkItemLatencyAttributions(1),
+ LatencyAttribution.newBuilder()
+ .setState(State.GET_WORK_IN_TRANSIT_TO_USER_WORKER)
+ .setTotalDurationMillis(1000)
+ .build());
+ }
+ }
+
/** For each input element, emits a large string. */
private static class InflateDoFn extends DoFn<ValueWithRecordId<KV<Integer,
Integer>>, String> {
@@ -3818,6 +3885,6 @@ public class StreamingDataflowWorkerTest {
makeExpectedOutput(
1, TimeUnit.MILLISECONDS.toMicros(1), DEFAULT_KEY_STRING, 1,
DEFAULT_KEY_STRING)
.build(),
- result.get(1L));
+ removeDynamicFields(result.get(1L)));
}
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServerTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServerTest.java
index 182c05f68cc..c9459b7d71a 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServerTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServerTest.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.fail;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -38,16 +39,21 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1ImplBase;
+import
org.apache.beam.runners.dataflow.worker.windmill.GrpcWindmillServer.GetWorkTimingInfosTracker;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.CommitStatus;
import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.ComputationGetDataRequest;
import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.ComputationWorkItemMetadata;
import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkRequest;
+import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkStreamTimingInfo;
+import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkStreamTimingInfo.Event;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GlobalData;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GlobalDataId;
import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GlobalDataRequest;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.JobHeader;
import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.KeyedGetDataRequest;
import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.KeyedGetDataResponse;
+import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.LatencyAttribution;
+import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.LatencyAttribution.State;
import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.StreamingCommitRequestChunk;
import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.StreamingCommitResponse;
import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.StreamingCommitWorkRequest;
@@ -268,7 +274,8 @@ public class GrpcWindmillServerTest {
(String computation,
@Nullable Instant inputDataWatermark,
Instant synchronizedProcessingTime,
- Windmill.WorkItem workItem) -> {
+ Windmill.WorkItem workItem,
+ Collection<LatencyAttribution> getWorkStreamLatencies) -> {
latch.countDown();
assertEquals(inputDataWatermark, new Instant(18));
assertEquals(synchronizedProcessingTime, new Instant(17));
@@ -945,7 +952,8 @@ public class GrpcWindmillServerTest {
(String computation,
@Nullable Instant inputDataWatermark,
Instant synchronizedProcessingTime,
- Windmill.WorkItem workItem) -> {
+ Windmill.WorkItem workItem,
+ Collection<LatencyAttribution> getWorkStreamLatencies) -> {
latch.countDown();
});
// Wait for 100 items or 30 seconds.
@@ -957,4 +965,55 @@ public class GrpcWindmillServerTest {
stream.close();
assertTrue(stream.awaitTermination(30, TimeUnit.SECONDS));
}
+
+ @Test
+ public void testGetWorkTimingInfosTracker() throws Exception {
+ GetWorkTimingInfosTracker tracker = new GetWorkTimingInfosTracker(() ->
50);
+ List<GetWorkStreamTimingInfo> infos = new ArrayList<>();
+ for (int i = 0; i <= 3; i++) {
+ infos.add(
+ GetWorkStreamTimingInfo.newBuilder()
+ .setEvent(Event.GET_WORK_CREATION_START)
+ .setTimestampUsec(0)
+ .build());
+ infos.add(
+ GetWorkStreamTimingInfo.newBuilder()
+ .setEvent(Event.GET_WORK_CREATION_END)
+ .setTimestampUsec(10000)
+ .build());
+ infos.add(
+ GetWorkStreamTimingInfo.newBuilder()
+ .setEvent(Event.GET_WORK_RECEIVED_BY_DISPATCHER)
+ .setTimestampUsec((i + 11) * 1000)
+ .build());
+ infos.add(
+ GetWorkStreamTimingInfo.newBuilder()
+ .setEvent(Event.GET_WORK_FORWARDED_BY_DISPATCHER)
+ .setTimestampUsec((i + 16) * 1000)
+ .build());
+ tracker.addTimingInfo(infos);
+ infos.clear();
+ }
+ // durations for each chunk:
+ // GET_WORK_IN_WINDMILL_WORKER: 10, 10, 10, 10
+ // GET_WORK_IN_TRANSIT_TO_DISPATCHER: 1, 2, 3, 4 -> sum to 10
+ // GET_WORK_IN_TRANSIT_TO_USER_WORKER: 34, 33, 32, 31 -> sum to 130
+ Map<State, LatencyAttribution> latencies = new HashMap<>();
+ List<LatencyAttribution> attributions = tracker.getLatencyAttributions();
+ assertEquals(3, attributions.size());
+ for (LatencyAttribution attribution : attributions) {
+ latencies.put(attribution.getState(), attribution);
+ }
+ assertEquals(10L,
latencies.get(State.GET_WORK_IN_WINDMILL_WORKER).getTotalDurationMillis());
+ // elapsed time from 10 -> 50;
+ long elapsedTime = 40;
+ // sumDurations: 1 + 2 + 3 + 4 + 34 + 33 + 32 + 31;
+ long sumDurations = 140;
+ assertEquals(
+ Math.min(4, (long) (elapsedTime * (10.0 / sumDurations))),
+
latencies.get(State.GET_WORK_IN_TRANSIT_TO_DISPATCHER).getTotalDurationMillis());
+ assertEquals(
+ Math.min(34, (long) (elapsedTime * (130.0 / sumDurations))),
+
latencies.get(State.GET_WORK_IN_TRANSIT_TO_USER_WORKER).getTotalDurationMillis());
+ }
}
diff --git
a/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
b/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
index b0e4dba698b..0cd8b8ca099 100644
---
a/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
+++
b/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
@@ -65,11 +65,44 @@ message LatencyAttribution {
ACTIVE = 2;
READING = 3;
COMMITTING = 4;
+ // State which starts with the Windmill Worker receiving the GetWorkRequest
+ // and ends with the Windmill Worker sending the GetWorkResponse to the
+ // Windmill Dispatcher.
+ GET_WORK_IN_WINDMILL_WORKER = 5;
+ // State which starts with the Windmill Worker sending the GetWorkResponse
+ // and ends with the Windmill Dispatcher receiving the GetWorkResponse.
+ GET_WORK_IN_TRANSIT_TO_DISPATCHER = 6;
+ // State which starts with the Windmill Dispatcher sending the
+ // GetWorkResponse and ends with the user worker receiving the
+ // GetWorkResponse.
+ GET_WORK_IN_TRANSIT_TO_USER_WORKER = 7;
}
optional State state = 1;
optional int64 total_duration_millis = 2;
}
+message GetWorkStreamTimingInfo {
+ enum Event {
+ UNKNOWN = 0;
+ // Work item creation started by the Windmill Worker.
+ GET_WORK_CREATION_START = 1;
+ // Work item creation finished by the Windmill Worker.
+ GET_WORK_CREATION_END = 2;
+ // The GetWorkResponse containing this work item is received by the
Windmill
+ // Dispatcher.
+ GET_WORK_RECEIVED_BY_DISPATCHER = 3;
+ // The GetWorkResponse containing this work item is forwarded by the
+ // Windmill Dispatcher to the user worker.
+ GET_WORK_FORWARDED_BY_DISPATCHER = 4;
+ }
+
+ // Critical event of the work item processing.
+ optional Event event = 1;
+
+ // Timestamp of the event.
+ optional int64 timestamp_usec = 2;
+}
+
message OutputMessageBundle {
optional string destination_computation_id = 1;
optional string destination_stream_id = 3;
@@ -377,7 +410,7 @@ message GlobalDataRequest {
optional string state_family = 3;
}
-// next id: 24
+// next id: 27
message WorkItemCommitRequest {
required bytes key = 1;
required fixed64 work_token = 2;
@@ -404,6 +437,9 @@ message WorkItemCommitRequest {
repeated WatermarkHold watermark_holds = 14;
+ // Collected work item processing state durations.
+ repeated LatencyAttribution per_work_item_latency_attributions = 26;
+
// DEPRECATED
repeated GlobalDataId global_data_id_requests = 9;
@@ -538,6 +574,10 @@ message StreamingGetWorkResponseChunk {
// from other stream_ids may be interleaved on the physical stream.
optional fixed64 stream_id = 4;
+ // Timing infos for the work item. Windmill Dispatcher and user worker should
+ // propagate critical event timings if the list is not empty.
+ repeated GetWorkStreamTimingInfo per_work_item_timing_infos = 8;
+
// reserved field 5
}