arne-alex commented on code in PR #23333:
URL: https://github.com/apache/beam/pull/23333#discussion_r995468602
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java:
##########
@@ -3081,6 +3089,222 @@ public void testActiveWorkRefresh() throws Exception {
assertThat(server.numGetDataRequests(), greaterThan(0));
}
+ // A class that aggregates LatencyAttribution data from active work refresh
requests.
+ private static class ActiveWorkRefreshSink {
+ Map<Long, EnumMap<LatencyAttribution.State, Duration>> totalDurations =
new HashMap<>();
+
+ // Accessor for reading out aggregated LatencyAttribution data.
+ Duration getLatencyAttributionDuration(long workToken,
LatencyAttribution.State state) {
+ EnumMap<LatencyAttribution.State, Duration> durations =
totalDurations.get(workToken);
+ if (durations == null) {
+ return Duration.ZERO;
+ }
+ Duration d = durations.get(state);
+ if (d == null) {
+ return Duration.ZERO;
+ }
+ return d;
+ }
+
+ // Handles active work refresh requests when passed to
FakeWindmillServer.addDataFnToOffer.
+ GetDataResponse getData(GetDataRequest request) {
+ boolean isActiveWorkRefresh = true;
+ for (ComputationGetDataRequest computationRequest :
request.getRequestsList()) {
+ if
(!computationRequest.getComputationId().equals(DEFAULT_COMPUTATION_ID)) {
+ isActiveWorkRefresh = false;
+ continue;
+ }
+ for (KeyedGetDataRequest keyedRequest :
computationRequest.getRequestsList()) {
+ if (keyedRequest.getWorkToken() == 0
+ || keyedRequest.getShardingKey() != DEFAULT_SHARDING_KEY
+ || keyedRequest.getValuesToFetchCount() != 0
+ || keyedRequest.getBagsToFetchCount() != 0
+ || keyedRequest.getTagValuePrefixesToFetchCount() != 0
+ || keyedRequest.getWatermarkHoldsToFetchCount() != 0) {
+ isActiveWorkRefresh = false;
+ continue;
+ }
+ for (LatencyAttribution la :
keyedRequest.getLatencyAttributionList()) {
+ EnumMap<LatencyAttribution.State, Duration> durations =
+ totalDurations.computeIfAbsent(
+ keyedRequest.getWorkToken(),
+ (Long workToken) ->
+ new EnumMap<LatencyAttribution.State, Duration>(
+ LatencyAttribution.State.class));
+ Duration old = durations.get(la.getState());
+ Duration cur = Duration.millis(la.getTotalDurationMillis());
+ if (old == null || old.isShorterThan(cur)) {
+ durations.put(la.getState(), cur);
+ }
+ }
+ }
+ }
+ if (!isActiveWorkRefresh) {
+ // The unit test below for state QUEUED relies on this delay.
+ Uninterruptibles.sleepUninterruptibly(2000, TimeUnit.MILLISECONDS);
+ }
+ return EMPTY_DATA_RESPONDER.apply(request);
+ }
+ }
+
+ @Test
+ public void testLatencyAttributionToQueuedState() throws Exception {
+ final int workToken = 323232; // A unique id makes it easier to find logs.
+
+ List<ParallelInstruction> instructions =
+ Arrays.asList(
+ makeSourceInstruction(StringUtf8Coder.of()),
+ makeDoFnInstruction(new SlowDoFn(Duration.millis(2000)), 0,
StringUtf8Coder.of()),
+ makeSinkInstruction(StringUtf8Coder.of(), 0));
+
+ FakeWindmillServer server = new FakeWindmillServer(errorCollector);
+ server.setGetDataSleep(Duration.ZERO);
+ StreamingDataflowWorkerOptions options =
createTestingPipelineOptions(server);
+ options.setActiveWorkRefreshPeriodMillis(100);
+ // A single-threaded worker processes work sequentially, leaving a second
work item in state
+ // QUEUED until the first work item is committed.
+ options.setNumberOfWorkerHarnessThreads(1);
+ StreamingDataflowWorker worker = makeWorker(instructions, options, false
/* publishCounters */);
+ worker.start();
+
+ ActiveWorkRefreshSink awrSink = new ActiveWorkRefreshSink();
+ for (int i = 0; i < 1000; ++i) {
+ server.addDataFnToOffer(
Review Comment:
Done (after the next force push). I've put the default behind a new nested class
'ResponseQueue'. This allows us to reuse this class for GetWork, GetData, and
CommitWork requests in unit tests. I hope that's not overengineered.
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java:
##########
@@ -3081,6 +3089,222 @@ public void testActiveWorkRefresh() throws Exception {
assertThat(server.numGetDataRequests(), greaterThan(0));
}
+ // A class that aggregates LatencyAttribution data from active work refresh
requests.
+ private static class ActiveWorkRefreshSink {
+ Map<Long, EnumMap<LatencyAttribution.State, Duration>> totalDurations =
new HashMap<>();
+
+ // Accessor for reading out aggregated LatencyAttribution data.
+ Duration getLatencyAttributionDuration(long workToken,
LatencyAttribution.State state) {
+ EnumMap<LatencyAttribution.State, Duration> durations =
totalDurations.get(workToken);
+ if (durations == null) {
+ return Duration.ZERO;
+ }
+ Duration d = durations.get(state);
+ if (d == null) {
+ return Duration.ZERO;
+ }
+ return d;
+ }
+
+ // Handles active work refresh requests when passed to
FakeWindmillServer.addDataFnToOffer.
+ GetDataResponse getData(GetDataRequest request) {
+ boolean isActiveWorkRefresh = true;
+ for (ComputationGetDataRequest computationRequest :
request.getRequestsList()) {
+ if
(!computationRequest.getComputationId().equals(DEFAULT_COMPUTATION_ID)) {
+ isActiveWorkRefresh = false;
+ continue;
+ }
+ for (KeyedGetDataRequest keyedRequest :
computationRequest.getRequestsList()) {
+ if (keyedRequest.getWorkToken() == 0
+ || keyedRequest.getShardingKey() != DEFAULT_SHARDING_KEY
+ || keyedRequest.getValuesToFetchCount() != 0
+ || keyedRequest.getBagsToFetchCount() != 0
+ || keyedRequest.getTagValuePrefixesToFetchCount() != 0
+ || keyedRequest.getWatermarkHoldsToFetchCount() != 0) {
+ isActiveWorkRefresh = false;
+ continue;
+ }
+ for (LatencyAttribution la :
keyedRequest.getLatencyAttributionList()) {
+ EnumMap<LatencyAttribution.State, Duration> durations =
+ totalDurations.computeIfAbsent(
+ keyedRequest.getWorkToken(),
+ (Long workToken) ->
+ new EnumMap<LatencyAttribution.State, Duration>(
+ LatencyAttribution.State.class));
+ Duration old = durations.get(la.getState());
+ Duration cur = Duration.millis(la.getTotalDurationMillis());
+ if (old == null || old.isShorterThan(cur)) {
+ durations.put(la.getState(), cur);
+ }
+ }
+ }
+ }
+ if (!isActiveWorkRefresh) {
+ // The unit test below for state QUEUED relies on this delay.
+ Uninterruptibles.sleepUninterruptibly(2000, TimeUnit.MILLISECONDS);
+ }
+ return EMPTY_DATA_RESPONDER.apply(request);
+ }
+ }
+
+ @Test
+ public void testLatencyAttributionToQueuedState() throws Exception {
+ final int workToken = 323232; // A unique id makes it easier to find logs.
+
+ List<ParallelInstruction> instructions =
+ Arrays.asList(
+ makeSourceInstruction(StringUtf8Coder.of()),
+ makeDoFnInstruction(new SlowDoFn(Duration.millis(2000)), 0,
StringUtf8Coder.of()),
+ makeSinkInstruction(StringUtf8Coder.of(), 0));
+
+ FakeWindmillServer server = new FakeWindmillServer(errorCollector);
+ server.setGetDataSleep(Duration.ZERO);
+ StreamingDataflowWorkerOptions options =
createTestingPipelineOptions(server);
+ options.setActiveWorkRefreshPeriodMillis(100);
+ // A single-threaded worker processes work sequentially, leaving a second
work item in state
+ // QUEUED until the first work item is committed.
+ options.setNumberOfWorkerHarnessThreads(1);
+ StreamingDataflowWorker worker = makeWorker(instructions, options, false
/* publishCounters */);
+ worker.start();
+
+ ActiveWorkRefreshSink awrSink = new ActiveWorkRefreshSink();
+ for (int i = 0; i < 1000; ++i) {
+ server.addDataFnToOffer(
+ (GetDataRequest request) -> {
+ return awrSink.getData(request);
+ });
+ }
+ server.addWorkToOffer(makeInput(workToken + 1,
TimeUnit.MILLISECONDS.toMicros(100)));
+ server.addWorkToOffer(makeInput(workToken,
TimeUnit.MILLISECONDS.toMicros(100)));
+ server.waitForAndGetCommits(2);
+
+ worker.stop();
+
+ assertTrue(
Review Comment:
Done (after the next force push).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]