Hi,

my service has 2 methods that share a resource, and each method invocation 
needs exclusive access to this resource. In the example, the client makes 5 
successive requests to one of these methods. For each request it sends one 
message via the stream observer and then completes the stream observer; the 
next request follows in the same way without further delay.

The full source code of the sample application is here: 
https://github.com/Niklas-Peter/grpc-async.

The gRPC server is configured with 
ServerBuilder.executor(Executors.newSingleThreadExecutor());

Here are the most important extracts:

*Client code:*
@Slf4j
public class MyServiceClient {
    @SneakyThrows
    public void myServiceMethodA() {
        log.info("myServiceMethodA(): started.");
        var writeConnection = stub.myServiceMethodA(new LoggingStreamObserver());
        log.info("myServiceMethodA(): received write connection.");

        writeConnection.onNext(createEvent());
        writeConnection.onCompleted();
    }
}

*Synchronous service implementation:*
@Slf4j
public class MySyncService extends MyServiceGrpc.MyServiceImplBase {
    private final Semaphore lock = new Semaphore(1);

    @SneakyThrows
    @Override
    public StreamObserver<Event> myServiceMethodA(StreamObserver<Confirmation> responseObserver) {
        log.info(responseObserver + ": Acquiring lock ...");
        if (!lock.tryAcquire(10, TimeUnit.SECONDS)) {
            log.warn(responseObserver + ": Lock acquire timeout exceeded");
            // Returned only to prevent exceptions in the log.
            return new NoOpEventStreamObserver();
        }

        log.info(responseObserver + ": Acquired lock.");

        return new StreamObserver<>() {
            private final List<Event> events = new ArrayList<>();

            @Override
            public void onNext(Event event) {
                log.info(responseObserver + ": Received event.");

                var preprocessedEvent = preprocess(event);
                events.add(preprocessedEvent);
            }

            @Override
            public void onCompleted() {
                log.info(responseObserver + ": Received complete.");

                var storageLibrary = new StorageLibrary();
                storageLibrary.store(events.toArray(Event[]::new)).handle((unused, throwable) -> {
                    log.info(responseObserver + ": Store completed.");

                    responseObserver.onNext(Confirmation.newBuilder().build());
                    responseObserver.onCompleted();

                    lock.release();

                    return null;
                });
            }

            private Event preprocess(Event event) {
                // The preprocessing already requires the lock.
                return event;
            }
        };
    }

    @SneakyThrows
    @Override
    public void myServiceMethodB(Event event, StreamObserver<Confirmation> responseObserver) {
        if (!lock.tryAcquire(5, TimeUnit.SECONDS))
            throw new TimeoutException("The lock acquire timeout exceeded.");

        // Requires exclusive access to a shared resource and uses async I/O.
        var storageLibrary = new StorageLibrary();
        storageLibrary.store(event).handle((unused, throwable) -> {
            responseObserver.onNext(Confirmation.newBuilder().build());
            responseObserver.onCompleted();

            lock.release();

            return null;
        });
    }


    @Override
    public void otherServiceMethod(Event request, StreamObserver<Confirmation> responseObserver) {
        // Do something independent from the other service methods.
    }
}

*Output:*
09:53:03.453 [pool-1-thread-1] INFO MySyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@580c60dd: Acquiring lock ...
09:53:03.453 [pool-1-thread-1] INFO MySyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@580c60dd: Acquired lock.
09:53:03.456 [pool-1-thread-1] INFO MySyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@4a5ebb14: Acquiring lock ...
09:53:13.465 [pool-1-thread-1] WARN MySyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@4a5ebb14: Lock acquire timeout exceeded
09:53:13.465 [pool-1-thread-1] INFO MySyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@40947ea2: Acquiring lock ...
09:53:23.484 [pool-1-thread-1] WARN MySyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@40947ea2: Lock acquire timeout exceeded
09:53:23.484 [pool-1-thread-1] INFO MySyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@23f24271: Acquiring lock ...
09:53:33.496 [pool-1-thread-1] WARN MySyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@23f24271: Lock acquire timeout exceeded
09:53:33.496 [pool-1-thread-1] INFO MySyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@44de4271: Acquiring lock ...
09:53:43.501 [pool-1-thread-1] WARN MySyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@44de4271: Lock acquire timeout exceeded
09:53:43.517 [pool-1-thread-1] INFO MySyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@580c60dd: Received event.
09:53:43.517 [pool-1-thread-1] INFO MySyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@580c60dd: Received complete.
09:53:43.533 [pool-1-thread-1] INFO MySyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@580c60dd: Store completed.

In my initial synchronous implementation I use a Semaphore as a lock. This 
leads to a deadlock that lasts until the tryAcquire() timeout is exceeded. I 
think this is what happens:
1. The only thread in the pool executes the first request. In doing so, the 
first request acquires the lock and the StreamObserver is returned.
2. The initial processing for the first request (before the first messages 
arrive via the StreamObserver) is now done and the only thread is released.
3. The 2nd request is already waiting in the executor's queue for a thread, 
so the just-released thread handles the 2nd request.
4. The thread blocks, because the Semaphore has not been released yet.
5. Further requests as well as messages for the StreamObservers are waiting 
in the executor's queue. In particular, the messages for the first request's 
StreamObserver cannot be handled, so the lock is not released.
6. Only after every pending request behind the 1st one has exceeded the 
tryAcquire() timeout and failed does the now-released thread handle the 
messages for the StreamObserver of the 1st request, and the 1st request 
completes.
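
The sequence above can be modelled without gRPC. The following is a minimal 
JDK-only sketch, assuming that the start of each request and each 
StreamObserver callback is simply another task on the same single-thread 
executor. The class name, the event strings and the short timeout are made up 
for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// JDK-only model of the starvation described above; no gRPC involved.
class SingleThreadStarvationDemo {

    // Runs two simulated requests on one thread and returns the events in order.
    static List<String> run(long timeoutMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Semaphore lock = new Semaphore(1);
        List<String> events = new CopyOnWriteArrayList<>();

        // Request 1, start of call: takes the lock and returns,
        // like the service method returning its StreamObserver.
        executor.execute(() -> {
            lock.acquireUninterruptibly();
            events.add("request-1 acquired");
        });

        // Request 2, start of call: blocks the only thread while waiting.
        executor.execute(() -> {
            try {
                if (lock.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS)) {
                    events.add("request-2 acquired");
                    lock.release();
                } else {
                    events.add("request-2 timed out");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Request 1, onCompleted(): queued behind request 2, so the release
        // cannot happen before request 2 has given up.
        executor.execute(() -> {
            events.add("request-1 completed");
            lock.release();
        });

        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return events;
    }

    public static void main(String[] args) {
        run(200).forEach(System.out::println);
    }
}
```

Because the three tasks run strictly in queue order on one thread, the events 
come out as "request-1 acquired", "request-2 timed out", "request-1 
completed", which matches the pattern in the log above.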

Increasing the thread pool size is probably not a good solution, as it 
would always have to be greater than or equal to the number of concurrently 
active requests for these 2 methods.

*Asynchronous service implementation:*
@Slf4j
public class MyAsyncService extends MyServiceGrpc.MyServiceImplBase {
    private final AsyncSemaphore lock = new AsyncSemaphore(1, Optional.empty());

    @SneakyThrows
    @Override
    public StreamObserver<Event> myServiceMethodA(StreamObserver<Confirmation> responseObserver) {
        log.info(responseObserver + ": Acquiring lock ...");
        // Already acquire the lock here (instead of in StreamObserver.onCompleted())
        // to ensure the lock is acquired in the order in which the gRPC requests
        // are handled, and not in the order in which the gRPC stream observers complete.
        var lockFuture = lock.acquire();

        return new StreamObserver<>() {
            private final List<Event> events = new ArrayList<>();

            @Override
            public void onNext(Event event) {
                log.info(responseObserver + ": Received event.");

                events.add(event);
            }

            @Override
            public void onCompleted() {
                log.info(responseObserver + ": Received complete; acquired lock: " + lockFuture.isDone());

                lockFuture.thenAccept(permit -> {
                    log.info(responseObserver + ": Acquired lock.");

                    var preprocessedEvents = events.stream()
                                                   .map(MyAsyncService.this::preprocess)
                                                   .toArray(Event[]::new);
                    var storageLibrary = new StorageLibrary();
                    storageLibrary.store(preprocessedEvents).handle((unused, throwable) -> {
                        log.info(responseObserver + ": Store completed.");

                        responseObserver.onNext(Confirmation.newBuilder().build());
                        responseObserver.onCompleted();

                        permit.release();

                        return null;
                    });
                });
            }
        };
    }

    @SneakyThrows
    @Override
    public void myServiceMethodB(Event event, StreamObserver<Confirmation> responseObserver) {
        lock.acquireAndRun(() -> {
            log.info(responseObserver + ": Acquired lock.");

            // Requires exclusive access to a shared resource and uses async I/O.
            var preprocessedEvent = preprocess(event);
            var storageLibrary = new StorageLibrary();
            return storageLibrary.store(preprocessedEvent).handle((unused, throwable) -> {
                responseObserver.onNext(Confirmation.newBuilder().build());
                responseObserver.onCompleted();

                return null;
            });
        });
    }
}


*Output:*
10:14:55.096 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@3794b425: Acquiring lock ...
10:14:55.096 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@1299d0f0: Acquiring lock ...
10:14:55.096 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@1893addf: Acquiring lock ...
10:14:55.096 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@357dea2d: Acquiring lock ...
10:14:55.096 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@183aa6cc: Acquiring lock ...
10:14:55.112 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@3794b425: Received event.
10:14:55.112 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@3794b425: Received complete; acquired lock: true
10:14:55.112 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@3794b425: Acquired lock.
10:14:55.133 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@3794b425: Store completed.
10:14:55.141 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@1299d0f0: Received event.
10:14:55.141 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@1299d0f0: Received complete; acquired lock: true
10:14:55.141 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@1299d0f0: Acquired lock.
10:14:55.141 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@1893addf: Received event.
10:14:55.141 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@1893addf: Received complete; acquired lock: false
10:14:55.141 [Thread-9] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@1299d0f0: Store completed.
10:14:55.153 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@357dea2d: Received event.
10:14:55.153 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@357dea2d: Received complete; acquired lock: false
10:14:55.153 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@183aa6cc: Received event.
10:14:55.153 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@183aa6cc: Received complete; acquired lock: false
10:14:55.153 [Thread-9] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@1893addf: Acquired lock.
10:14:55.153 [Thread-8] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@1893addf: Store completed.
10:14:55.153 [Thread-8] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@357dea2d: Acquired lock.
10:14:55.153 [Thread-9] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@357dea2d: Store completed.
10:14:55.153 [Thread-9] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@183aa6cc: Acquired lock.
10:14:55.153 [Thread-8] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@183aa6cc: Store completed.

In my asynchronous implementation this deadlock does not occur. However, 
now I have to manage an application-side buffer for each active request 
until that request gets the lock. I had hoped there was a way to let gRPC 
handle the problem by buffering the requests at a higher level instead of 
at this lower, application-side level, and to handle the overflow case 
(discard the request, send an error to the client, ...) when the buffer 
fills. For example, by returning a Future<StreamObserver<>> instead of a 
StreamObserver, as I know it from ASP.NET MVC asynchronous controller 
methods 
(https://docs.microsoft.com/en-us/aspnet/mvc/overview/performance/using-asynchronous-methods-in-aspnet-mvc-4#CreatingAsynchGizmos).

There are probably similar examples in the Java world, but I have little 
experience with the Java ecosystem so far.
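
To illustrate what handling a full buffer could look like, here is a minimal 
sketch of an asynchronous lock with a bounded waiter queue. BoundedAsyncLock 
and maxWaiters are hypothetical names. This is not part of gRPC and not the 
AsyncSemaphore used above, only an assumption of how such a limit might be 
built from JDK classes.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical helper for illustration; not a gRPC or library class.
class BoundedAsyncLock {
    private final int maxWaiters;
    private final Queue<CompletableFuture<Runnable>> waiters = new ArrayDeque<>();
    private boolean held;

    BoundedAsyncLock(int maxWaiters) {
        this.maxWaiters = maxWaiters;
    }

    // Completes with a release handle once the lock is granted,
    // or fails immediately when too many callers are already waiting.
    synchronized CompletableFuture<Runnable> acquire() {
        if (!held) {
            held = true;
            return CompletableFuture.completedFuture(newReleaseHandle());
        }
        if (waiters.size() >= maxWaiters) {
            return CompletableFuture.failedFuture(
                    new RejectedExecutionException("too many waiters"));
        }
        CompletableFuture<Runnable> waiter = new CompletableFuture<>();
        waiters.add(waiter);
        return waiter;
    }

    // Each grant gets its own handle, which releases the lock at most once.
    private Runnable newReleaseHandle() {
        AtomicBoolean released = new AtomicBoolean();
        return () -> {
            if (released.compareAndSet(false, true)) {
                release();
            }
        };
    }

    // Hands the lock to the oldest waiter, or frees it if nobody is waiting.
    private void release() {
        CompletableFuture<Runnable> next;
        synchronized (this) {
            next = waiters.poll();
            if (next == null) {
                held = false;
                return;
            }
        }
        // Complete outside the monitor so dependent callbacks do not run inside it.
        next.complete(newReleaseHandle());
    }
}
```

In a service method, a failed future could presumably be mapped to 
responseObserver.onError(Status.RESOURCE_EXHAUSTED.asRuntimeException()), so 
that the client sees the overload instead of waiting. The events already 
received for a waiting request would still have to be buffered by the 
application.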

I have already read this discussion: 
https://groups.google.com/g/grpc-io/c/XCMIva8NDO8

Thanks in advance!
