y1chi commented on code in PR #26085:
URL: https://github.com/apache/beam/pull/26085#discussion_r1223629182
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java:
##########
@@ -886,60 +871,72 @@ public final Instant startTime() {
}
static class GetWorkTimingInfosTracker {
- private final Map<State, Duration> getWorkStreamLatencies;
- private Instant workItemCreationStartTime =
Instant.ofEpochMilli(Long.MAX_VALUE);
+
+ private Instant workItemCreationEndTime = Instant.EPOCH;
private Instant workItemLastChunkReceivedByWorkerTime = Instant.EPOCH;
- public GetWorkTimingInfosTracker() {
- this.getWorkStreamLatencies = new EnumMap<>(State.class);
+ private LatencyAttribution workItemCreationLatency = null;
+ private final Map<State, Duration> aggregatedGetWorkStreamLatencies;
+
+ private final MillisProvider clock;
+
+ public GetWorkTimingInfosTracker(MillisProvider clock) {
+ this.aggregatedGetWorkStreamLatencies = new EnumMap<>(State.class);
+ this.clock = clock;
}
public void addTimingInfo(Collection<GetWorkStreamTimingInfo> infos) {
// We want to record duration for each stage and also be reflective on
total work item
// processing time. It can be tricky because timings of different
// StreamingGetWorkResponseChunks can be interleaved. Current strategy
is to record the
- // sum duration in each stage across different chunks, then divide the
total duration (start
- // from the first chunk creation in the windmill worker to the end of
last chunk reception by
- // the user worker) proportionally according the sum duration values
across the many stages.
- // This should allow us to identify the slow stage meanwhile avoid
confusions for comparing
- // the stage duration to the total processing elapsed wall time.
+ // sum duration in each transmission stage across different chunks, then
divide the total
+ // duration (start from the chunk creation end in the windmill worker to
the end of last chunk
+      reception by the user worker) proportionally according to the sum
duration values across the
+ // many stages. This should allow us to identify the slow stage
meanwhile avoid confusions for
+ // comparing the stage duration to the total processing elapsed wall
time.
Map<Event, Instant> getWorkStreamTimings = new HashMap<>();
for (GetWorkStreamTimingInfo info : infos) {
getWorkStreamTimings.putIfAbsent(
info.getEvent(), Instant.ofEpochMilli(info.getTimestampUsec() /
1000));
}
-
- for (Cell<Event, Event, State> cell : EVENT_STATE_TABLE.cellSet()) {
- Event start = cell.getRowKey();
- Event end = cell.getColumnKey();
- State state = cell.getValue();
- Instant startTiming = getWorkStreamTimings.get(start);
- Instant endTiming = getWorkStreamTimings.get(end);
- if (startTiming != null && endTiming != null) {
- getWorkStreamLatencies.compute(
- state,
- (state_key, duration) -> {
- Duration newDuration = new Duration(startTiming, endTiming);
- if (duration == null) {
- return newDuration;
- }
- return duration.plus(newDuration);
- });
- }
+ Instant workItemCreationStart =
getWorkStreamTimings.get(Event.GET_WORK_CREATION_START);
+ Instant workItemCreationEnd =
getWorkStreamTimings.get(Event.GET_WORK_CREATION_END);
+ // Record the work item creation end time.
+ if (workItemCreationStart != null
+ && workItemCreationEnd != null
+ && workItemCreationLatency == null) {
+ workItemCreationLatency =
+ LatencyAttribution.newBuilder()
+ .setState(State.GET_WORK_IN_WINDMILL_WORKER)
+ .setTotalDurationMillis(
+ new Duration(workItemCreationStart,
workItemCreationEnd).getMillis())
+ .build();
}
- Instant getWorkCreationStartTime =
getWorkStreamTimings.get(Event.GET_WORK_CREATION_START);
- if (getWorkCreationStartTime != null
- && getWorkCreationStartTime.isBefore(workItemCreationStartTime)) {
- workItemCreationStartTime = getWorkCreationStartTime;
+ if (workItemCreationEnd != null &&
workItemCreationEnd.isAfter(workItemCreationEndTime)) {
Review Comment:
   workItemCreationEndTime is always initialized to Instant.EPOCH.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java:
##########
@@ -950,42 +947,48 @@ public void
addTimingInfo(Collection<GetWorkStreamTimingInfo> infos) {
}
List<LatencyAttribution> getLatencyAttributions() {
- if (getWorkStreamLatencies.size() == 0) {
- return new ArrayList<>();
+ if (workItemCreationLatency == null &&
aggregatedGetWorkStreamLatencies.size() == 0) {
+ return Collections.emptyList();
+ }
+ List<LatencyAttribution> latencyAttributions =
+ new ArrayList<>(aggregatedGetWorkStreamLatencies.size() + 1);
+ if (workItemCreationLatency != null) {
+ latencyAttributions.add(workItemCreationLatency);
}
- if
(workItemCreationStartTime.isAfter(workItemLastChunkReceivedByWorkerTime)) {
+ if
(workItemCreationEndTime.isAfter(workItemLastChunkReceivedByWorkerTime)) {
LOG.warn(
- String.format(
- "Work item creation time %s is after the work received time
%s, "
- + "one or more GetWorkStream timing infos are missing.",
- workItemCreationStartTime,
workItemLastChunkReceivedByWorkerTime));
- return new ArrayList<>();
- }
- List<LatencyAttribution> latencyAttributions = new
ArrayList<>(getWorkStreamLatencies.size());
- long totalDurationWallTimeMills =
- new Duration(workItemCreationStartTime,
workItemLastChunkReceivedByWorkerTime)
- .getMillis();
+ "Work item creation time {} is after the work received time {}, "
+ + "one or more GetWorkStream timing infos are missing.",
+ workItemCreationEndTime,
+ workItemLastChunkReceivedByWorkerTime);
+ return latencyAttributions;
+ }
+ long totalTransmissionDurationElapsedTime =
+ new Duration(workItemCreationEndTime,
workItemLastChunkReceivedByWorkerTime).getMillis();
long totalSumDurationTimeMills = 0;
- for (Duration duration : getWorkStreamLatencies.values()) {
+ for (Duration duration : aggregatedGetWorkStreamLatencies.values()) {
totalSumDurationTimeMills += duration.getMillis();
}
- for (Map.Entry<State, Duration> duration :
getWorkStreamLatencies.entrySet()) {
+ for (Map.Entry<State, Duration> duration :
aggregatedGetWorkStreamLatencies.entrySet()) {
latencyAttributions.add(
LatencyAttribution.newBuilder()
.setState(duration.getKey())
.setTotalDurationMillis(
- (duration.getValue().getMillis() /
totalSumDurationTimeMills)
- * totalDurationWallTimeMills)
+ (long)
+ (((double) duration.getValue().getMillis()
+ / (double) totalSumDurationTimeMills)
Review Comment:
Done — the double-to-long cast is not implicit, so the explicit cast is required.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]