This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new abbaab8bfcd Fix flaky
GrpcDirectGetWorkStreamTest.testConsumedWorkItems_itemsSplitAcrossResponses
(#36129)
abbaab8bfcd is described below
commit abbaab8bfcdc887868c5eea5715e2e6bd098082d
Author: Sam Whittle <[email protected]>
AuthorDate: Fri Sep 12 13:58:31 2025 +0200
Fix flaky
GrpcDirectGetWorkStreamTest.testConsumedWorkItems_itemsSplitAcrossResponses
(#36129)
---
.../client/grpc/GrpcDirectGetWorkStreamTest.java | 27 +++++++++++++---------
1 file changed, 16 insertions(+), 11 deletions(-)
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStreamTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStreamTest.java
index 1014242317d..41900017838 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStreamTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStreamTest.java
@@ -392,7 +392,9 @@ public class GrpcDirectGetWorkStreamTest {
@Test
public void testConsumedWorkItems_itemsSplitAcrossResponses() throws
InterruptedException {
- int expectedRequests = 3;
+ // We send all the responses on the first request. We don't care if there
are additional
+ // requests.
+ int expectedRequests = 1;
CountDownLatch waitForRequests = new CountDownLatch(expectedRequests);
TestGetWorkRequestObserver requestObserver = new
TestGetWorkRequestObserver(waitForRequests);
GetWorkStreamTestStub testStub = new
GetWorkStreamTestStub(requestObserver);
@@ -426,9 +428,9 @@ public class GrpcDirectGetWorkStreamTest {
Windmill.WorkItem workItem3 =
Windmill.WorkItem.newBuilder()
.setKey(ByteString.copyFromUtf8("somewhat_long_key3"))
- .setWorkToken(2L)
- .setShardingKey(2L)
- .setCacheToken(2L)
+ .setWorkToken(3L)
+ .setShardingKey(3L)
+ .setCacheToken(3L)
.build();
List<ByteString> chunks1 = new ArrayList<>();
@@ -444,12 +446,12 @@ public class GrpcDirectGetWorkStreamTest {
chunks3.add(workItem3.toByteString());
+ assertTrue(waitForRequests.await(5, TimeUnit.SECONDS));
+
testStub.injectResponse(createResponse(chunks1, bytes.size() - third));
testStub.injectResponse(createResponse(chunks2, bytes.size() - 2 * third));
testStub.injectResponse(createResponse(chunks3, 0));
- assertTrue(waitForRequests.await(5, TimeUnit.SECONDS));
-
assertThat(scheduledWorkItems).containsExactly(workItem1, workItem2,
workItem3);
}
@@ -458,6 +460,7 @@ public class GrpcDirectGetWorkStreamTest {
private final TestGetWorkRequestObserver requestObserver;
private @Nullable StreamObserver<Windmill.StreamingGetWorkResponseChunk>
responseObserver;
+ private final CountDownLatch waitForStream = new CountDownLatch(1);
private GetWorkStreamTestStub(TestGetWorkRequestObserver requestObserver) {
this.requestObserver = requestObserver;
@@ -466,15 +469,17 @@ public class GrpcDirectGetWorkStreamTest {
@Override
public StreamObserver<Windmill.StreamingGetWorkRequest> getWorkStream(
StreamObserver<Windmill.StreamingGetWorkResponseChunk>
responseObserver) {
- if (this.responseObserver == null) {
- this.responseObserver = responseObserver;
- requestObserver.responseObserver = this.responseObserver;
- }
+ assertThat(this.responseObserver).isNull();
+ this.responseObserver = responseObserver;
+ requestObserver.responseObserver = this.responseObserver;
+ waitForStream.countDown();
return requestObserver;
}
- private void injectResponse(Windmill.StreamingGetWorkResponseChunk
responseChunk) {
+ private void injectResponse(Windmill.StreamingGetWorkResponseChunk
responseChunk)
+ throws InterruptedException {
+ waitForStream.await();
checkNotNull(responseObserver).onNext(responseChunk);
}
}