Hi,
The restriction for one write at a time (i.e. before the notification is
received on the completion queue) applies to a single stream/RPC. In other
words if you have multiple RPCs ongoing, you could have multiple writes
ongoing. As for your question on whether 10K per second is the max, it
would depend on your environment. We do have benchmarks that show the
current state of gRPC performance. Unfortunately, there is an ongoing bug
there and we are trying to get it back up.
https://grpc.io/docs/guides/benchmarking.html
I think you might be able to get some gains by separating out the
completion queue and the service thread work to different threads. That
way, you would not need to zero deadline AsyncNext either.
On Wednesday, April 24, 2019 at 10:03:42 AM UTC-7, Lalit Kumar wrote:
>
> Hi All,
>
> I've recently started exploring grpc for server side telemetry. Both
> client and serve are running on same host machine.
> Client is sending one subscribe request and then goes into loop waiting
> for streams. Server is implemented using async API
> with two threads. Producer thread is producing the data to a Queue say
> 60000 elements in a tight loop then go for sleep for 5 sec.
> Service thread is reading data from Q and invoking write() API. As I
> understood from documents/examples only one write is allowed before
> notification
> is received/serviced via AsyncNext so effectively it becomes like (service
> completion Q, write, service completion Q, write ....)
>
> With this I'm getting maximum 10K streams per second (Encoder is not
> really doing much).
> So first question is this right way to us C++ binding. Second question is
> 10K/seconds is max ? (it seems too low and am sure missing something here)
>
> ServiceThread ()
> {
> ----
> while (true) {
> // First service the Completion Q
> status = (cq_->AsyncNext(&tag, &ok,
> gpr_time_0(GPR_CLOCK_REALTIME))); << don't wait
> if (status == CompletionQueue::NextStatus::GOT_EVENT) {
> switch (static_cast<Type>(reinterpret_cast<size_t>(tag))) {
> default :
> if (ok) {
> static_cast<Stream*>(tag)->Proceed();
> } else {
> }
> break;
> }
> if (++i == 10000) {
> std::cout << " Processed msgs " << i << std::endl;
> i = 0;
> }
> } else { // If nothing in completion Q then process the next element
> from thread Q which will invoke grpc write() API.
> ServiceThreadQ(&max_wait);
> }
> }
> }
>
> ServiceThreadQ (struct timespec *max_wait) {
> // Process just one element from Q. If Empty then do timed wait.
> pthread_mutex_lock(&mq->mu_queue))
> if(!mq->msg_queue.empty()) {
> intf_counters *cntr = mq->msg_queue.front();
> mq->msg_queue.pop();
> pthread_mutex_unlock(&mq->mu_queue)
> PushStream(cntr); << invokes write() API and no other work
> free(cntr);
> return;
> } else {
> int rv = pthread_cond_timedwait(&mq->cond, &mq->mu_queue, max_wait);
> switch (rv) {
> ----
> }
> pthread_mutex_unlock(&mq->mu_queue)
> return;
> }
> }
>
> Thanks
>
--
You received this message because you are subscribed to the Google Groups
"grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/grpc-io.
To view this discussion on the web visit
https://groups.google.com/d/msgid/grpc-io/35a66f97-aba2-45c8-831f-fb3d874ecf81%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.