y1chi commented on code in PR #26085:
URL: https://github.com/apache/beam/pull/26085#discussion_r1223692743
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServerTest.java:
##########
@@ -971,39 +970,52 @@ public void onCompleted() {
@Test
public void testGetWorkTimingInfosTracker() throws Exception {
- GetWorkTimingInfosTracker tracker = new GetWorkTimingInfosTracker();
+ GetWorkTimingInfosTracker tracker = new GetWorkTimingInfosTracker(() -> 50);
List<GetWorkStreamTimingInfo> infos = new ArrayList<>();
for (int i = 0; i <= 3; i++) {
infos.add(
GetWorkStreamTimingInfo.newBuilder()
.setEvent(Event.GET_WORK_CREATION_START)
- .setTimestampUsec(i)
+ .setTimestampUsec(0)
.build());
infos.add(
GetWorkStreamTimingInfo.newBuilder()
.setEvent(Event.GET_WORK_CREATION_END)
- .setTimestampUsec(i + 1)
+ .setTimestampUsec(10000)
.build());
infos.add(
GetWorkStreamTimingInfo.newBuilder()
.setEvent(Event.GET_WORK_RECEIVED_BY_DISPATCHER)
- .setTimestampUsec(i + 2)
+ .setTimestampUsec((i + 11) * 1000)
.build());
infos.add(
GetWorkStreamTimingInfo.newBuilder()
.setEvent(Event.GET_WORK_FORWARDED_BY_DISPATCHER)
- .setTimestampUsec(i + 3)
+ .setTimestampUsec((i + 16) * 1000)
.build());
+ tracker.addTimingInfo(infos);
+ infos.clear();
}
- tracker.addTimingInfo(infos);
- Set<State> states = new HashSet<>();
+ // durations for each chunk:
+ // GET_WORK_IN_WINDMILL_WORKER: 10, 10, 10, 10
+ // GET_WORK_IN_TRANSIT_TO_DISPATCHER: 1, 2, 3, 4 -> sum to 10
+ // GET_WORK_IN_TRANSIT_TO_USER_WORKER: 34, 33, 32, 31 -> sum to 130
+ Map<State, LatencyAttribution> latencies = new HashMap<>();
List<LatencyAttribution> attributions = tracker.getLatencyAttributions();
assertEquals(3, attributions.size());
for (LatencyAttribution attribution : attributions) {
- states.add(attribution.getState());
+ latencies.put(attribution.getState(), attribution);
}
- assertTrue(states.contains(State.GET_WORK_IN_WINDMILL_WORKER));
- assertTrue(states.contains(State.GET_WORK_IN_TRANSIT_TO_DISPATCHER));
- assertTrue(states.contains(State.GET_WORK_IN_TRANSIT_TO_USER_WORKER));
+ assertEquals(10L, latencies.get(State.GET_WORK_IN_WINDMILL_WORKER).getTotalDurationMillis());
+ // elapsed time from 10 -> 50;
+ long elapsedTime = 40;
+ // sumDurations: 1 + 2 + 3 + 4 + 34 + 33 + 32 + 31;
+ long sumDurations = 140;
+ assertEquals(
Review Comment:
Done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]