FYI - not sure if it'd matter, though, we are testing this implementation 
using Kreya <https://kreya.app/>tool & there is envoy proxy between client 
and GRPC

On Tuesday, June 29, 2021 at 9:00:30 AM UTC+5:30 Avinash Jagtap IN wrote:

> here is the complete code:
> #include <stdlib.h>
>
> #pragma region gRPC
> #include <grpcpp/grpcpp.h>
> #include <grpcpp/health_check_service_interface.h>
> #include <grpcpp/ext/proto_server_reflection_plugin.h>
> #pragma endregion gRPC
>
> #include <iostream>
> #include <memory>
> #include <string>
> #include <sstream>
> #include <iomanip>
> #include <thread>
>
> #pragma region Generated Files
> #include "napa_plugins_config.hpp"
> #include "napa_plugins_core_ssl.hpp"
> #pragma endregion Generated Files
>
> #pragma region
> #include "rapidjson/document.h"
> #pragma endregion
>
> #include <future>
>
> #include <grpcpp/impl/codegen/service_type.h>
> #include <grpcpp/alarm.h>
>
> #include "protos/PingPong.grpc.pb.h"
> #include "protos/PingPong.pb.h"
>
> using grpc::Server;
> using grpc::ServerBuilder;
> using grpc::ServerContext;
> using grpc::Status;
> using grpc::ServerCompletionQueue;
> using grpc::ServerAsyncResponseWriter;
>
>
> class PingPongServiceImpl final
>     : public ping_pong::PingPong::Service {
> public:
>     ~PingPongServiceImpl() {
>         std::cout << "Destructor of PingPongServiceImpl" << std::endl;
>     }
> private:
>     grpc::Status PingUnary(
>         grpc::ServerContext* context,
>         const ping_pong::PingPongRequest* request,
>         ping_pong::PingPongReply* reply) override
>     {
>         std::cout << "PingPong" << std::endl;
>
>         std::cout
>             << "PingPong - input_msg = "
>             << request->input_msg()
>             << std::endl;
>
>         if (request->input_msg() == "hello") {
>             reply->set_output_msg("world");
>         }
>         else {
>             reply->set_output_msg("I can't pong unless you ping me 
> 'hello'!");
>         }
>
>         std::cout << "Replying with " << reply->output_msg() << std::endl;
>
>         return grpc::Status::OK;
>     }
>
> };
>
> typedef void ScanProgressCallbackType(_Inout_opt_ void* context, _In_z_ 
> const std::string location);
> typedef void ScanStartedCallbackType(_Inout_opt_ void* context);
> typedef void ScanFinishedCallbackType(_Inout_opt_ void* context, _In_ bool 
> scanFailed);
>
> class DummyScanner
> {
> public:
>     DummyScanner()
>         :scanContext_(NULL),
>         scanCount_(0),
>         scanProgressCallback_(NULL),
>         scanStartedCallback_(NULL),
>         scanFinishedCallback_(NULL)
>     {
>     }
>     ~DummyScanner()
>     {
>         if (dummyScanThread_.joinable())
>         {
>             dummyScanThread_.join();
>         }
>     }
>     void RegisterScanProgressCallbacks(
>         ScanProgressCallbackType* onScanProgress,
>         ScanStartedCallbackType* onScanStarted,
>         ScanFinishedCallbackType* onScanFinished,
>         _In_opt_ void* context)
>     {
>         scanProgressCallback_ = onScanProgress;
>         scanStartedCallback_ = onScanStarted;
>         scanFinishedCallback_ = onScanFinished;
>         scanContext_ = context;
>     }
>
>     bool StartScan()
>     {
>         if (scanCount_ > 0)
>         {
>             return false;
>         }
>         if (dummyScanThread_.joinable())
>         {
>             dummyScanThread_.join();
>         }
>
>         dummyScanThread_ = std::thread(&DummyScanner::dummyScanAndReport, 
> this);
>         return true;
>     }
>
> private:
>     void dummyScanAndReport()
>     {
>         while (true)
>         {
>             if (scanCount_ == 100)
>             {
>                 if (scanFinishedCallback_ != NULL)
>                 {
>                     scanFinishedCallback_(scanContext_, false);
>                 }
>                 break;
>             }
>             if (scanCount_ == 1)
>             {
>                 if (scanStartedCallback_ != NULL)
>                 {
>                     scanStartedCallback_(scanContext_);
>                 }
>             }
>             if (scanCount_ < 100)
>             {
>                 if (scanProgressCallback_ != NULL)
>                 {
>                     std::stringstream ss;
>                     ss << scanCount_;
>                     scanProgressCallback_(scanContext_, ss.str());
>                 }
>             }
>             scanCount_++;
>             std::this_thread::sleep_for(std::chrono::milliseconds(100));
>         }
>         scanCount_ = 0;
>     }
>
>
> private:
>     void* scanContext_;
>     int scanCount_;
>     ScanProgressCallbackType* scanProgressCallback_;
>     ScanStartedCallbackType* scanStartedCallback_;
>     ScanFinishedCallbackType* scanFinishedCallback_;
>     std::thread dummyScanThread_;
> };
>
> enum CallStatus
> {
>     CREATE,
>     PROCESS,
>     FINISH,
>     PUSH_TO_BACK
> };
>
> // interface for service to handle the async RPC
> // every RPC will need to implement this interface, so that GRPC server can
> // call RPC without knowing its type.
> class IAsyncRpcDataAdapter
> {
> public:
>     virtual void Proceed() = 0;
>     virtual std::string Name() = 0;
>     virtual ~IAsyncRpcDataAdapter() = default;
>
> protected:
>     IAsyncRpcDataAdapter(ping_pong::PingPong::AsyncService* service, 
> ServerCompletionQueue* cq)
>         : pingPongAsync_(service)
>         , cq_(cq)
>         , status_(CREATE)
>     {
>     }
>     ping_pong::PingPong::AsyncService* pingPongAsync_;
>     ServerCompletionQueue* cq_;
>     ServerContext ctx_;
>     CallStatus status_; // The current serving state.
>     grpc::Alarm alarm_;
> };
>
> class StartScanCallData : public IAsyncRpcDataAdapter
> {
> public:
>     StartScanCallData(ping_pong::PingPong::AsyncService* service, 
> ServerCompletionQueue* cq)
>     {
>         Proceed();
>     }
>
>     void Proceed() override
>     {
>         std::cout << __FUNCTION__ << std::endl;
>         if (status_ == CREATE)
>         {
>             status_ = PROCESS;
>             std::cout << __FUNCTION__ << ":- " << "CREATE this - " << this 
> << std::endl;
>             pingPongAsync_->RequestStartScan(&ctx_, &request_, 
> &responder_, cq_, cq_, static_cast<IAsyncRpcDataAdapter*>(this));
>         }
>         else if (status_ == PROCESS)
>         {
>             std::cout << __FUNCTION__ << ":- " << "PROCESS this - " << 
> this << std::endl;
>             // we need to keep an instance of api call data in the queue 
> so that it keeps waiting in HandleRpcs.
>             new StartScanCallData(pingPongAsync_, cq_);
>             const Status status = StartScan(&ctx_, &request_, &reply_);
>             if (status.ok())
>             {
>                 status_ = FINISH;
>                 responder_.Finish(reply_, Status::OK, this);
>             }
>             else
>             {
>                 responder_.FinishWithError(status, this);
>             }
>         }
>         else
>         {
>             GPR_ASSERT(status_ == FINISH);
>             std::cout << __FUNCTION__ << ":- " << "FINISH this - " << this 
> << std::endl;
>             delete this;
>         }
>     }
>
>     DummyScanner scanner_;
>     std::string Name() override
>     {
>         return "StartScanCallData";
>     }
>
> private:
>     grpc::Status StartScan(grpc::ServerContext* context, const 
> ping_pong::ScanRequest* request, ping_pong::ScanReply* reply)
>     {
>         std::cout << __FUNCTION__ << ":- " << 
> "\n\n----------------------------------------" << 
> std::this_thread::get_id() << std::endl;
>         // start dummy thread callback scan
>         scanner_.StartScan();
>         reply->set_scanalreadyinprogress(true);
>         return Status::OK;
>     }
>     ping_pong::ScanRequest request_;
>     ping_pong::ScanReply reply_;
>
>     ServerAsyncResponseWriter<ping_pong::ScanReply> responder_;
> };
>
> class RegisterScanProgressCallData : public IAsyncRpcDataAdapter
> {
> public:
>     RegisterScanProgressCallData(ping_pong::PingPong::AsyncService* 
> service, ServerCompletionQueue* cq)
>     {
>         Proceed();
>     }
>
>     void Proceed() override
>     {
>         std::cout << __FUNCTION__ << ":- " << " thread - " << 
> std::this_thread::get_id() << std::endl;
>         if (status_ == CREATE)
>         {
>             status_ = PROCESS;
>             std::cout << __FUNCTION__ << ":- " << " thread - " << 
> std::this_thread::get_id() << " CREATE this - " << this << std::endl;
>             pingPongAsync_->RequestRegisterScanProgress(&ctx_, &request_, 
> &responder_, cq_, cq_, static_cast<IAsyncRpcDataAdapter*>(this));
>         }
>         else if (status_ == PROCESS)
>         {
>             std::cout << __FUNCTION__ << ":- " << " thread - " << 
> std::this_thread::get_id() << " PROCESS this - " << this << std::endl;
>             if (!callBackTriggered_)
>             {
>                 // we need to keep an instance of api call data in the 
> queue so that it keeps waiting in HandleRpcs.
>                 new RegisterScanProgressCallData(pingPongAsync_, cq_);
>                 const Status status = RegisterScanProgress();
>                 if (!status.ok())
>                 {
>                     status_ = FINISH;
>                     responder_.Finish(Status::OK, this);
>                     return;
>                 }
>                 else
>                 {
>                     scanStatus_ = ScanStatus::NOT_STARTED;
>                 }
>                 callBackTriggered_ = true;
>             }
>             else
>             {
>                 waitForScanEvent();
>             }
>             std::cout << __FUNCTION__ << ":- " << " thread - " << 
> std::this_thread::get_id() << " Calling ProgressReport" << std::endl;
>             ProgressReport();
>         }
>         else if (status_ == PUSH_TO_BACK)
>         {
>             status_ = PROCESS;
>             std::cout << __FUNCTION__ << ":- " << " thread - " << 
> std::this_thread::get_id() << " ********************************* Setting 
> alarm" << std::endl;
>             alarm_.Set(cq_, gpr_now(gpr_clock_type::GPR_CLOCK_REALTIME), 
> this);
>         }
>         else
>         {
>             GPR_ASSERT(status_ == FINISH);
>             std::cout << __FUNCTION__ << ":- " << " thread - " << 
> std::this_thread::get_id() << " FINISH this - " << this << std::endl;
>             delete this;
>         }
>     }
>
>     std::string Name() override
>     {
>         return "RegisterScanProgressCallData****";
>     }
>
> private:
>     grpc::Status RegisterScanProgress()
>     {
>         std::cout << __FUNCTION__ << ":- " << 
> "---------------------------------------- thread - " << 
> std::this_thread::get_id() << std::endl;
>         
> scanner_.RegisterScanProgressCallbacks(RegisterScanProgressCallData::OnScanProgress,
>             RegisterScanProgressCallData::OnScanStarted,
>             RegisterScanProgressCallData::OnScanFinished,
>             this);
>         return Status::OK;
>     }
>
>     grpc::Status UnRegisterScanProgress()
>     {
>         return Status::OK;
>     }
>
>     void UpdateReply()
>     {
>         reply_.set_status(scanStatus_);
>         reply_.set_location(scanLocation_);
>     }
>
>     void ProgressReport()
>     {
>         UpdateReply();
>         std::cout << __FUNCTION__ << ":- " << "reporting ScanStatus - " << 
> scanStatus_ << " thread - " << std::this_thread::get_id() << std::endl;
>         if ((scanStatus_ == ScanStatus::FINISHED_SUCCESS)
>             || (scanStatus_ == ScanStatus::FINISHED_FAILED))
>         {
>             status_ = FINISH;
>             std::cout << __FUNCTION__ << ":- " << "writing to responder_ 
> this- " << this << " thread -" << std::this_thread::get_id() << std::endl;
>             responder_.Finish(Status::OK, this);
>         }
>         else
>         {
>             status_ = PUSH_TO_BACK;
>             std::cout << __FUNCTION__ << ":- " << "writing to responder_ 
> this- " << this << " thread -" << std::this_thread::get_id() << std::endl;
>             responder_.Write(std::move(reply_), this);
>         }
>     }
>
>     static void OnScanProgress(void* context, const std::string location)
>     {
>         try
>         {
>             auto _this = 
> reinterpret_cast<RegisterScanProgressCallData*>(context);
>             std::cout << __FUNCTION__ << ":- " << "thread - " << 
> std::this_thread::get_id() << " this " << _this << std::endl;
>             _this->scanStatus_ = ScanStatus::IN_PROGRESS;
>             _this->scanLocation_.assign(location);
>             std::cout << __FUNCTION__ << ":- " << "triggering progress 
> event thread - " << std::this_thread::get_id() << " this " << _this << 
> std::endl;
>             _this->scanPromise_->set_value(ScanStatus::IN_PROGRESS);
>         }
>         catch (...)
>         {
>             // Avoid exception being thrown through DLL boundary.
>         }
>     }
>
>     static void OnScanStarted(void* context)
>     {
>         try
>         {
>             auto _this = 
> reinterpret_cast<RegisterScanProgressCallData*>(context);
>             std::cout << __FUNCTION__ << ":- " << "thread - " << 
> std::this_thread::get_id() << " this " << _this << std::endl;
>             _this->scanStatus_ = ScanStatus::STARTED;
>             std::cout << __FUNCTION__ << ":- " << "triggering started 
> event thread - " << std::this_thread::get_id() << " this " << _this << 
> std::endl;
>             _this->scanPromise_->set_value(ScanStatus::STARTED);
>         }
>         catch (...)
>         {
>             // Avoid exception being thrown through DLL boundary.
>         }
>     }
>
>     static void OnScanFinished(void* context, bool scanFailed)
>     {
>         try
>         {
>             auto _this = 
> reinterpret_cast<RegisterScanProgressCallData*>(context);
>             if ((_this->scanStatus_ != ScanStatus::FINISHED_FAILED) && 
> (_this->scanStatus_ != ScanStatus::FINISHED_SUCCESS))
>             {
>                 std::cout << __FUNCTION__ << ":- " << "thread - " << 
> std::this_thread::get_id() << " this " << _this << std::endl;
>                 _this->scanStatus_ = (scanFailed == true) ? 
> ScanStatus::FINISHED_FAILED : ScanStatus::FINISHED_SUCCESS;
>                 std::cout << __FUNCTION__ << ":- " << "triggering finished 
> event thread - " << std::this_thread::get_id() << " this " << _this << 
> std::endl;
>                 _this->scanPromise_->set_value(_this->scanStatus_);
>             }
>         }
>         catch (...)
>         {
>             // Avoid exception being thrown through DLL boundary.
>         }
>     }
>
>
>     void waitForScanEvent()
>     {
>         if (scanPromise_ != nullptr)
>         {
>             scanPromise_.release();
>         }
>         scanPromise_ = std::make_unique<std::promise<ScanStatus>>();
>         auto scanFuture = scanPromise_->get_future();
>         std::cout << __FUNCTION__ << ":- " << "waiting for started event 
> with scanStatus_ - " << scanStatus_ << std::endl;
>         auto eventData = scanFuture.get();
>         scanStatus_ = eventData;
>         std::cout << __FUNCTION__ << ":- " << "got started event now 
> scanStatus_ - " << scanStatus_ << std::endl;
>     }
>
>     ping_pong::ScanProgressRequest request_;
>     ping_pong::ScanProgressReply reply_;
>     ServerAsyncWriter<ping_pong::ScanProgressReply> responder_;
>
>     ping_pong::ScanStatus scanStatus_;
>     std::string scanLocation_;
>     bool callBackTriggered_ = false;
>     std::unique_ptr<std::promise<ping_pong::ScanStatus>> scanPromise_;
> };
>
> class ServerImpl
> {
> public:
>     ~ServerImpl()
>     {
>         server_->Shutdown();
>         // Always shutdown the completion queue after the server.
>         cq_->Shutdown();
>     }
>
>     void Run()
>     {
>         rapidjson::Document napaPluginConfig;
>         napaPluginConfig.Parse(napa_plugin_config_json.c_str());
>         int port(napaPluginConfig["port"].GetInt());
>         std::string hostname(napaPluginConfig["hostname"].GetString());
>         std::string server_address(hostname + ":" + std::to_string(port));
>
>         // Check for Server Credentials
>         // Comes as a string, we want a boolean.
>         auto strGrpcUseInsecureCreds = 
> getenv("NAPA_GRPC_USE_INSECURE_CREDS");
>         // By default we will *not* use SSL for HTTP2.
>         // TODO(Keith): Once Envoy understands SSL, revert back to false.
>         bool bGrpcUseInsecureCreds = true;
>         if (strGrpcUseInsecureCreds != NULL)
>         {
>             std::istringstream(strGrpcUseInsecureCreds) >> std::boolalpha 
> >> bGrpcUseInsecureCreds;
>         }
>
>         std::cout
>             << "Environment variable NAPA_GRPC_USE_INSECURE_CREDS = "
>             << bGrpcUseInsecureCreds
>             << std::endl;
>
>         grpc::EnableDefaultHealthCheckService(true);
>         grpc::reflection::InitProtoReflectionServerBuilderPlugin();
>         grpc::ServerBuilder builder;
>
>         std::shared_ptr<grpc::ServerCredentials> serverCredentials;
>
>         if (bGrpcUseInsecureCreds)
>         {
>             // Listen on the given address without any authentication 
> mechanism.
>             serverCredentials = grpc::InsecureServerCredentials();
>             std::cout
>                 << "NOTE: We are using gRPC insecure server credentials, 
> without SSL."
>                 << "This means you are not using HTTP2"
>                 << std::endl;
>         }
>         else
>         {
>             grpc::SslServerCredentialsOptions sslOptions;
>             sslOptions.pem_root_certs = ssl_credentials_ca_cert.c_str();
>             sslOptions.pem_key_cert_pairs.push_back({ 
> ssl_credentials_server_key.c_str(),
>                                                     
>  ssl_credentials_server_cert.c_str() });
>
>             serverCredentials = grpc::SslServerCredentials(sslOptions);
>             std::cout
>                 << "NOTE: We are using gRPC secure server credentials, 
> with SSL."
>                 << "This means you are using HTTP2"
>                 << std::endl;
>         }
>
>         builder.AddListeningPort(server_address, serverCredentials);
>
>         builder.RegisterService(&pingPongAsync_);
>
>         cq_ = builder.AddCompletionQueue();
>         cqProgress_ = builder.AddCompletionQueue();
>
>         // Use Keep-alive to stop initial slow calls
>         builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_TIME_MS, 10000);
>         builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 10000);
>         builder.AddChannelArgument(GRPC_ARG_MAX_CONNECTION_IDLE_MS, 10000);
>         builder.AddChannelArgument(GRPC_ARG_HTTP2_BDP_PROBE, 1);
>         
> builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1);
>
>         // Finally assemble the server.
>         server_ = (builder.BuildAndStart());
>         std::cout
>             << "Server listening on "
>             << server_address
>             << std::endl;
>
>         // kick start the queues for async RPCs
>         HandleStartScan();
>         HandleRegisterScanProgress();
>         progressOtherRpcs_ = std::thread(&ServerImpl::EventLoop, this, 
> cq_.get());
>         progressReportThread_ = std::thread(&ServerImpl::EventLoop, this, 
> cqProgress_.get());
>         // Wait for the server to shutdown. Note that some other thread 
> must be
>         // responsible for shutting down the server for this call to ever 
> return.
>         server_->Wait();
>     }
>
> private:
>     void HandleStartScan()
>     {
>         new StartScanCallData(&pingPongAsync_, cq_.get());
>     }
>
>     void HandleRegisterScanProgress()
>     {
>         new RegisterScanProgressCallData(&pingPongAsync_, 
> cqProgress_.get());
>     }
>
>     // TODO - we'll need to have more than one event loops. running in 
> thread for the RPC
>     void EventLoop(ServerCompletionQueue* event_queue)
>     {
>         void* tag; // uniquely identifies a request.
>         bool ok;
>
>         while (event_queue->Next(&tag, &ok))
>         {
>             IAsyncRpcDataAdapter* adapter = 
> static_cast<IAsyncRpcDataAdapter*>(tag);
>             if (ok)
>             {
>                 std::cout << __FUNCTION__ << ":- EventLoop event for  " << 
> " thread - " << std::this_thread::get_id() << " queue " << event_queue << " 
> tag " << adapter->Name() << std::endl;
>                 adapter->Proceed();
>             }
>             else
>             {
>                 std::cout << __FUNCTION__ << ":- Need to see why this 
> failed for " << " thread - " << std::this_thread::get_id() << " queue " << 
> event_queue << " tag " << adapter->Name() << std::endl;
>                 adapter->Proceed();
>                 continue;
>             }
>         }
>     }
>
>     std::unique_ptr<Server> server_;
>     std::unique_ptr<ServerCompletionQueue> cq_;
>     std::unique_ptr<ServerCompletionQueue> cqProgress_;
>     ping_pong::PingPong::AsyncService pingPongAsync_;
>     std::thread progressReportThread_;
>     std::thread progressOtherRpcs_;
> };
>
> int main(int argc, char** argv)
> {
>     ServerImpl server;
>     server.Run();
>     return 0;
> }
> // 
> -------------------------------------------------------------------------
> // proto file
> syntax = "proto3";
>
> package ping_pong;
>
> service PingPong {
>   rpc PingUnary (PingPongRequest) returns (PingPongReply) {}
>   rpc StartScan (ScanRequest) returns (ScanReply) {}
>   rpc RegisterScanProgress (ScanProgressRequest) returns (stream 
> ScanProgressReply) {}
> }
>
> message PingPongRequest {
>     string input_msg = 1;
> }
>
> message PingPongReply {
>     string output_msg = 1;
> }
>
> message ScanRequest{
>   string scanType = 1;
> }
>
> message ScanReply{
>   bool scanAlreadyInProgress  = 1;
> }
>
> message ScanProgressRequest {
> }
>
> enum ScanStatus
> {
>   NOT_STARTED = 0;
>   STARTED     = 1;
>   IN_PROGRESS = 2;
>   FINISHED_SUCCESS  = 3;
>   FINISHED_FAILED   = 4;
> }
>
> message ScanProgressReply {
>     ScanStatus status = 1;
>     string location = 2;
> }
>
>
>
> On Tuesday, June 29, 2021 at 2:30:44 AM UTC+5:30 Vijay Pai wrote:
>
>> It's not really possible to say anything about this without knowing 
>> what's in "Proceed" .
>>
>> On Monday, June 21, 2021 at 9:03:15 PM UTC-7 [email protected] wrote:
>>
>>> We are facing an issue where Next of a completion queue returns ok as 
>>> false, if any other RPC is called.
>>> We need to implement a simple functionality of progress reporting of a 
>>> long running RPC. So we need two RPCs
>>> 1. StartScan - to trigger the long running scan - runs in separate 
>>> thread and its own completion queue
>>> 2. RegisterScanProgress  - to register for the callback that reports 
>>> status of the long running scan.  - runs in separate thread and its own 
>>> completion queue
>>>
>>> If we call  StartScan  first, and then cqProgress_(ServerCompletionQueue 
>>> for RegisterScanProgress) behaves as expected. though, if we call 
>>> RegisterScanProgress  first, then immediately after call to StartScan the 
>>> Next of cqProgress_  returns ok as false.
>>> Need help to understand this behavior. As after calling 
>>> RegisterScanProgress there could be call to any other RPC, and so we'd need 
>>> its queue to keep functioning for progress reporting.
>>> It'll be of great help if someone helps us understand what we are 
>>> missing.
>>>
>>> Proto looks like following
>>> service ScanService{
>>>     rpc StartScan (ScanRequest) returns (ScanReply) {}
>>>     rpc RegisterScanProgress (ScanProgressRequest) returns (stream 
>>> ScanProgressReply) {}
>>> }
>>> message ScanRequest{
>>>   string scanType = 1;
>>> }
>>> message ScanReply{
>>>   bool scanAlreadyInProgress  = 1;
>>> }
>>> message ScanProgressRequest {
>>> }
>>> enum ScanStatus
>>> {
>>>   NOT_STARTED = 0;
>>>   STARTED     = 1;
>>>   IN_PROGRESS = 2;
>>>   FINISHED_SUCCESS  = 3;
>>>   FINISHED_FAILED   = 4;
>>> }
>>> message ScanProgressReply {
>>>     ScanStatus status = 1;
>>>     string details = 2;
>>> }
>>>
>>> this is how the server code looks
>>>
>>> class ServerImpl
>>> {
>>> public:
>>>   ~ServerImpl()
>>>   {
>>>     server_->Shutdown();
>>>     // Always shutdown the completion queue after the server.
>>>     cq_->Shutdown();
>>>   }
>>>
>>>   void Run()
>>>   {
>>>     //... other server config code
>>>     grpc::EnableDefaultHealthCheckService(true);
>>>     grpc::reflection::InitProtoReflectionServerBuilderPlugin();
>>>     grpc::ServerBuilder builder;
>>>
>>>     builder.AddListeningPort(server_address, serverCredentials);
>>>
>>>     cq_ = builder.AddCompletionQueue();
>>>     cqProgress_ = builder.AddCompletionQueue();
>>>
>>>     // Use Keep-alive to stop initial slow calls
>>>     builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_TIME_MS, 10000);
>>>     builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 100000);
>>>     builder.AddChannelArgument(GRPC_ARG_MAX_CONNECTION_IDLE_MS, 10000);
>>>     builder.AddChannelArgument(GRPC_ARG_HTTP2_BDP_PROBE, 1);
>>>     builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 
>>> 1);
>>>
>>>     // Finally assemble the server.
>>>     server_ = (builder.BuildAndStart());
>>>
>>>     // kick start the queues for async RPCs
>>>     HandleStartScan(cq_.get());
>>>     HandleRegisterScanProgress(cqProgress_.get());
>>>     progressOtherRpcs_ = std::thread(&ServerImpl::EventLoop, this, 
>>> cq_.get());
>>>     progressReportThread_ = std::thread(&ServerImpl::EventLoop, this, 
>>> cqProgress_.get());
>>>     server_->Wait();
>>>   }
>>>
>>> private:
>>>   void HandleStartScan(ServerCompletionQueue* event_queue)
>>>   {
>>>     new StartScanCallData(&scanAsync_, event_queue);
>>>   }
>>>   void HandleRegisterScanProgress(ServerCompletionQueue* event_queue)
>>>   {
>>>     new RegisterScanProgressCallData(&scanAsync_, event_queue);
>>>   }
>>>   void EventLoop(ServerCompletionQueue* event_queue)
>>>   {
>>>     void* tag; // uniquely identifies a request.
>>>     bool ok;
>>>     while (event_queue->Next(&tag, &ok))
>>>     {
>>>       IAsyncRpcDataAdapter* adapter = 
>>> static_cast<IAsyncRpcDataAdapter*>(tag);
>>>       if (ok)
>>>       {
>>>         adapter->Proceed();
>>>       }
>>>       else
>>>       {
>>>         std::cout << "OK is false" << std::endl;
>>>         continue;
>>>       }
>>>     }
>>>   }
>>>
>>>   std::unique_ptr<ServerCompletionQueue> cq_;
>>>   std::unique_ptr<ServerCompletionQueue> cqProgress_;
>>>   napa::Nvbackend::AsyncService scanAsync_;
>>>   std::unique_ptr<Server> server_;
>>>   std::thread progressReportThread_;
>>>   std::thread progressOtherRpcs_;
>>> };
>>>
>>> int main(int argc, char** argv)
>>> {
>>>   ServerImpl server;
>>>   server.Run();
>>>   return 0;
>>> }
>>>
>>> and this is how the service code looks
>>>
>>> enum CallStatus
>>> {
>>>   CREATE,
>>>   PROCESS,
>>>   FINISH,
>>>   PUSH_TO_BACK
>>> };
>>>
>>> // interface for service to handle the async RPC
>>> // every RPC will need to implement this interface, so that GRPC server 
>>> can
>>> // call RPC without knowing its type.
>>> class IAsyncRpcDataAdapter
>>> {
>>> public:
>>>   virtual void Proceed() = 0;
>>>   virtual ~IAsyncRpcDataAdapter() = default;
>>>
>>>   protected:
>>>     IAsyncRpcDataAdapter(ScanService* service, ServerCompletionQueue* cq)
>>>       : scanAsync_(service)
>>>       , cq_(cq)
>>>       , status_(CREATE)
>>>     {
>>>     }
>>>     ScanService* scanAsync_;
>>>     ServerCompletionQueue* cq_;
>>>     ServerContext ctx_;
>>>     CallStatus status_; // The current serving state.
>>>     grpc::Alarm alarm_;
>>> };
>>>
>>> class StartScanCallData : public IAsyncRpcDataAdapter
>>> {
>>> public:
>>>   StartScanCallData(ScanService* service, ServerCompletionQueue* cq);
>>>   void Proceed() override;
>>>
>>> private:
>>>   ScanRequest request_;
>>>   ScanReply reply_;
>>>
>>>   ServerAsyncResponseWriter<ScanReply> responder_;
>>>   grpc::Status StartScan(grpc::ServerContext *context, const ScanRequest 
>>> *request, ScanReply *reply);
>>> };
>>>
>>> class RegisterScanProgressCallData : public IAsyncRpcDataAdapter
>>> {
>>> public:
>>>   RegisterScanProgressCallData(ScanService* service, 
>>> ServerCompletionQueue* cq);
>>>   void Proceed() override;
>>>
>>> private:
>>>   ScanProgressRequest request_;
>>>   ScanProgressReply reply_;
>>>   ServerAsyncWriter<ScanProgressReply> responder_;
>>>
>>>   void ProgressReport();
>>>   void waitForScanEvent();
>>>   std::unique_ptr<std::promise<AppScanStatus>> scanPromise_;
>>> };
>>>
>>>

-- 
You received this message because you are subscribed to the Google Groups 
"grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To view this discussion on the web visit 
https://groups.google.com/d/msgid/grpc-io/ebb4a401-0763-41a7-bc29-6249486974c8n%40googlegroups.com.

Reply via email to