This is an automated email from the ASF dual-hosted git repository.
jrmccluskey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 180f3a6bdb5 Attempt to fix flaky GrpcLoggingServiceTest (#37891)
180f3a6bdb5 is described below
commit 180f3a6bdb5c929c252cba3b62cf7bba800f58b0
Author: Sam Whittle <[email protected]>
AuthorDate: Thu Mar 19 14:35:32 2026 +0000
Attempt to fix flaky GrpcLoggingServiceTest (#37891)
---
.../logging/GrpcLoggingServiceTest.java | 116 ++++++++++-----------
1 file changed, 55 insertions(+), 61 deletions(-)
diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/logging/GrpcLoggingServiceTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/logging/GrpcLoggingServiceTest.java
index fc504371023..02e500f3e85 100644
--- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/logging/GrpcLoggingServiceTest.java
+++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/logging/GrpcLoggingServiceTest.java
@@ -22,8 +22,6 @@ import static org.hamcrest.Matchers.containsInAnyOrder;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
@@ -63,39 +61,42 @@ public class GrpcLoggingServiceTest {
GrpcLoggingService.forWriter(new CollectionAppendingLogWriter(logs));
try (GrpcFnServer<GrpcLoggingService> server =
GrpcFnServer.allocatePortAndCreateFor(service, InProcessServerFactory.create())) {
-
- Collection<Callable<Void>> tasks = new ArrayList<>();
+ ExecutorService executorService = Executors.newCachedThreadPool();
+ Collection<Future<?>> futures = new ArrayList<>();
+ CountDownLatch waitForServerHangup = new CountDownLatch(3);
for (int i = 1; i <= 3; ++i) {
final int instructionId = i;
- tasks.add(
- () -> {
- CountDownLatch waitForServerHangup = new CountDownLatch(1);
- String url = server.getApiServiceDescriptor().getUrl();
- ManagedChannel channel = InProcessChannelBuilder.forName(url).build();
- StreamObserver<LogEntry.List> outboundObserver =
- BeamFnLoggingGrpc.newStub(channel)
- .logging(
- TestStreams.withOnNext(messageDiscarder)
- .withOnCompleted(new CountDown(waitForServerHangup))
- .build());
- outboundObserver.onNext(createLogsWithIds(instructionId, -instructionId));
- outboundObserver.onCompleted();
- waitForServerHangup.await();
- return null;
- });
+ futures.add(
+ executorService.submit(
+ () -> {
+ String url = server.getApiServiceDescriptor().getUrl();
+ ManagedChannel channel = InProcessChannelBuilder.forName(url).build();
+ StreamObserver<LogEntry.List> outboundObserver =
+ BeamFnLoggingGrpc.newStub(channel)
+ .logging(
+ TestStreams.withOnNext(messageDiscarder)
+ .withOnCompleted(new CountDown(waitForServerHangup))
+ .build());
+ outboundObserver.onNext(createLogsWithIds(instructionId, -instructionId));
+ outboundObserver.onCompleted();
+ }));
}
- ExecutorService executorService = Executors.newCachedThreadPool();
- executorService.invokeAll(tasks);
- assertThat(
- logs,
- containsInAnyOrder(
- createLogWithId(1L),
- createLogWithId(2L),
- createLogWithId(3L),
- createLogWithId(-1L),
- createLogWithId(-2L),
- createLogWithId(-3L)));
+ // Make sure all streams were created and issued client operations.
+ for (Future<?> f : futures) {
+ f.get();
+ }
+ // Ensure all the streams were completed as expected before closing the server.
+ waitForServerHangup.await();
}
+ assertThat(
+ logs,
+ containsInAnyOrder(
+ createLogWithId(1L),
+ createLogWithId(2L),
+ createLogWithId(3L),
+ createLogWithId(-1L),
+ createLogWithId(-2L),
+ createLogWithId(-3L)));
}
@Test
@@ -107,32 +108,23 @@ public class GrpcLoggingServiceTest {
GrpcFnServer.allocatePortAndCreateFor(service, InProcessServerFactory.create())) {
CountDownLatch waitForTermination = new CountDownLatch(3);
- final BlockingQueue<StreamObserver<LogEntry.List>> outboundObservers =
- new LinkedBlockingQueue<>();
- Collection<Callable<Void>> tasks = new ArrayList<>();
- for (int i = 1; i <= 3; ++i) {
- final int instructionId = i;
- tasks.add(
- () -> {
- ManagedChannel channel =
- InProcessChannelBuilder.forName(server.getApiServiceDescriptor().getUrl())
- .build();
- StreamObserver<LogEntry.List> outboundObserver =
- BeamFnLoggingGrpc.newStub(channel)
- .logging(
- TestStreams.withOnNext(messageDiscarder)
- .withOnError(new CountDown(waitForTermination))
- .build());
- outboundObserver.onNext(createLogsWithIds(instructionId, -instructionId));
- outboundObservers.add(outboundObserver);
- return null;
- });
+ final Collection<StreamObserver<LogEntry.List>> outboundObservers = new ArrayList<>();
+ // Create all the streams
+ for (int instructionId = 1; instructionId <= 3; ++instructionId) {
+ ManagedChannel channel =
+ InProcessChannelBuilder.forName(server.getApiServiceDescriptor().getUrl()).build();
+ StreamObserver<LogEntry.List> outboundObserver =
+ BeamFnLoggingGrpc.newStub(channel)
+ .logging(
+ TestStreams.withOnNext(messageDiscarder)
+ .withOnError(new CountDown(waitForTermination))
+ .build());
+ outboundObserver.onNext(createLogsWithIds(instructionId, -instructionId));
+ outboundObservers.add(outboundObserver);
}
- ExecutorService executorService = Executors.newCachedThreadPool();
- executorService.invokeAll(tasks);
- for (int i = 1; i <= 3; ++i) {
- outboundObservers.take().onError(new RuntimeException("Client " + i));
+ for (StreamObserver<LogEntry.List> outboundObserver : outboundObservers) {
+ outboundObserver.onError(new RuntimeException("Client"));
}
waitForTermination.await();
}
@@ -142,19 +134,19 @@ public class GrpcLoggingServiceTest {
public void testServerCloseHangsUpClients() throws Exception {
LinkedBlockingQueue<LogEntry> logs = new LinkedBlockingQueue<>();
ExecutorService executorService = Executors.newCachedThreadPool();
- Collection<Future<Void>> futures = new ArrayList<>();
final GrpcLoggingService service =
GrpcLoggingService.forWriter(new CollectionAppendingLogWriter(logs));
+ CountDownLatch waitForServerHangup = new CountDownLatch(3);
try (GrpcFnServer<GrpcLoggingService> server =
GrpcFnServer.allocatePortAndCreateFor(service, InProcessServerFactory.create())) {
+ Collection<Future<?>> futures = new ArrayList<>();
for (int i = 1; i <= 3; ++i) {
final long instructionId = i;
futures.add(
executorService.submit(
() -> {
{
- CountDownLatch waitForServerHangup = new CountDownLatch(1);
ManagedChannel channel =
InProcessChannelBuilder.forName(server.getApiServiceDescriptor().getUrl())
.build();
@@ -165,19 +157,21 @@ public class GrpcLoggingServiceTest {
.withOnCompleted(new CountDown(waitForServerHangup))
.build());
outboundObserver.onNext(createLogsWithIds(instructionId));
- waitForServerHangup.await();
return null;
}
}));
}
+ // Ensure all the streams have started and sent their instruction.
+ for (Future<?> f : futures) {
+ f.get();
+ }
// Wait till each client has sent their message showing that they have connected.
for (int i = 1; i <= 3; ++i) {
logs.take();
}
+ // Close the server without closing the streams and ensure they observe the hangup.
}
- for (Future<Void> future : futures) {
- future.get();
- }
+ waitForServerHangup.await();
}
private BeamFnApi.LogEntry.List createLogsWithIds(long... ids) {