Hello,

Because multiple threads can write into the completion queue in parallel, we 
maintain an internal queue: as soon as we receive a tag from the completion 
queue, we insert the corresponding message into it, so all such insertions 
eventually become part of the internal queue. While inserting a new record, 
we check whether there is already a tag whose write is pending; if so, we 
pop that tag and perform the async Write on the gRPC queue. Now, if a write 
tag never surfaces on the completion queue again (whether or not the gRPC 
buffer copy completed), we do not process the other pending tags in the 
internal queue. This design was based on the assumption that only one write 
can be pending at any point in time, even across multiple gRPC sessions. As 
a result, messages pile up in the internal queue and we eventually hit OOM 
after some time.
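
For illustration, here is a minimal sketch of the pattern described above. 
PendingWriter, the Msg template parameter, and the tag plumbing are 
simplified stand-ins for our actual code, not real gRPC API beyond 
ServerAsyncWriter itself:

#include <grpcpp/grpcpp.h>
#include <grpcpp/support/async_stream.h>

#include <deque>
#include <mutex>

// Hypothetical per-stream state: at most one async Write outstanding;
// everything else waits in an internal queue (as described above).
template <typename Msg>
struct PendingWriter {
  std::mutex mu;
  std::deque<Msg> queued;                // messages waiting their turn
  bool write_in_flight = false;          // a Write tag is outstanding
  grpc::ServerAsyncWriter<Msg>* writer;  // owned by the surrounding RPC state

  // Producer threads call this instead of Write() directly.
  void Send(Msg msg, void* tag) {
    std::lock_guard<std::mutex> lock(mu);
    if (write_in_flight) {
      queued.push_back(std::move(msg));  // one Write per stream: must wait
    } else {
      write_in_flight = true;
      writer->Write(msg, tag);           // tag surfaces on the CQ later
    }
  }

  // The completion-queue thread calls this when the previous Write's tag
  // surfaces; it starts the next queued write, if any.
  void OnWriteDone(void* next_tag) {
    std::lock_guard<std::mutex> lock(mu);
    if (queued.empty()) {
      write_in_flight = false;           // stream is idle again
    } else {
      // Write() does not keep a reference to msg, so popping right after
      // the call returns is safe (per the doc comment quoted below).
      writer->Write(queued.front(), next_tag);
      queued.pop_front();
    }
  }
};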

So, my question is: if the write tag for a previous write never surfaces on 
the completion queue, should we wait for it indefinitely? We tried a 
timer-based wait and, when the tag did not appear in time, tried to 
gracefully terminate the session by calling Finish(), but that resulted in 
a core dump as below:
(gdb) bt
#0  __pthread_kill_implementation (threadid=<optimized out>, signo=signo@entry=6, no_tid=no_tid@entry=0) at pthread_kill.c:44
#1  0x00007fbc077d2723 in __pthread_kill_internal (signo=6, threadid=<optimized out>) at pthread_kill.c:78
#2  0x00007fbc07787876 in __GI_raise (sig=sig@entry=6) at ../sysdeps/posix/raise.c:26
#3  0x00007fbc077727e3 in __GI_abort () at abort.c:79
#4  0x00007fbc06a87484 in grpc::CoreCodegen::assert_fail (this=<optimized out>, failed_assertion=<optimized out>, file=<optimized out>, line=<optimized out>) at /usr/src/debug/grpc/1.51.0-r0/git/src/cpp/common/core_codegen.cc:245
#5  0x00007fbc08504da7 in grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, grpc::internal::CallOpServerSendStatus, grpc::internal::CallNoOp<3>, grpc::internal::CallNoOp<4>, grpc::internal::CallNoOp<5>, grpc::internal::CallNoOp<6> >::ContinueFillOpsAfterInterception (this=0x7fbbe8021890) at /usr/include/grpcpp/impl/call_op_set.h:980

So, the question is: what exactly should we do when an async Write is stuck 
and never completes? We can treat the peer on this session as unhealthy and 
close the connection, but how do we do that without crashing?
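
One approach we are considering is to cancel the RPC first and let the 
pending tag drain, instead of calling Finish() while the Write is still in 
flight. A rough sketch of what we have in mind (the surrounding bookkeeping 
is assumed):

#include <grpcpp/grpcpp.h>

// Sketch: abort a stream whose Write tag never surfaced, without issuing
// Finish() while the Write op is still pending on the call.
void AbortStuckStream(grpc::ServerContext* ctx) {
  // TryCancel() asks gRPC to cancel the RPC. The outstanding Write tag is
  // not dropped: it will surface on the completion queue with ok == false.
  ctx->TryCancel();
  // Do NOT call Finish() here. Wait for the completion queue to deliver
  // the pending Write tag (with ok == false); only then finish or release
  // the per-stream state. Finishing while another op is still outstanding
  // seems consistent with the CallOpSet assertion in the backtrace above.
}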

Regards
Ashutosh
On Wednesday, April 26, 2023 at 11:11:57 PM UTC+5:30 [email protected] 
wrote:

First, it's important to clarify what it means to wait for a "Write" tag to 
complete on a completion queue:

When async "Write" is initially attempted, the message can be fully or 
partially buffered within gRPC. The corresponding tag will surface on the 
completion queue that the Write is associated with essentially after gRPC 
is done buffering the message, i.e. after it's written out relevant bytes 
to the wire.

This is unrelated to whether or not a "response" has been received from the 
peer, on the same stream.

So, the highlighted comment means that you can only have one async write 
"pending" per RPC at any given time. I.e., in order to start a new write on 
a streaming RPC, one must wait for the previous write on that same stream 
to "complete" (i.e. for its tag to be surfaced).

Multiple pending writes on different RPCs of the same completion queue are 
fine.
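
To illustrate, here is a rough sketch of one completion-queue loop driving 
many streams, where each stream gates its own writes but writes on 
different streams overlap freely. The PerStream type and its methods are 
placeholders, not real gRPC API:

#include <grpcpp/grpcpp.h>

// Hypothetical per-RPC state; bodies reduced to comments in this sketch.
struct PerStream {
  void OnWriteDone() { /* start this stream's next queued Write, if any */ }
  void Shutdown()    { /* release per-stream state; stop writing */ }
};

// One thread draining one completion queue. Many streams can each have a
// Write in flight at once; the one-pending-write rule is per stream only.
void DriveCompletionQueue(grpc::ServerCompletionQueue* cq) {
  void* tag = nullptr;
  bool ok = false;
  while (cq->Next(&tag, &ok)) {
    auto* stream = static_cast<PerStream*>(tag);  // tag identifies the stream
    if (!ok) {
      stream->Shutdown();   // the op failed or the RPC was cancelled
      continue;
    }
    stream->OnWriteDone();  // safe to issue this stream's next Write now
  }
}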
On Saturday, April 22, 2023 at 12:58:57 PM UTC-7 Ashutosh Maheshwari wrote:

Hello gRPC Team,

I have taken an extract from "include/grpcpp/impl/codegen/async_stream.h":

“
  /// Request the writing of \a msg with identifying tag \a tag.
  ///
  /// Only one write may be outstanding at any given time. This means that
  /// after calling Write, one must wait to receive \a tag from the completion
  /// queue BEFORE calling Write again.
  /// This is thread-safe with respect to \a AsyncReaderInterface::Read
  ///
  /// gRPC doesn't take ownership or a reference to \a msg, so it is safe to
  /// deallocate once Write returns.
  ///
  /// \param[in] msg The message to be written.
  /// \param[in] tag The tag identifying the operation.
  virtual void Write(const W& msg, void* tag) = 0;
”

 After reading the highlighted part, I can draw the following two 
inferences:

   1. Only one write is permissible per stream at a time. We cannot issue 
   another Write on a stream until we receive the tag for the previous 
   write from the completion queue. 
   2. Only one write is permissible on the completion queue as a whole, 
   regardless of how many streams exist. When multiple clients connect to 
   the gRPC server, we have multiple streams; in that scenario, only one 
   client could be responded to at a time due to the above-highlighted 
   limitation. 

 Can you please help us understand which of the above inferences is 
correct?

Recently, I came across an issue where the gRPC client became a zombie 
process because its parent Python application had aborted. In this 
condition, the previous Write done on the stream connected to that client 
probably never got acknowledged, and I did not receive the Write tag back 
in the completion queue for that Write. My program kept waiting for the 
write tag, other messages continued to queue up because the previous Write 
never finished its life cycle, and I could not free the resources for that 
tag either.

I was wondering whether I could have gone ahead with Writes on the other 
streams, queueing up messages for this stream until the write tag for the 
previous message came back. If I kill the zombie and clean up on the client 
side, the Write tag is returned.

Alternatively, is it possible to force-clean an inactive gRPC session? And 
what would happen if the Write tag were returned after the internal memory 
for that tag had already been cleaned up? I guess it would crash.
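
A minimal sketch of the ownership pattern in question, assuming 
heap-allocated tags that are freed only when they surface (WriteTag is 
illustrative, not our actual type):

#include <grpcpp/grpcpp.h>

#include <memory>

// Illustrative tag type: heap-allocated when the Write is issued, freed
// ONLY after it comes back out of the completion queue. Deleting it during
// a forced session cleanup, before it surfaces, would be a use-after-free.
struct WriteTag {
  explicit WriteTag(int stream_id) : stream_id(stream_id) {}
  int stream_id;  // enough to find the owning stream's state
};

// Issuing the write (writer is a grpc::ServerAsyncWriter<Msg>*):
//   writer->Write(msg, new WriteTag(id));

// Draining the queue: ownership is reclaimed exactly once, here.
void Drain(grpc::CompletionQueue* cq) {
  void* raw = nullptr;
  bool ok = false;
  while (cq->Next(&raw, &ok)) {
    std::unique_ptr<WriteTag> tag(static_cast<WriteTag*>(raw));
    // ... dispatch on tag->stream_id; 'ok' distinguishes success/failure ...
  }  // tag freed here, after it surfaced -- never before
}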

Please help clarify these doubts.

Regards

Ashutosh (Ciena)
