scwhittle commented on code in PR #34283:
URL: https://github.com/apache/beam/pull/34283#discussion_r2025004231


##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStreamTest.java:
##########
@@ -148,19 +171,101 @@ protected boolean hasPendingRequests() {
     @Override
     protected void startThrottleTimer() {}
 
-    public void testSend(Integer i)
-        throws ResettableThrowingStreamObserver.StreamClosedException,
-            WindmillStreamShutdownException {
+    public void testSend(Integer i) throws WindmillStreamShutdownException {
       trySend(i);
     }
 
     @Override
-    protected void sendHealthCheck() {}
+    protected void sendHealthCheck() {
+      numHealthChecks.incrementAndGet();
+    }
+
+    private void waitForHealthChecks(int expectedHealthChecks) {
+      int waitedMillis = 0;
+      while (numHealthChecks.get() < expectedHealthChecks) {
+        LOG.info(
+            "Waited for {}ms for {} health checks. Current health check count 
is {}.",
+            waitedMillis,
+            numHealthChecks.get(),
+            expectedHealthChecks);
+        Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+      }
+    }
 
     @Override
     protected void appendSpecificHtml(PrintWriter writer) {}
 
     @Override
     protected void shutdownInternal() {}
   }
+
+  private static class TestCallStreamObserver extends 
CallStreamObserver<Integer> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(AbstractWindmillStreamTest.class);
+    private final CountDownLatch sendBlocker = new CountDownLatch(1);
+    private final AtomicInteger numSends = new AtomicInteger();
+
+    private final boolean waitForSend;
+
+    private TestCallStreamObserver(boolean waitForSend) {

Review Comment:
   can this param and the member be called blockSend?



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStreamTest.java:
##########
@@ -110,12 +80,65 @@ public void setMessageCompression(boolean b) {}
     // Sleep a bit to give sendExecutor time to execute the send().

Review Comment:
   seems like this should be before the shutdown() if we're tryign to give send 
a chance to be blocked before shutting down.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to