onMessage is never called because next.startCall() is delayed, so
ServerCall's request() is never called. Inbound messages are therefore
held back by flow control, waiting for the first request(). This is a
circular wait (effectively a deadlock rather than a livelock) between
ServerCall and ServerCall.Listener: the listener waits for the first
message before calling next.startCall(), while it is next.startCall()
that requests the first message. So you need to issue the request() for
the first message yourself:

    @Override
    public <ReqT, RespT> Listener<ReqT> interceptCall(
        final ServerCall<ReqT, RespT> call,
        final Metadata headers,
        final ServerCallHandler<ReqT, RespT> next) {

      final Listener<ReqT> cacheLookUpCallListener =
          new ForwardingServerCallListener<ReqT>() {
        private final Listener<ReqT> NOOP = new Listener<ReqT>() {};
        private Listener<ReqT> delegate = NOOP;

        @Override
        protected Listener<ReqT> delegate() {
          return delegate;
        }

        @Override
        public void onMessage(ReqT message) {
          if (delegate == NOOP) {
            // TODO: look up cache, close call and return if cache hit

            // Cache miss: start the real call only now.
            delegate = next.startCall(call, headers);
          }
          super.onMessage(message);
        }
      };

      ServerCallHandler<ReqT, RespT> handler =
          new ServerCallHandler<ReqT, RespT>() {
        @Override
        public Listener<ReqT> startCall(
            ServerCall<ReqT, RespT> call, Metadata headers) {
          // Request the first message ourselves; nothing else will
          // until next.startCall() runs.
          call.request(1);
          return cacheLookUpCallListener;
        }
      };

      return handler.startCall(call, headers);
    }

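To make the circular wait concrete, here is a small dependency-free toy
model. It is only a sketch: ToyCall and ToyListener are made-up stand-ins,
not grpc-java classes, and the real ServerCall does far more. The only
behavior modeled is that a message is delivered to the listener only after
request() has been called.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

class FlowControlToy {
    /** Hypothetical stand-in for ServerCall.Listener: one callback only. */
    interface ToyListener {
        void onMessage(String message);
    }

    /** Hypothetical stand-in for ServerCall: delivers only requested messages. */
    static final class ToyCall {
        private final Queue<String> inbound = new ArrayDeque<>();
        private ToyListener listener;
        private int requested;

        void setListener(ToyListener listener) {
            this.listener = listener;
        }

        /** A message arrives from the client and is queued. */
        void arrive(String message) {
            inbound.add(message);
            drain();
        }

        /** Flow control: allow n more messages to be delivered. */
        void request(int n) {
            requested += n;
            drain();
        }

        private void drain() {
            while (listener != null && requested > 0 && !inbound.isEmpty()) {
                requested--;
                listener.onMessage(inbound.remove());
            }
        }
    }

    /** Returns the messages the listener saw; requestUpFront toggles the fix. */
    static List<String> run(boolean requestUpFront) {
        List<String> seen = new ArrayList<>();
        ToyCall call = new ToyCall();
        // The listener would start the real handler (which requests messages)
        // only from inside onMessage, so by itself it never gets there.
        call.setListener(seen::add);
        if (requestUpFront) {
            call.request(1); // the fix: ask for the first message ourselves
        }
        call.arrive("first");
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // prints []
        System.out.println(run(true));  // prints [first]
    }
}
```

Without the up-front request(1) the message sits in the queue forever,
which matches the test timeout you are seeing.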

Note that with this simple solution, any ServerCall.Listener callbacks
that arrive before the first message (e.g., onReady()) go to the no-op
listener and are lost. This is mostly fine for unary calls, but for
streaming calls it may break flow control (if the service implements any).
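
If the lost callbacks matter for your service, one possible approach is to
remember them and replay them once the real listener exists. The sketch
below shows the idea on a toy model only: ToyListener, RecordingListener
and DeferringListener are hypothetical names, not grpc-java classes, and
whether replaying onReady() is the right behavior for a given service is
an assumption you would need to check.

```java
import java.util.ArrayList;
import java.util.List;

class ReplayOnReadyToy {
    /** Hypothetical stand-in for ServerCall.Listener with two callbacks. */
    interface ToyListener {
        void onReady();
        void onMessage(String message);
    }

    /** Plays the role of the real listener and records what it observes. */
    static final class RecordingListener implements ToyListener {
        final List<String> events = new ArrayList<>();

        @Override
        public void onReady() {
            events.add("onReady");
        }

        @Override
        public void onMessage(String message) {
            events.add("onMessage:" + message);
        }
    }

    /** Forwards to the real listener only from the first message onward. */
    static final class DeferringListener implements ToyListener {
        private final ToyListener real;
        private boolean started;
        private boolean readySeenEarly;

        DeferringListener(ToyListener real) {
            this.real = real;
        }

        @Override
        public void onReady() {
            if (started) {
                real.onReady();
            } else {
                readySeenEarly = true; // remember it instead of dropping it
            }
        }

        @Override
        public void onMessage(String message) {
            if (!started) {
                // Stands in for: delegate = next.startCall(call, headers)
                started = true;
                if (readySeenEarly) {
                    real.onReady(); // replay the buffered callback
                }
            }
            real.onMessage(message);
        }
    }

    /** onReady() fires before the first message; returns what the real listener saw. */
    static List<String> run() {
        RecordingListener real = new RecordingListener();
        ToyListener deferring = new DeferringListener(real);
        deferring.onReady();
        deferring.onMessage("first");
        return real.events;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [onReady, onMessage:first]
    }
}
```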

On Thursday, June 17, 2021 at 11:44:44 PM UTC-7 Alexander Chiu wrote:

> Thanks for getting back to me.
>
> I'm currently doing:
>
>     @Override
>     public <ReqT, RespT> Listener<ReqT> interceptCall(
>             ServerCall<ReqT, RespT> call,
>             Metadata headers, ServerCallHandler<ReqT, RespT> next) {
>         if (!cacheableMethods.contains(call.getMethodDescriptor())) {
>             return next.startCall(call, headers);
>         }
>         ServerCall.Listener<ReqT> no_op = new ServerCall.Listener<ReqT>() {};
>         return new ForwardingServerCallListener<ReqT>() {
>             ServerCall.Listener<ReqT> delegate = no_op;
>
>             @Override
>             protected ServerCall.Listener<ReqT> delegate() {
>                 return delegate;
>             }
>
>             @Override
>             public void onMessage(ReqT message) {
>                 if (delegate == no_op) {
>                     Message request = (Message) message;
>                     Message resp = cache.getIfPresent(request);
>                     if (resp != null) {
>                         call.sendHeaders(headers);
>                         call.sendMessage((RespT) resp);
>                         call.close(Status.OK, new Metadata());
>                         return;
>                     }
>                     delegate = next.startCall(call, headers);
>                 }
>                 super.onMessage(message);
>             }
>         };
>     }
>
> and running a unit test using the InProcessServerBuilder. The test times 
> out, and when I step through with a debugger it doesn't look like onMessage 
> is ever called. If the above offers no clues, I can spend some time 
> setting up a minimal example project I can share.
> On Friday, 18 June 2021 at 03:38:22 UTC+2 [email protected] wrote:
>
>> The idea in 
>> https://github.com/grpc/grpc-java/issues/5414#issuecomment-468754271 
>> should work for your use case, and the code should be almost the same as 
>> that. Can you post a snippet of your code so that we can see what went 
>> wrong? The idea is to have your ServerInterceptor produce a 
>> ServerCall.Listener that delays calling into the real 
>> ServerCall.Listener (or, more precisely, avoids calling into the real 
>> ServerCall.Listener if the cache-lookup ServerCall.Listener can end the 
>> call early with cached data). 
>>
>> On Thursday, May 27, 2021 at 8:25:37 AM UTC-7 Alexander Chiu wrote:
>>
>>> Hi
>>>
>>> I'm trying to build a caching ServerInterceptor which will be the first 
>>> in a chain of interceptors. I managed to get the basic functionality 
>>> working by returning a SimpleForwardingServerCallListener which checks 
>>> a cache in the onMessage method, and manually sending the headers and 
>>> response and closing the call in case of a cache hit, otherwise delegating 
>>> to next.startCall(...). However, it seems like next.startCall(...) causes 
>>> the call to cascade to the subsequent interceptors, and onMessage on 
>>> the listener is only called after that.
>>>
>>> My searches have come across:
>>>
>>>    - https://gitter.im/grpc/grpc?at=5b59f01bbd17b9615905d7b5
>>>    - https://stackoverflow.com/questions/39389771/validate-request-using-grpc-server-interceptor
>>>
>>> and a few others where it seems like it is possible to delay 
>>> next.startCall(...) until after the first message is received. I tried 
>>> to do something akin to 
>>> https://github.com/grpc/grpc-java/issues/5414#issuecomment-468754271 using 
>>> a noop listener, but then the onMessage on the listener never seems to 
>>> be called at all.
>>>
>>> Am I missing something obvious?
>>>
>>> PS. A big thank you to all the maintainers
>>>
>>
