scwhittle commented on code in PR #23333:
URL: https://github.com/apache/beam/pull/23333#discussion_r988819377
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java:
##########
@@ -3081,6 +3089,222 @@ public void testActiveWorkRefresh() throws Exception {
assertThat(server.numGetDataRequests(), greaterThan(0));
}
+ // A class that aggregates LatencyAttribution data from active work refresh requests.
+ private static class ActiveWorkRefreshSink {
+ Map<Long, EnumMap<LatencyAttribution.State, Duration>> totalDurations = new HashMap<>();
+
+ // Accessor for reading out aggregated LatencyAttribution data.
+ Duration getLatencyAttributionDuration(long workToken, LatencyAttribution.State state) {
+ EnumMap<LatencyAttribution.State, Duration> durations = totalDurations.get(workToken);
+ if (durations == null) {
+ return Duration.ZERO;
+ }
+ Duration d = durations.get(state);
Review Comment:
Use getOrDefault here.
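A minimal sketch of that simplification (equivalent to the explicit null checks, assuming the same totalDurations field):

```java
Duration getLatencyAttributionDuration(long workToken, LatencyAttribution.State state) {
  return totalDurations
      .getOrDefault(workToken, new EnumMap<>(LatencyAttribution.State.class))
      .getOrDefault(state, Duration.ZERO);
}
```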
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java:
##########
@@ -3046,19 +3048,25 @@ public void testHugeCommits() throws Exception {
private static class SlowDoFn extends DoFn<String, String> {
+ private final Duration sleep;
+
+ SlowDoFn(Duration sleep) {
+ this.sleep = sleep;
+ }
+
+ SlowDoFn() {
+ this(Duration.millis(1000));
+ }
+
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
- Thread.sleep(1000);
+ Uninterruptibles.sleepUninterruptibly(sleep.getMillis(), TimeUnit.MILLISECONDS);
Review Comment:
It would be better to inject a clock into this and the same one into
StreamingDataflowWorker instead of relying on sleep timing.
Can you take a look at whether that would be too hard to add? The sleep-based
timing is somewhat pre-existing, but it would be nice to clean it up while modifying this code.
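A rough, untested sketch of what that injection could look like (FakeClock and the constructor signature are illustrative, not existing APIs; the matching change to make StreamingDataflowWorker read the same clock isn't shown):

```java
// (Instant/Duration are org.joda.time; Supplier is java.util.function.Supplier.)
// Shared fake clock; the test advances it explicitly instead of sleeping.
static class FakeClock implements Supplier<Instant> {
  private Instant now = new Instant(0);

  @Override
  public synchronized Instant get() {
    return now;
  }

  synchronized void advance(Duration amount) {
    now = now.plus(amount);
  }
}

static class SlowDoFn extends DoFn<String, String> {
  private final FakeClock clock;
  private final Duration processingTime;

  SlowDoFn(FakeClock clock, Duration processingTime) {
    this.clock = clock;
    this.processingTime = processingTime;
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    // Simulate slow processing by advancing the shared clock rather than sleeping.
    clock.advance(processingTime);
    c.output(c.element());
  }
}
```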
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -1406,7 +1422,13 @@ private void process(
computationId,
key,
workItem.getShardingKey(),
- workItem.getWorkToken());
+ workItem.getWorkToken(),
+ () -> {
Review Comment:
How about a single function that returns a Closeable?
That will help ensure the enter and exit calls are matched up.
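For example (illustrative names, not the actual worker methods):

```java
import java.io.Closeable;

class WorkStateTracker {
  private void onEnterProcessing() {
    // record the transition into the processing state, e.g. capture a timestamp
  }

  private void onExitProcessing() {
    // record the transition out and accumulate the elapsed duration
  }

  // Single entry point; the returned Closeable performs the matching exit call.
  Closeable enterProcessing() {
    onEnterProcessing();
    return this::onExitProcessing;
  }
}
```

At the call site, try-with-resources (`try (Closeable scope = tracker.enterProcessing()) { ... }`) then guarantees the exit callback runs even if processing throws.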
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -2377,12 +2399,48 @@ public List<Windmill.KeyedGetDataRequest> getKeysToRefresh(Instant refreshDeadli
ShardedKey shardedKey = entry.getKey();
for (Work work : entry.getValue()) {
if (work.getStartTime().isBefore(refreshDeadline)) {
- result.add(
+ Map<Windmill.LatencyAttribution.State, Duration> durations =
Review Comment:
It would be simpler to just build this map up in the work item instead of
building one keyed by State here.
That saves merging at this point, since it's already handled there; we just need
a translation between the enums.
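Roughly, as a sketch only (`Work.State`, `toWindmillState`, and the surrounding structure are illustrative placeholders):

```java
// On the work item: accumulate per-state durations as processing transitions happen.
private final EnumMap<Work.State, Duration> durationsPerState =
    new EnumMap<>(Work.State.class);

void recordStateDuration(Work.State state, Duration duration) {
  durationsPerState.merge(state, duration, Duration::plus);
}

// When building the active work refresh request, translate to the Windmill proto enum.
List<LatencyAttribution> getLatencyAttributions() {
  List<LatencyAttribution> result = new ArrayList<>();
  durationsPerState.forEach(
      (state, duration) ->
          result.add(
              LatencyAttribution.newBuilder()
                  .setState(toWindmillState(state)) // enum translation
                  .setTotalDurationMillis(duration.getMillis())
                  .build()));
  return result;
}
```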
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java:
##########
@@ -3081,6 +3089,222 @@ public void testActiveWorkRefresh() throws Exception {
assertThat(server.numGetDataRequests(), greaterThan(0));
}
+ // A class that aggregates LatencyAttribution data from active work refresh requests.
+ private static class ActiveWorkRefreshSink {
+ Map<Long, EnumMap<LatencyAttribution.State, Duration>> totalDurations = new HashMap<>();
+
+ // Accessor for reading out aggregated LatencyAttribution data.
+ Duration getLatencyAttributionDuration(long workToken, LatencyAttribution.State state) {
+ EnumMap<LatencyAttribution.State, Duration> durations = totalDurations.get(workToken);
+ if (durations == null) {
+ return Duration.ZERO;
+ }
+ Duration d = durations.get(state);
+ if (d == null) {
+ return Duration.ZERO;
+ }
+ return d;
+ }
+
+ // Handles active work refresh requests when passed to FakeWindmillServer.addDataFnToOffer.
+ GetDataResponse getData(GetDataRequest request) {
+ boolean isActiveWorkRefresh = true;
+ for (ComputationGetDataRequest computationRequest : request.getRequestsList()) {
+ if (!computationRequest.getComputationId().equals(DEFAULT_COMPUTATION_ID)) {
+ isActiveWorkRefresh = false;
+ continue;
+ }
+ for (KeyedGetDataRequest keyedRequest : computationRequest.getRequestsList()) {
+ if (keyedRequest.getWorkToken() == 0
+ || keyedRequest.getShardingKey() != DEFAULT_SHARDING_KEY
+ || keyedRequest.getValuesToFetchCount() != 0
+ || keyedRequest.getBagsToFetchCount() != 0
+ || keyedRequest.getTagValuePrefixesToFetchCount() != 0
+ || keyedRequest.getWatermarkHoldsToFetchCount() != 0) {
+ isActiveWorkRefresh = false;
+ continue;
+ }
+ for (LatencyAttribution la : keyedRequest.getLatencyAttributionList()) {
+ EnumMap<LatencyAttribution.State, Duration> durations =
+ totalDurations.computeIfAbsent(
+ keyedRequest.getWorkToken(),
+ (Long workToken) ->
+ new EnumMap<LatencyAttribution.State, Duration>(
+ LatencyAttribution.State.class));
+ Duration old = durations.get(la.getState());
+ Duration cur = Duration.millis(la.getTotalDurationMillis());
+ if (old == null || old.isShorterThan(cur)) {
+ durations.put(la.getState(), cur);
+ }
+ }
+ }
+ }
+ if (!isActiveWorkRefresh) {
+ // The unit test below for state QUEUED relies on this delay.
+ Uninterruptibles.sleepUninterruptibly(2000, TimeUnit.MILLISECONDS);
+ }
+ return EMPTY_DATA_RESPONDER.apply(request);
+ }
+ }
+
+ @Test
+ public void testLatencyAttributionToQueuedState() throws Exception {
+ final int workToken = 323232; // A unique id makes it easier to find logs.
+
+ List<ParallelInstruction> instructions =
+ Arrays.asList(
+ makeSourceInstruction(StringUtf8Coder.of()),
+ makeDoFnInstruction(new SlowDoFn(Duration.millis(2000)), 0, StringUtf8Coder.of()),
+ makeSinkInstruction(StringUtf8Coder.of(), 0));
+
+ FakeWindmillServer server = new FakeWindmillServer(errorCollector);
+ server.setGetDataSleep(Duration.ZERO);
+ StreamingDataflowWorkerOptions options = createTestingPipelineOptions(server);
+ options.setActiveWorkRefreshPeriodMillis(100);
+ // A single-threaded worker processes work sequentially, leaving a second work item in state
+ // QUEUED until the first work item is committed.
+ options.setNumberOfWorkerHarnessThreads(1);
+ StreamingDataflowWorker worker = makeWorker(instructions, options, false /* publishCounters */);
+ worker.start();
+
+ ActiveWorkRefreshSink awrSink = new ActiveWorkRefreshSink();
+ for (int i = 0; i < 1000; ++i) {
+ server.addDataFnToOffer(
+ (GetDataRequest request) -> {
+ return awrSink.getData(request);
+ });
+ }
+ server.addWorkToOffer(makeInput(workToken + 1, TimeUnit.MILLISECONDS.toMicros(100)));
+ server.addWorkToOffer(makeInput(workToken, TimeUnit.MILLISECONDS.toMicros(100)));
+ server.waitForAndGetCommits(2);
+
+ worker.stop();
+
+ assertTrue(
Review Comment:
Verify that the other states are less than the QUEUED state?
Otherwise, if getLatencyAttributionDuration always returned 1 sec, all of these
tests would pass.
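e.g. something along these lines (assuming the second offered work item, `workToken`, is the one left queued; `State.ACTIVE` is used only for illustration):

```java
Duration queued =
    awrSink.getLatencyAttributionDuration(workToken, LatencyAttribution.State.QUEUED);
Duration active =
    awrSink.getLatencyAttributionDuration(workToken, LatencyAttribution.State.ACTIVE);
// Most of the latency of the queued item should be attributed to QUEUED, not ACTIVE.
assertTrue(queued.isLongerThan(Duration.millis(1000)));
assertTrue(active.isShorterThan(queued));
```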
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java:
##########
@@ -3081,6 +3089,222 @@ public void testActiveWorkRefresh() throws Exception {
assertThat(server.numGetDataRequests(), greaterThan(0));
}
+ // A class that aggregates LatencyAttribution data from active work refresh requests.
+ private static class ActiveWorkRefreshSink {
+ Map<Long, EnumMap<LatencyAttribution.State, Duration>> totalDurations = new HashMap<>();
+
+ // Accessor for reading out aggregated LatencyAttribution data.
+ Duration getLatencyAttributionDuration(long workToken, LatencyAttribution.State state) {
+ EnumMap<LatencyAttribution.State, Duration> durations = totalDurations.get(workToken);
+ if (durations == null) {
+ return Duration.ZERO;
+ }
+ Duration d = durations.get(state);
+ if (d == null) {
+ return Duration.ZERO;
+ }
+ return d;
+ }
+
+ // Handles active work refresh requests when passed to FakeWindmillServer.addDataFnToOffer.
+ GetDataResponse getData(GetDataRequest request) {
+ boolean isActiveWorkRefresh = true;
+ for (ComputationGetDataRequest computationRequest : request.getRequestsList()) {
+ if (!computationRequest.getComputationId().equals(DEFAULT_COMPUTATION_ID)) {
+ isActiveWorkRefresh = false;
+ continue;
+ }
+ for (KeyedGetDataRequest keyedRequest : computationRequest.getRequestsList()) {
+ if (keyedRequest.getWorkToken() == 0
+ || keyedRequest.getShardingKey() != DEFAULT_SHARDING_KEY
+ || keyedRequest.getValuesToFetchCount() != 0
+ || keyedRequest.getBagsToFetchCount() != 0
+ || keyedRequest.getTagValuePrefixesToFetchCount() != 0
+ || keyedRequest.getWatermarkHoldsToFetchCount() != 0) {
+ isActiveWorkRefresh = false;
+ continue;
+ }
+ for (LatencyAttribution la : keyedRequest.getLatencyAttributionList()) {
+ EnumMap<LatencyAttribution.State, Duration> durations =
+ totalDurations.computeIfAbsent(
+ keyedRequest.getWorkToken(),
+ (Long workToken) ->
+ new EnumMap<LatencyAttribution.State, Duration>(
+ LatencyAttribution.State.class));
+ Duration old = durations.get(la.getState());
+ Duration cur = Duration.millis(la.getTotalDurationMillis());
+ if (old == null || old.isShorterThan(cur)) {
+ durations.put(la.getState(), cur);
+ }
+ }
+ }
+ }
+ if (!isActiveWorkRefresh) {
+ // The unit test below for state QUEUED relies on this delay.
+ Uninterruptibles.sleepUninterruptibly(2000, TimeUnit.MILLISECONDS);
+ }
+ return EMPTY_DATA_RESPONDER.apply(request);
+ }
+ }
+
+ @Test
+ public void testLatencyAttributionToQueuedState() throws Exception {
+ final int workToken = 323232; // A unique id makes it easier to find logs.
+
+ List<ParallelInstruction> instructions =
+ Arrays.asList(
+ makeSourceInstruction(StringUtf8Coder.of()),
+ makeDoFnInstruction(new SlowDoFn(Duration.millis(2000)), 0, StringUtf8Coder.of()),
+ makeSinkInstruction(StringUtf8Coder.of(), 0));
+
+ FakeWindmillServer server = new FakeWindmillServer(errorCollector);
+ server.setGetDataSleep(Duration.ZERO);
+ StreamingDataflowWorkerOptions options = createTestingPipelineOptions(server);
+ options.setActiveWorkRefreshPeriodMillis(100);
+ // A single-threaded worker processes work sequentially, leaving a second work item in state
+ // QUEUED until the first work item is committed.
+ options.setNumberOfWorkerHarnessThreads(1);
+ StreamingDataflowWorker worker = makeWorker(instructions, options, false /* publishCounters */);
+ worker.start();
+
+ ActiveWorkRefreshSink awrSink = new ActiveWorkRefreshSink();
+ for (int i = 0; i < 1000; ++i) {
+ server.addDataFnToOffer(
Review Comment:
Can you modify the server to allow specifying a default data function, instead of
just loading up 1000 copies?
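For instance, a possible shape for that change (method and field names here are hypothetical, not the existing FakeWindmillServer API):

```java
// In FakeWindmillServer:
private Function<GetDataRequest, GetDataResponse> defaultGetDataFn = null;

public void setDefaultGetDataFn(Function<GetDataRequest, GetDataResponse> fn) {
  this.defaultGetDataFn = fn;
}

// In the GetData handling: fall back to the default when no per-call function was queued.
// Function<GetDataRequest, GetDataResponse> fn = queuedDataFns.poll();
// if (fn == null) {
//   fn = defaultGetDataFn != null ? defaultGetDataFn : EMPTY_DATA_RESPONDER;
// }

// Test usage:
// server.setDefaultGetDataFn(awrSink::getData);
```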
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateReader.java:
##########
@@ -454,6 +469,10 @@ public void startBatchAndBlock() {
// First, drain work out of the pending lookups into a set. These will be the items we fetch.
HashSet<StateTag<?>> toFetch = Sets.newHashSet();
try {
+ if (beforeRead != null) {
Review Comment:
I was concerned that this might be incorrect if we ever call this from threads
that are not the main processing thread (i.e. background pagination of bags).
However, although we attempt to fetch the next bag page ahead of time, the fetch
isn't issued in the background; it is only issued when it is needed or when
another state fetch would block.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateReader.java:
##########
@@ -454,6 +469,10 @@ public void startBatchAndBlock() {
// First, drain work out of the pending lookups into a set. These will be the items we fetch.
HashSet<StateTag<?>> toFetch = Sets.newHashSet();
try {
+ if (beforeRead != null) {
Review Comment:
Maybe we should move this down to only where we call getStateData.
That will prevent a state transition in cases where we don't actually issue a
read to the server.
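i.e. roughly (a sketch only; it assumes beforeRead is a Runnable-style callback and simplifies the surrounding batching logic):

```java
if (!toFetch.isEmpty()) {
  // Record the state transition only when a read is actually issued to the server.
  if (beforeRead != null) {
    beforeRead.run();
  }
  // ... existing code that builds the request from toFetch and calls getStateData ...
}
```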