scwhittle commented on code in PR #26085:
URL: https://github.com/apache/beam/pull/26085#discussion_r1230751031
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java:
##########
@@ -862,6 +870,161 @@ public final Instant startTime() {
}
}
+ static class GetWorkTimingInfosTracker {
+
+ private Instant workItemCreationEndTime = Instant.EPOCH;
+ private Instant workItemLastChunkReceivedByWorkerTime = Instant.EPOCH;
+
+ private LatencyAttribution workItemCreationLatency = null;
+ private final Map<State, Duration> aggregatedGetWorkStreamLatencies;
Review Comment:
nit: consider a single Map from State -> sum/max.
That saves lookups and also makes clearer the invariant that a key is always
present in both maps.
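A minimal sketch of what the single-map shape could look like, not the PR's actual code. The `State` enum here is a hypothetical stand-in for the proto-generated state type, and `LatencyAggregation`, `SumAndMax`, and `record` are illustrative names only:

```java
import java.time.Duration;
import java.util.EnumMap;
import java.util.Map;

final class LatencyAggregation {
  // Hypothetical stand-in for the proto-generated LatencyAttribution.State enum.
  enum State { QUEUED, ACTIVE, READING }

  // Holds the running sum and the maximum for one state, so both values
  // always exist together under a single key.
  static final class SumAndMax {
    private Duration sum = Duration.ZERO;
    private Duration max = Duration.ZERO;

    void add(Duration d) {
      sum = sum.plus(d);
      if (d.compareTo(max) > 0) {
        max = d;
      }
    }

    Duration sum() { return sum; }
    Duration max() { return max; }
  }

  private final Map<State, SumAndMax> latencies = new EnumMap<>(State.class);

  // One lookup per update: the entry is created on first use, then mutated in place.
  void record(State state, Duration d) {
    latencies.computeIfAbsent(state, s -> new SumAndMax()).add(d);
  }

  // Returns null if nothing has been recorded for the state.
  SumAndMax get(State state) {
    return latencies.get(state);
  }
}
```

With this shape there is no way for a state to appear in the sum map but not in the max map, since both live in the same value object.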
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java:
##########
@@ -1441,6 +1623,11 @@ private void issueSingleRequest(final long id, PendingRequest pendingRequest) {
.setRequestId(id)
.setShardingKey(pendingRequest.request.getShardingKey())
.setSerializedWorkItemCommit(pendingRequest.request.toByteString());
+ if (!pendingRequest.latencyAttributions.isEmpty()) {
+ requestBuilder
+ .getCommitChunkBuilder(0)
+ .addAllPerWorkItemLatencyAttributions(pendingRequest.latencyAttributions);
Review Comment:
Let's consider adding the latency information for the work item itself to
the WorkItemCommitRequest.
Then we can just have similar timing information on the
StreamingCommitWorkRequest to calculate latencies on the commit path.
Also, note that below you are not setting the latency information in
issueMultiChunkRequest.