Hello,
Because multiple threads write into the completion queue in parallel, we
maintain an internal queue, into which we insert each message as soon as we
receive a tag from the completion queue; all such insertions eventually
become part of the internal queue. While inserting a new record into the
internal queue, we check whether a tag is waiting to be written and, if so,
pop that tag and issue an async Write through gRPC. However, if the tag of
the previous write does not surface on the completion queue again (whether
or not gRPC's copy of the message into its buffers has completed), we never
process the other pending tags in the internal queue. This design was based
on the assumption that only one write can be pending at any point in time,
even across multiple gRPC sessions. As a result, messages pile up in the
internal queue and the process eventually runs out of memory (OOM).
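For illustration, here is a minimal sketch of the internal queue kept per stream instead of globally, so that a stalled stream only blocks its own backlog. It assumes one `StreamWriter` per streaming RPC; `StreamWriter`, `StartWriteFn` (a hypothetical hook standing in for the stream's async `Write(msg, tag)` call) and the `max_backlog` cap are illustrative names, not gRPC APIs.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical per-stream writer: one instance per streaming RPC.
class StreamWriter {
 public:
  // Stand-in for the stream's async Write(msg, tag); injected so the sketch
  // builds without gRPC.
  using StartWriteFn = std::function<void(const std::string&)>;

  StreamWriter(StartWriteFn start_write, std::size_t max_backlog)
      : start_write_(std::move(start_write)), max_backlog_(max_backlog) {}

  // Any producer thread. Starts a write at once if this stream is idle,
  // otherwise queues the message. Returns false if the stream is broken or
  // its backlog is full (a signal to treat the peer as unhealthy).
  bool Enqueue(std::string msg) {
    std::lock_guard<std::mutex> lock(mu_);
    if (broken_) return false;
    if (write_in_flight_ && pending_.size() >= max_backlog_) return false;
    pending_.push_back(std::move(msg));
    if (!write_in_flight_) StartNextLocked();
    return true;
  }

  // Completion-queue thread: this stream's write tag surfaced with `ok`.
  void OnWriteDone(bool ok) {
    std::lock_guard<std::mutex> lock(mu_);
    assert(write_in_flight_ && "tag surfaced with no write outstanding");
    write_in_flight_ = false;
    if (!ok) {  // stream is dead: drop only its backlog
      broken_ = true;
      pending_.clear();
      return;
    }
    if (!pending_.empty()) StartNextLocked();
  }

  std::size_t backlog() const {
    std::lock_guard<std::mutex> lock(mu_);
    return pending_.size();
  }

  bool write_in_flight() const {
    std::lock_guard<std::mutex> lock(mu_);
    return write_in_flight_;
  }

 private:
  // Requires mu_ held and no write in flight on this stream.
  void StartNextLocked() {
    std::string msg = std::move(pending_.front());
    pending_.pop_front();
    write_in_flight_ = true;
    // async_stream.h states gRPC keeps no reference to msg, so it may be
    // destroyed as soon as Write returns.
    start_write_(msg);
  }

  StartWriteFn start_write_;
  const std::size_t max_backlog_;
  mutable std::mutex mu_;
  std::deque<std::string> pending_;
  bool write_in_flight_ = false;
  bool broken_ = false;
};
```

The cap turns an unbounded memory growth into an explicit `false` from `Enqueue`, and a failed write on one stream leaves every other stream's queue untouched.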
So my question is: if the tag for a previous write does not surface on
the completion queue, should we wait for it indefinitely? We tried a
timer-based wait and, if the tag did not arrive, tried to terminate the
session gracefully by calling Finish(), but that resulted in the following
core dump:
(gdb) bt
#0 __pthread_kill_implementation (threadid=<optimized out>, signo=signo@entry=6, no_tid=no_tid@entry=0) at pthread_kill.c:44
#1 0x00007fbc077d2723 in __pthread_kill_internal (signo=6, threadid=<optimized out>) at pthread_kill.c:78
#2 0x00007fbc07787876 in __GI_raise (sig=sig@entry=6) at ../sysdeps/posix/raise.c:26
#3 0x00007fbc077727e3 in __GI_abort () at abort.c:79
#4 0x00007fbc06a87484 in grpc::CoreCodegen::assert_fail (this=<optimized out>, failed_assertion=<optimized out>, file=<optimized out>, line=<optimized out>) at /usr/src/debug/grpc/1.51.0-r0/git/src/cpp/common/core_codegen.cc:245
#5 0x00007fbc08504da7 in grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, grpc::internal::CallOpServerSendStatus, grpc::internal::CallNoOp<3>, grpc::internal::CallNoOp<4>, grpc::internal::CallNoOp<5>, grpc::internal::CallNoOp<6> >::ContinueFillOpsAfterInterception (this=0x7fbbe8021890) at /usr/include/grpcpp/impl/call_op_set.h:980
So the question is: what exactly should we do when an async Write is
stuck and never completes? We can treat the peer on this session as
unhealthy and close the connection, but how can we do that without
crashing?
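One possible approach, sketched under assumptions: instead of calling Finish() while the earlier Write is still outstanding, a per-stream watchdog could call `grpc::ServerContext::TryCancel()` on that RPC once a write has been outstanding for too long (the API is documented as best-effort), and then wait for the outstanding tag to come back, expected with `ok == false`, before releasing anything. `WriteWatchdog`, its timeout, and `CancelFn` (a stand-in for the `TryCancel()` call) are hypothetical; how `Check()` is driven, for example by a `grpc::Alarm` or a timer thread, is left open.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <utility>

// Hypothetical watchdog for one stream's outstanding Write. Not thread-safe:
// drive it from a single thread (or add a mutex).
class WriteWatchdog {
 public:
  using Clock = std::chrono::steady_clock;
  // Stand-in for calling grpc::ServerContext::TryCancel() on this RPC.
  using CancelFn = std::function<void()>;

  WriteWatchdog(std::chrono::milliseconds timeout, CancelFn cancel)
      : timeout_(timeout), cancel_(std::move(cancel)) {}

  // Record the moment a Write(msg, tag) was issued on this stream.
  void OnWriteStarted(Clock::time_point now) {
    in_flight_ = true;
    started_ = now;
  }

  // The write tag surfaced (ok or not); only now may its resources be freed.
  void OnWriteDone() { in_flight_ = false; }

  // Call periodically. Cancels at most once per stream and returns true on
  // the call that triggered the cancellation.
  bool Check(Clock::time_point now) {
    if (!in_flight_ || cancelled_) return false;
    if (now - started_ < timeout_) return false;
    cancelled_ = true;
    cancel_();  // do NOT free the tag here; gRPC will still return it
    return true;
  }

  bool cancelled() const { return cancelled_; }

 private:
  std::chrono::milliseconds timeout_;
  CancelFn cancel_;
  Clock::time_point started_{};
  bool in_flight_ = false;
  bool cancelled_ = false;
};
```

The key design choice is that cancelling and cleaning up are separate steps: the watchdog only requests cancellation, and the stream's resources are released when its tag is finally dequeued.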
Regards
Ashutosh
On Wednesday, April 26, 2023 at 11:11:57 PM UTC+5:30 [email protected]
wrote:
First, it's important to clarify what it means to wait for a "Write" tag to
complete on a completion queue:
When an async "Write" is initially attempted, the message can be fully or
partially buffered within gRPC. The corresponding tag surfaces on the
completion queue that the Write is associated with essentially once gRPC
is done buffering the message, i.e. after it has written the relevant bytes
out to the wire.
This is unrelated to whether or not a "response" has been received from the
peer on the same stream.
So the highlighted comment means that you can only have one async write
"pending" per RPC at any given time. That is, to start a new write on a
streaming RPC, one must wait for the previous write on that same stream to
"complete" (i.e. for its tag to surface).
Multiple pending writes on different RPCs of the same completion queue are
fine.
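The rule above can be written down as simple bookkeeping. This is a hypothetical helper, not part of gRPC: `OutstandingWrites` and the integer stream ids are illustrative, and it only records which RPCs currently have a write whose tag has not yet surfaced.

```cpp
#include <cassert>
#include <cstddef>
#include <unordered_set>

// Hypothetical bookkeeping for the per-RPC rule: at most one outstanding
// Write per stream, any number across streams sharing one completion queue.
class OutstandingWrites {
 public:
  // Returns false if this stream already has a write outstanding, i.e.
  // calling Write now would violate the async_stream.h contract.
  bool TryBegin(int stream_id) { return in_flight_.insert(stream_id).second; }

  // Called when the stream's write tag surfaces on the completion queue.
  void Complete(int stream_id) {
    bool erased = in_flight_.erase(stream_id) == 1;
    assert(erased && "completion for a stream with no outstanding write");
    (void)erased;  // silence unused-variable warnings under NDEBUG
  }

  std::size_t count() const { return in_flight_.size(); }

 private:
  std::unordered_set<int> in_flight_;
};
```

For example, `TryBegin(1)` and `TryBegin(2)` both succeed, while a second `TryBegin(1)` fails until `Complete(1)` is called.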
On Saturday, April 22, 2023 at 12:58:57 PM UTC-7 Ashutosh Maheshwari wrote:
Hello gRPC Team,
I have taken an extract from include/grpcpp/impl/codegen/async_stream.h:

    /// Request the writing of \a msg with identifying tag \a tag.
    ///
    /// Only one write may be outstanding at any given time. This means that
    /// after calling Write, one must wait to receive \a tag from the completion
    /// queue BEFORE calling Write again.
    /// This is thread-safe with respect to \a AsyncReaderInterface::Read
    ///
    /// gRPC doesn't take ownership or a reference to \a msg, so it is safe to
    /// to deallocate once Write returns.
    ///
    /// \param[in] msg The message to be written.
    /// \param[in] tag The tag identifying the operation.
    virtual void Write(const W& msg, void* tag) = 0;

After reading the highlighted part, I can draw two possible inferences:
1. Only one outstanding write is permitted per stream. So we cannot issue
another Write on a stream until the completion queue returns the tag for
the previous write on that stream.
2. Only one outstanding write is permitted per completion queue, regardless
of how many streams exist. When multiple clients connect to the gRPC
server, there are multiple streams; in that scenario, only one client could
be responded to at a time because of the highlighted limitation.
Can you please help us understand which of these inferences is correct?
Recently, I came across an issue where the gRPC client became a zombie
process because its parent Python application was aborted. In this state,
the previous Write on the stream connected to that client was probably
never acknowledged, and I did not receive the tag for that Write back on the
completion queue. My program kept waiting for the write tag while other
messages continued to queue up; since the previous Write never finished its
life cycle, I could not free the resources associated with that tag either.
I was wondering whether I could have gone ahead with Writes on other
streams, and queued up messages for this stream until the write tag for the
previous message is returned. If I kill the zombie and clean up on the
client, the Write tag is returned.
Alternatively, is it possible to force cleanup of an inactive gRPC session?
What would happen if the Write tag is returned after the internal memory
for that tag has been cleaned up? I guess it will crash.
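On the last question: a common way to avoid a use-after-free is to never free a tag's memory before the completion queue hands it back, and to let the tag object release itself when it is processed, similar in spirit to the `CallData::Proceed()` pattern in gRPC's async helloworld server example. The sketch below assumes that every tag passed to an async operation is eventually returned by `CompletionQueue::Next()` (with `ok == false` if the RPC was cancelled or the peer went away), provided the queue keeps being drained; `WriteTag`, `FakeCq` and `Drain()` are hypothetical stand-ins so it builds without gRPC.

```cpp
#include <cassert>
#include <queue>
#include <utility>

// Hypothetical tag: its address is the void* passed as the tag to Write().
// It is freed only when the completion queue returns it, never earlier.
struct WriteTag {
  static int live;  // instrumentation for this sketch only
  WriteTag() { ++live; }
  ~WriteTag() { --live; }

  // Invoked by the completion-queue loop with the `ok` flag from Next().
  void Proceed(bool ok) {
    if (!ok) {
      // The RPC is broken or cancelled: release per-stream state here.
    }
    delete this;  // safe: this operation's tag is returned only once
  }
};
int WriteTag::live = 0;

// Stand-in for grpc::CompletionQueue: each event is a (tag, ok) pair.
using FakeCq = std::queue<std::pair<void*, bool>>;

// Mirrors a `while (cq.Next(&tag, &ok))` loop.
void Drain(FakeCq& cq) {
  while (!cq.empty()) {
    std::pair<void*, bool> ev = cq.front();
    cq.pop();
    static_cast<WriteTag*>(ev.first)->Proceed(ev.second);
  }
}
```

Because ownership ends only inside `Proceed()`, a late completion after a cancellation still finds a live object instead of freed memory.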
Please clarify the doubts,
Regards
Ashutosh (Ciena)
--
You received this message because you are subscribed to the Google Groups
"grpc.io" group.
To view this discussion on the web visit
https://groups.google.com/d/msgid/grpc-io/f005c8bd-6136-4df7-a73f-c7b0f54241d8n%40googlegroups.com.