Thank you!

On Wednesday, December 2, 2020 at 8:56:31 PM UTC [email protected] wrote:

> On Friday, November 27, 2020 at 11:05:39 AM UTC-8 Joseph Liu wrote:
>
> > Instead, the logic for handling `SayHello` seems to be handled in the
> > `Proceed` function. If I wanted to create another method, do I just 
> handle
> > it all in Proceed? And if so, how would I do that?
>
> A high level flow that you can use:
>
> 1. Create one completion queue. (You can create multiple, but that isn't
>    relevant to the problem you're trying to solve right now. Multiple
>    completion queues will come in to play much later and only if you see
>    lock contention inside the completion queues themselves.)
>
> 2. For each method you want to handle, invoke service_->RequestFoo(),
>    service_->RequestBar(), ... with _unique_ context, request, responder,
>    and tag values.
>
> 3. Poll the completion queue. When the Request* tag value for, say, the
>    RequestFoo method comes out of the completion queue, run the Foo() 
> logic.
>    When the RequestBar completion queue comes out, instead, run the Bar()
>    logic.
>
> 4. After the Request* tag has come out of the completion queue, you will
>    need to re-invoke service_->Request* with a new unique set of context,
>    request, responder, and tag values. If you don't do this, gRPC will not
>    process an incoming call again. You _do not_ need to wait for the logic
>    of responding to the prior call to finish before doing this.
>
> When I implemented this pattern, I had a helper type like the example's
> CallData that knew the association between the gRPC method and the 
> "business
> logic" to run. It's job was to request a call from gRPC, wait for the call
> to come out of the completion queue, and hand off the invocation of the
> business logic to a "work item dispatcher" component.
>
> You will need to adjust this for streaming calls, but that should be
> mechanical.
>
> It looked something like the following. (I wrote this code in my email
> client. I'm sure it doesn't compile, but you should be able to see the
> pathways.)
>
> int main(int argc, char** argv) {
>   std::shared_ptr<ServerCompletionQueue> cq = SetupCompletionQueue();
>   std::shared_ptr<Dispatcher> dispatcher = SetupDispatcher(cq);
>   std::unique_ptr<Server> = SetupGrpcServerViaServerBuilder();
>
>   Greeter::AsyncService service;
>
>   MethodRegistration<Greeter::AsyncService, HelloRequest, HelloResponse> 
> sayHello1(
>     cq,
>     service,
>     Greeter::AsyncService::RequestSayHello1,
>     &SomeServiceImpl::HandleSayHello1,
>     workItemDispatcher);
>
>   MethodRegistration<Greeter::AsyncService, HelloRequest, HelloResponse> 
> sayHello2(
>     cq,
>     service,
>     Greeter::AsyncService::RequestSayHello2,
>     &SomeServiceImpl::HandleSayHello2,
>     workItemDispatcher);
>
> // In our system, everything enqueued to a gRPC completion queue implements
> // this interface.
> //
> // The threads that poll the gRPC completion queue then just do the
> // following:
> //
> // void loop() {
> //   void* tag;
> //   bool ok;
> // 
> //   while (m_cq->Next(&tag, &ok))
> //   {
> //     static_cast<CompletionQueueItem*>(tag)->Invoke(ok);
> //   }
> // }
> class CompletionQueueItem
> {
> public:
>   virtual ~CompletionQueueItem() = default;
>   void Invoke(bool ok) = 0;
> };
>
> template <typename TServiceStub, typename TRequest, typename TResponse>
> class MethodRegistration : CompletionQueueItem
> {
> public:
>   // A pointer to the member Request* function from the generated stub.
>   // Pointer to member function only used as an example.
>   using RegistrationFunc = bool (TServiceStub::*)(
>     ServerContext*,
>     TRequest*,
>     TResponse*,
>     ServerCompletionQueue*,
>     ServerCompletionQueue*,
>     void*);
>
>   // Some business logic/user code associated with processing a request.
>   // std::function only used as an example.
>   using std::function<ServerContext*, TRequest*, Tresponse> UserFunc;
>
>   MethodRegistration(
>     std::shared_ptr<ServerCompletionQueue> cq,
>     TServiceStub* service,
>     RegistrationFunc registrationFunc,
>     UserFunc userFunc,
>     std::shared_ptr<Dispatcher> dispatcher)
>   : m_cq(cq),
>     m_service(service),
>     m_registrationFunc(registrationFunc),
>     m_businessLogic(userFunc),
>     m_dispatcher(dispatcher) {
>     Register();
>   }
>
>   void Invoke(bool ok) {
>     if (ok) {
>       // EnqueueUnaryCall knows how to invoke the std::function with the
>       // values from _unaryRequestData and send the response back.
>       m_dispatcher.EnqueueUnaryCall(
>         m_businessLogic,
>         std::move(m_unaryRequestData));
>
>       Register();
>     }
>   }
>
> private:
>   void Register() {
>     // Create a new struct for per-request data. Per-request data has a
>     // different lifetime than the data needed to register for method 
> calls.
>     
>     m_unaryRequestData = std::make_unique<UnaryRequestData<TRequest, 
> TResponse>>();
>     (m_service.*m_registrationFunc)(
>       &m_unaryRequestData->ctx,
>       &m_unaryRequestData->request,
>       &m_unaryRequestData->responder,
>       m_cq,
>       m_cq,
>       static_cast<void*>(static_cast<CompletionQueueItem*>(this)));
>   }
>
>   std::shared_ptr<ServerCompletionQueue> m_cq;
>
>   TServiceStub* m_service;
>
>   RegistrationFunc m_registrationFunc;
>
>   std::function<ServerContext*, TRequest*, TResponse> m_businessLogic;
>
>   // Something that can run arbitrary bits of code. E.g., a thread pool. In
>   // this example, the dispatcher also know how to poll completion queues.
>   std::shared_ptr<Dispatcher> m_dispatcher;
>
>   std::unique_ptr<UnaryRequestData<TRequest, TResponse>> 
> m_unaryRequestData;
> };
>
> // All of the data associated with a single call. This data starts being
> // owned by the MethodRegistration, but it transfered to the Dispatcher 
> when
> // the business logic needs to be run. This way, we can re-request another
> // incoming call while still processing earlier calls.
> template <typename TRequest, typename TResponse>
> struct UnaryRequestData
> {
>   ServerContext ctx;
>   TRequest request;
>   ServerAsyncResponseWriter<TResponse> responder;
>
>   UnaryRequestData()
>   : ctx(), request(), responder(&ctx)
>   {}
> };
>
> The aforementioned system where I implemented this is the Bond-over-gRPC
> project [1]. It doesn't use Protocol Buffers for its IDL and it presents a
> higher-level abstraction, so the interface to the generated code is
> different. The fundamental method registration and lifetime management
> concepts will be analogous, however. The code for the C++ gRPC stuff lives
> in the cpp/inc/bonc/ext/grpc directory [2].
>
> Hope this helps.
>
> [1]: 
> https://microsoft.github.io/bond/manual/bond_over_grpc.html#bond-over-grpc-for-c-1
> [2]: https://github.com/microsoft/bond/tree/master/cpp/inc/bond/ext/grpc
>
> --
> Christopher Warrington
> Microsoft Corp.
>
>

-- 
You received this message because you are subscribed to the Google Groups 
"grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To view this discussion on the web visit 
https://groups.google.com/d/msgid/grpc-io/2658554c-c2e7-442e-8b2b-8c215922c6aen%40googlegroups.com.

Reply via email to