This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new abbaab8bfcd Fix flaky GrpcDirectGetWorkStreamTest.testConsumedWorkItems_itemsSplitAcrossResponses (#36129)
abbaab8bfcd is described below

commit abbaab8bfcdc887868c5eea5715e2e6bd098082d
Author: Sam Whittle <[email protected]>
AuthorDate: Fri Sep 12 13:58:31 2025 +0200

    Fix flaky GrpcDirectGetWorkStreamTest.testConsumedWorkItems_itemsSplitAcrossResponses (#36129)
---
 .../client/grpc/GrpcDirectGetWorkStreamTest.java   | 27 +++++++++++++---------
 1 file changed, 16 insertions(+), 11 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStreamTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStreamTest.java
index 1014242317d..41900017838 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStreamTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStreamTest.java
@@ -392,7 +392,9 @@ public class GrpcDirectGetWorkStreamTest {
 
   @Test
   public void testConsumedWorkItems_itemsSplitAcrossResponses() throws 
InterruptedException {
-    int expectedRequests = 3;
+    // We send all the responses on the first request. We don't care if there 
are additional
+    // requests.
+    int expectedRequests = 1;
     CountDownLatch waitForRequests = new CountDownLatch(expectedRequests);
     TestGetWorkRequestObserver requestObserver = new 
TestGetWorkRequestObserver(waitForRequests);
     GetWorkStreamTestStub testStub = new 
GetWorkStreamTestStub(requestObserver);
@@ -426,9 +428,9 @@ public class GrpcDirectGetWorkStreamTest {
     Windmill.WorkItem workItem3 =
         Windmill.WorkItem.newBuilder()
             .setKey(ByteString.copyFromUtf8("somewhat_long_key3"))
-            .setWorkToken(2L)
-            .setShardingKey(2L)
-            .setCacheToken(2L)
+            .setWorkToken(3L)
+            .setShardingKey(3L)
+            .setCacheToken(3L)
             .build();
 
     List<ByteString> chunks1 = new ArrayList<>();
@@ -444,12 +446,12 @@ public class GrpcDirectGetWorkStreamTest {
 
     chunks3.add(workItem3.toByteString());
 
+    assertTrue(waitForRequests.await(5, TimeUnit.SECONDS));
+
     testStub.injectResponse(createResponse(chunks1, bytes.size() - third));
     testStub.injectResponse(createResponse(chunks2, bytes.size() - 2 * third));
     testStub.injectResponse(createResponse(chunks3, 0));
 
-    assertTrue(waitForRequests.await(5, TimeUnit.SECONDS));
-
     assertThat(scheduledWorkItems).containsExactly(workItem1, workItem2, 
workItem3);
   }
 
@@ -458,6 +460,7 @@ public class GrpcDirectGetWorkStreamTest {
 
     private final TestGetWorkRequestObserver requestObserver;
     private @Nullable StreamObserver<Windmill.StreamingGetWorkResponseChunk> 
responseObserver;
+    private final CountDownLatch waitForStream = new CountDownLatch(1);
 
     private GetWorkStreamTestStub(TestGetWorkRequestObserver requestObserver) {
       this.requestObserver = requestObserver;
@@ -466,15 +469,17 @@ public class GrpcDirectGetWorkStreamTest {
     @Override
     public StreamObserver<Windmill.StreamingGetWorkRequest> getWorkStream(
         StreamObserver<Windmill.StreamingGetWorkResponseChunk> 
responseObserver) {
-      if (this.responseObserver == null) {
-        this.responseObserver = responseObserver;
-        requestObserver.responseObserver = this.responseObserver;
-      }
+      assertThat(this.responseObserver).isNull();
+      this.responseObserver = responseObserver;
+      requestObserver.responseObserver = this.responseObserver;
+      waitForStream.countDown();
 
       return requestObserver;
     }
 
-    private void injectResponse(Windmill.StreamingGetWorkResponseChunk 
responseChunk) {
+    private void injectResponse(Windmill.StreamingGetWorkResponseChunk 
responseChunk)
+        throws InterruptedException {
+      waitForStream.await();
       checkNotNull(responseObserver).onNext(responseChunk);
     }
   }

Reply via email to