scwhittle commented on code in PR #23333:
URL: https://github.com/apache/beam/pull/23333#discussion_r996736814


##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java:
##########
@@ -3081,6 +3146,283 @@ public void testActiveWorkRefresh() throws Exception {
     assertThat(server.numGetDataRequests(), greaterThan(0));
   }
 
+  static class FakeClock implements Supplier<Instant> {
+    private static final FakeClock DEFAULT = new FakeClock();

Review Comment:
   It seems this could just be an instance in the test instead of a static.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateReader.java:
##########
@@ -472,7 +485,13 @@ public void startBatchAndBlock() {
       }
 
       Windmill.KeyedGetDataRequest request = createRequest(toFetch);
-      Windmill.KeyedGetDataResponse response = 
server.getStateData(computation, request);
+      Windmill.KeyedGetDataResponse response;
+      try (AutoCloseable readWrapper =
+          readWrapperSupplier == null ? null : readWrapperSupplier.get()) {
+        response = server.getStateData(computation, request);
+      } catch (Exception e) {

Review Comment:
   I don't think you should catch this; the outer block takes care of it.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateReader.java:
##########
@@ -195,6 +196,7 @@ public ValuesAndContPosition(List<T> values, @Nullable 
ContinuationT continuatio
   private final ByteString key;
   private final long shardingKey;
   private final long workToken;
+  private final Supplier<AutoCloseable> readWrapperSupplier;

Review Comment:
   Add a comment, something like:
   // Blocking reads should be wrapped with a closeable vended by readWrapperSupplier.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateReader.java:
##########
@@ -472,7 +485,13 @@ public void startBatchAndBlock() {
       }
 
       Windmill.KeyedGetDataRequest request = createRequest(toFetch);
-      Windmill.KeyedGetDataResponse response = 
server.getStateData(computation, request);
+      Windmill.KeyedGetDataResponse response;
+      try (AutoCloseable readWrapper =
+          readWrapperSupplier == null ? null : readWrapperSupplier.get()) {

Review Comment:
   Create a no-op supplier in the constructor so that it is always non-null
   at runtime.



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java:
##########
@@ -280,7 +329,20 @@ public Windmill.GlobalData 
requestGlobalData(Windmill.GlobalDataRequest request)
       }
 
       @Override
-      public void refreshActiveWork(Map<String, List<KeyedGetDataRequest>> 
active) {}
+      public void refreshActiveWork(Map<String, List<KeyedGetDataRequest>> 
active) {
+        Windmill.GetDataRequest.Builder builder = 
Windmill.GetDataRequest.newBuilder();
+        for (Map.Entry<String, List<KeyedGetDataRequest>> entry : 
active.entrySet()) {
+          builder.addRequests(
+              ComputationGetDataRequest.newBuilder()
+                  .setComputationId(entry.getKey())
+                  .addAllRequests(entry.getValue())
+                  .build());
+          Windmill.ComputationGetDataRequest.Builder computationBuilder =

Review Comment:
   remove next two lines



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java:
##########
@@ -280,7 +329,20 @@ public Windmill.GlobalData 
requestGlobalData(Windmill.GlobalDataRequest request)
       }
 
       @Override
-      public void refreshActiveWork(Map<String, List<KeyedGetDataRequest>> 
active) {}
+      public void refreshActiveWork(Map<String, List<KeyedGetDataRequest>> 
active) {
+        Windmill.GetDataRequest.Builder builder = 
Windmill.GetDataRequest.newBuilder();
+        for (Map.Entry<String, List<KeyedGetDataRequest>> entry : 
active.entrySet()) {
+          builder.addRequests(
+              ComputationGetDataRequest.newBuilder()
+                  .setComputationId(entry.getKey())
+                  .addAllRequests(entry.getValue())
+                  .build());

Review Comment:
   nit: I think addRequests would take a builder, so you don't have to build here.



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java:
##########
@@ -307,10 +369,16 @@ public boolean commitWorkItem(
           WorkItemCommitRequest request,
           Consumer<Windmill.CommitStatus> onDone) {
         LOG.debug("commitWorkStream::commitWorkItem: {}", request);
+        // Sleep before calling the onDone callback.
+        Uninterruptibles.sleepUninterruptibly(commitWorkSleep.getMillis(), 
TimeUnit.MILLISECONDS);

Review Comment:
   Do you still need this, given the injected sleep for the commit queue?



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java:
##########
@@ -3081,6 +3146,283 @@ public void testActiveWorkRefresh() throws Exception {
     assertThat(server.numGetDataRequests(), greaterThan(0));
   }
 
+  static class FakeClock implements Supplier<Instant> {
+    private static final FakeClock DEFAULT = new FakeClock();
+
+    private Instant now = Instant.now();
+
+    @Override
+    public synchronized Instant get() {
+      return now;
+    }
+
+    synchronized void sleep(Duration duration) {
+      this.now = this.now.plus(duration);
+    }
+  }
+
+  @Test
+  public void testLatencyAttributionProtobufsPopulated() throws Exception {
+    StreamingDataflowWorker.Work work =
+        new StreamingDataflowWorker.Work(null, FakeClock.DEFAULT) {
+          @Override
+          public void run() {}
+        };
+
+    FakeClock.DEFAULT.sleep(Duration.millis(10));
+    work.setState(StreamingDataflowWorker.Work.State.PROCESSING);
+    FakeClock.DEFAULT.sleep(Duration.millis(20));
+    work.setState(StreamingDataflowWorker.Work.State.COMMIT_QUEUED);
+    FakeClock.DEFAULT.sleep(Duration.millis(30));

Review Comment:
   Might as well test READING and COMMITTING too (to test the merge).
   
   Also go back to a previous state to verify that it sums correctly.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -941,14 +958,18 @@ public void addWorkerStatusPage(BaseStatusServlet page) {
   public void stop() {
     try {
       if (globalConfigRefreshTimer != null) {
-        globalConfigRefreshTimer.cancel();
+        globalConfigRefreshTimer.shutdown();
+        globalConfigRefreshTimer.awaitTermination(300, TimeUnit.SECONDS);

Review Comment:
   nit: the previous semantics of cancel() are unclear if the task is running;
   it may not block if the task was running, and may just prevent it from
   running again. In that case you may be increasing shutdown time here by
   stopping things serially. You could instead cancel everything and then await
   on them all.



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java:
##########
@@ -3081,6 +3146,283 @@ public void testActiveWorkRefresh() throws Exception {
     assertThat(server.numGetDataRequests(), greaterThan(0));
   }
 
+  static class FakeClock implements Supplier<Instant> {
+    private static final FakeClock DEFAULT = new FakeClock();
+
+    private Instant now = Instant.now();
+
+    @Override
+    public synchronized Instant get() {
+      return now;
+    }
+
+    synchronized void sleep(Duration duration) {
+      this.now = this.now.plus(duration);
+    }
+  }
+
+  @Test
+  public void testLatencyAttributionProtobufsPopulated() throws Exception {
+    StreamingDataflowWorker.Work work =
+        new StreamingDataflowWorker.Work(null, FakeClock.DEFAULT) {
+          @Override
+          public void run() {}
+        };
+
+    FakeClock.DEFAULT.sleep(Duration.millis(10));
+    work.setState(StreamingDataflowWorker.Work.State.PROCESSING);
+    FakeClock.DEFAULT.sleep(Duration.millis(20));
+    work.setState(StreamingDataflowWorker.Work.State.COMMIT_QUEUED);
+    FakeClock.DEFAULT.sleep(Duration.millis(30));
+
+    Iterator<LatencyAttribution> it = 
work.getLatencyAttributionList().iterator();
+    assertTrue(it.hasNext());
+    LatencyAttribution lat = it.next();
+    assertTrue(lat.getState() == LatencyAttribution.State.QUEUED);
+    assertTrue(lat.getTotalDurationMillis() == 10);
+    assertTrue(it.hasNext());
+    lat = it.next();
+    assertTrue(lat.getState() == LatencyAttribution.State.ACTIVE);
+    assertTrue(lat.getTotalDurationMillis() == 20);
+    assertTrue(it.hasNext());
+    lat = it.next();
+    assertTrue(lat.getState() == LatencyAttribution.State.COMMITTING);
+    assertTrue(lat.getTotalDurationMillis() == 30);
+    assertTrue(!it.hasNext());
+  }
+
+  // Aggregates LatencyAttribution data from active work refresh requests.
+  static class ActiveWorkRefreshSink {
+    private final Function<GetDataRequest, GetDataResponse> responder;
+    private final Map<Long, EnumMap<LatencyAttribution.State, Duration>> 
totalDurations =
+        new HashMap<>();
+
+    ActiveWorkRefreshSink(Function<GetDataRequest, GetDataResponse> responder) 
{
+      this.responder = responder;
+    }
+
+    Duration getLatencyAttributionDuration(long workToken, 
LatencyAttribution.State state) {
+      EnumMap<LatencyAttribution.State, Duration> durations = 
totalDurations.get(workToken);
+      return durations == null ? Duration.ZERO : durations.getOrDefault(state, 
Duration.ZERO);
+    }
+
+    boolean isActiveWorkRefresh(GetDataRequest request) {
+      for (ComputationGetDataRequest computationRequest : 
request.getRequestsList()) {
+        if 
(!computationRequest.getComputationId().equals(DEFAULT_COMPUTATION_ID)) {
+          return false;
+        }
+        for (KeyedGetDataRequest keyedRequest : 
computationRequest.getRequestsList()) {
+          if (keyedRequest.getWorkToken() == 0
+              || keyedRequest.getShardingKey() != DEFAULT_SHARDING_KEY
+              || keyedRequest.getValuesToFetchCount() != 0
+              || keyedRequest.getBagsToFetchCount() != 0
+              || keyedRequest.getTagValuePrefixesToFetchCount() != 0
+              || keyedRequest.getWatermarkHoldsToFetchCount() != 0) {
+            return false;
+          }
+        }
+      }
+      return true;
+    }
+
+    GetDataResponse getData(GetDataRequest request) {
+      if (!isActiveWorkRefresh(request)) {
+        return responder.apply(request);
+      }
+      for (ComputationGetDataRequest computationRequest : 
request.getRequestsList()) {
+        for (KeyedGetDataRequest keyedRequest : 
computationRequest.getRequestsList()) {
+          for (LatencyAttribution la : 
keyedRequest.getLatencyAttributionList()) {
+            EnumMap<LatencyAttribution.State, Duration> durations =
+                totalDurations.computeIfAbsent(
+                    keyedRequest.getWorkToken(),
+                    (Long workToken) ->
+                        new EnumMap<LatencyAttribution.State, Duration>(
+                            LatencyAttribution.State.class));
+            Duration cur = Duration.millis(la.getTotalDurationMillis());
+            durations.compute(la.getState(), (s, d) -> d == null || 
d.isShorterThan(cur) ? cur : d);
+          }
+        }
+      }
+      return EMPTY_DATA_RESPONDER.apply(request);
+    }
+  }
+
+  @Test
+  public void testLatencyAttributionToQueuedState() throws Exception {
+    final int workToken = 3232; // A unique id makes it easier to search logs.
+
+    List<ParallelInstruction> instructions =
+        Arrays.asList(
+            makeSourceInstruction(StringUtf8Coder.of()),
+            makeDoFnInstruction(new SlowDoFn(Duration.millis(1000)), 0, 
StringUtf8Coder.of()),
+            makeSinkInstruction(StringUtf8Coder.of(), 0));
+
+    FakeWindmillServer server = new FakeWindmillServer(errorCollector);
+    StreamingDataflowWorkerOptions options = 
createTestingPipelineOptions(server);
+    options.setActiveWorkRefreshPeriodMillis(100);

Review Comment:
   Add a comment about timing:
   // Currently the heartbeats are sent based upon real time, so we set a low
   // refresh period and sleep on the real clock while advancing the fake clock.
   // The fake clock is used for latency attribution durations.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to