Hi All,

I've recently started exploring grpc for server side telemetry. Both client 
and serve are running on same host machine. 
Client is sending one subscribe request and then goes into loop waiting for 
streams. Server is implemented using async API
with two threads. Producer thread is producing the data to a Queue say 
60000 elements in a tight loop then go for sleep for 5 sec. 
Service thread is reading data from Q and invoking write() API.  As I 
understood from documents/examples only one write is allowed before 
notification
is received/serviced via AsyncNext so effectively it becomes like (service 
completion Q, write, service completion Q, write ....)

With this I'm getting maximum 10K streams per second (Encoder is not really 
doing much).
So first question is this right way to us C++ binding. Second question is 
10K/seconds is max ? (it seems too low and am sure missing something here)

ServiceThread ()
{
 ----
  while (true) {
      // First service the Completion Q
      status = (cq_->AsyncNext(&tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME))); 
<< don't wait 
      if (status == CompletionQueue::NextStatus::GOT_EVENT) {
      switch (static_cast<Type>(reinterpret_cast<size_t>(tag))) {
        default :
          if (ok) {
            static_cast<Stream*>(tag)->Proceed();
          } else {
          }
          break;
      }
      if (++i == 10000) {
        std::cout << " Processed msgs " << i << std::endl;
        i = 0;
      }
    } else { // If nothing in completion Q then process the next element 
from thread Q which will invoke grpc write() API.
      ServiceThreadQ(&max_wait);
    }
  }
}

ServiceThreadQ (struct timespec *max_wait) {
  // Process just one element from Q. If Empty then do timed wait.
    pthread_mutex_lock(&mq->mu_queue))
    if(!mq->msg_queue.empty()) {
      intf_counters *cntr = mq->msg_queue.front();
      mq->msg_queue.pop();
      pthread_mutex_unlock(&mq->mu_queue)
      PushStream(cntr); << invokes write() API and no other work
      free(cntr);
      return;
    } else {
      int rv = pthread_cond_timedwait(&mq->cond, &mq->mu_queue, max_wait);
      switch (rv) {
        ----
      }
      pthread_mutex_unlock(&mq->mu_queue)
      return;
    }
}

Thanks

-- 
You received this message because you are subscribed to the Google Groups 
"grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/grpc-io.
To view this discussion on the web visit 
https://groups.google.com/d/msgid/grpc-io/23065c84-71e8-4e5e-b516-51530318e31c%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Reply via email to