This is an automated email from the ASF dual-hosted git repository.

boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 8820f28  [BEAM-5709] BeamFnControlServiceTest - use latch instead of 
sleep
     new f2236e5  Merge pull request #8671 from youngoli/beam5709
8820f28 is described below

commit 8820f287c24ed54ac0c8c61efc52fab74ed8f6fa
Author: Daniel Oliveira <[email protected]>
AuthorDate: Thu May 23 14:03:36 2019 -0700

    [BEAM-5709] BeamFnControlServiceTest - use latch instead of sleep
---
 .../worker/fn/BeamFnControlServiceTest.java        | 33 ++++++++++++++++++++--
 1 file changed, 31 insertions(+), 2 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/BeamFnControlServiceTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/BeamFnControlServiceTest.java
index 86e2c3e..11c88d5 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/BeamFnControlServiceTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/BeamFnControlServiceTest.java
@@ -18,11 +18,13 @@
 package org.apache.beam.runners.dataflow.worker.fn;
 
 import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 
 import java.net.InetAddress;
 import java.net.ServerSocket;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.model.fnexecution.v1.BeamFnControlGrpc;
@@ -67,6 +69,15 @@ public class BeamFnControlServiceTest {
 
   @Test
   public void testClientConnecting() throws Exception {
+    CountDownLatch requestCompleted = new CountDownLatch(1);
+    doAnswer(
+            invocation -> {
+              requestCompleted.countDown();
+              return null;
+            })
+        .when(requestObserver)
+        .onCompleted();
+
     PipelineOptions options = PipelineOptionsFactory.create();
     Endpoints.ApiServiceDescriptor descriptor = findOpenPort();
     BeamFnControlService service =
@@ -87,7 +98,8 @@ public class BeamFnControlServiceTest {
     server.shutdown();
     server.awaitTermination(1, TimeUnit.SECONDS);
     server.shutdownNow();
-    Thread.sleep(1000); // Wait for stub to close stream.
+
+    requestCompleted.await(5, TimeUnit.SECONDS); // Wait until request streams 
have been closed.
 
     verify(requestObserver).onCompleted();
     verifyNoMoreInteractions(requestObserver);
@@ -95,6 +107,22 @@ public class BeamFnControlServiceTest {
 
   @Test
   public void testMultipleClientsConnecting() throws Exception {
+    CountDownLatch requestCompleted = new CountDownLatch(2);
+    doAnswer(
+            invocation -> {
+              requestCompleted.countDown();
+              return null;
+            })
+        .when(requestObserver)
+        .onCompleted();
+    doAnswer(
+            invocation -> {
+              requestCompleted.countDown();
+              return null;
+            })
+        .when(anotherRequestObserver)
+        .onCompleted();
+
     PipelineOptions options = PipelineOptionsFactory.create();
     Endpoints.ApiServiceDescriptor descriptor = findOpenPort();
     BeamFnControlService service =
@@ -126,7 +154,8 @@ public class BeamFnControlServiceTest {
     server.shutdown();
     server.awaitTermination(1, TimeUnit.SECONDS);
     server.shutdownNow();
-    Thread.sleep(1000); // Wait for stub to close stream.
+
+    requestCompleted.await(5, TimeUnit.SECONDS); // Wait until request streams 
have been closed.
 
     verify(requestObserver).onCompleted();
     verifyNoMoreInteractions(requestObserver);

Reply via email to