This is an automated email from the ASF dual-hosted git repository.
boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 8820f28 [BEAM-5709] BeamFnControlServiceTest - use latch instead of
sleep
new f2236e5 Merge pull request #8671 from youngoli/beam5709
8820f28 is described below
commit 8820f287c24ed54ac0c8c61efc52fab74ed8f6fa
Author: Daniel Oliveira <[email protected]>
AuthorDate: Thu May 23 14:03:36 2019 -0700
[BEAM-5709] BeamFnControlServiceTest - use latch instead of sleep
---
.../worker/fn/BeamFnControlServiceTest.java | 33 ++++++++++++++++++++--
1 file changed, 31 insertions(+), 2 deletions(-)
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/BeamFnControlServiceTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/BeamFnControlServiceTest.java
index 86e2c3e..11c88d5 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/BeamFnControlServiceTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/BeamFnControlServiceTest.java
@@ -18,11 +18,13 @@
package org.apache.beam.runners.dataflow.worker.fn;
import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import java.net.InetAddress;
import java.net.ServerSocket;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnControlGrpc;
@@ -67,6 +69,15 @@ public class BeamFnControlServiceTest {
@Test
public void testClientConnecting() throws Exception {
+ CountDownLatch requestCompleted = new CountDownLatch(1);
+ doAnswer(
+ invocation -> {
+ requestCompleted.countDown();
+ return null;
+ })
+ .when(requestObserver)
+ .onCompleted();
+
PipelineOptions options = PipelineOptionsFactory.create();
Endpoints.ApiServiceDescriptor descriptor = findOpenPort();
BeamFnControlService service =
@@ -87,7 +98,8 @@ public class BeamFnControlServiceTest {
server.shutdown();
server.awaitTermination(1, TimeUnit.SECONDS);
server.shutdownNow();
- Thread.sleep(1000); // Wait for stub to close stream.
+
+ requestCompleted.await(5, TimeUnit.SECONDS); // Wait until request streams
have been closed.
verify(requestObserver).onCompleted();
verifyNoMoreInteractions(requestObserver);
@@ -95,6 +107,22 @@ public class BeamFnControlServiceTest {
@Test
public void testMultipleClientsConnecting() throws Exception {
+ CountDownLatch requestCompleted = new CountDownLatch(2);
+ doAnswer(
+ invocation -> {
+ requestCompleted.countDown();
+ return null;
+ })
+ .when(requestObserver)
+ .onCompleted();
+ doAnswer(
+ invocation -> {
+ requestCompleted.countDown();
+ return null;
+ })
+ .when(anotherRequestObserver)
+ .onCompleted();
+
PipelineOptions options = PipelineOptionsFactory.create();
Endpoints.ApiServiceDescriptor descriptor = findOpenPort();
BeamFnControlService service =
@@ -126,7 +154,8 @@ public class BeamFnControlServiceTest {
server.shutdown();
server.awaitTermination(1, TimeUnit.SECONDS);
server.shutdownNow();
- Thread.sleep(1000); // Wait for stub to close stream.
+
+ requestCompleted.await(5, TimeUnit.SECONDS); // Wait until request streams
have been closed.
verify(requestObserver).onCompleted();
verifyNoMoreInteractions(requestObserver);