Here is the complete code:
#include <stdlib.h>

#pragma region gRPC
#include <grpcpp/grpcpp.h>
#include <grpcpp/health_check_service_interface.h>
#include <grpcpp/ext/proto_server_reflection_plugin.h>
#pragma endregion gRPC

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <iomanip>
#include <iostream>
#include <memory>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>

#pragma region Generated Files
#include "napa_plugins_config.hpp"
#include "napa_plugins_core_ssl.hpp"
#pragma endregion Generated Files

#pragma region
#include "rapidjson/document.h"
#pragma endregion

#include <future>

#include <grpcpp/impl/codegen/service_type.h>
#include <grpcpp/alarm.h>

#include "protos/PingPong.grpc.pb.h"
#include "protos/PingPong.pb.h"

using grpc::Server;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::Status;
using grpc::ServerCompletionQueue;
using grpc::ServerAsyncResponseWriter;


// Synchronous implementation of the PingPong service's unary PingUnary RPC.
// NOTE(review): ServerImpl::Run() in this file registers only the AsyncService,
// so this class appears to be unused here — confirm before relying on it.
class PingPongServiceImpl final
    : public ping_pong::PingPong::Service {
public:
    ~PingPongServiceImpl() {
        std::cout << "Destructor of PingPongServiceImpl" << std::endl;
    }
private:
    // Replies "world" when the request's input_msg is exactly "hello";
    // any other input gets a hint telling the caller what to send.
    // Always returns grpc::Status::OK. `context` is not consulted.
    grpc::Status PingUnary(
        grpc::ServerContext* context,
        const ping_pong::PingPongRequest* request,
        ping_pong::PingPongReply* reply) override
    {
        std::cout << "PingPong" << std::endl;

        std::cout
            << "PingPong - input_msg = "
            << request->input_msg()
            << std::endl;

        if (request->input_msg() == "hello") {
            reply->set_output_msg("world");
        }
        else {
            // The literal must stay on one line: a raw newline inside a
            // string literal does not compile.
            reply->set_output_msg("I can't pong unless you ping me 'hello'!");
        }

        std::cout << "Replying with " << reply->output_msg() << std::endl;

        return grpc::Status::OK;
    }

};

// Function types for the callbacks a scanner uses to report scan events.
// Every callback receives the opaque `context` pointer as its first argument.
// NOTE(review): `_Inout_opt_`, `_In_z_`, `_In_` and `_In_opt_` look like
// Microsoft SAL annotations; no header visibly included in this file defines
// them — confirm they are available on the target toolchain.

// Progress notification; `location` is a text description of the scan position.
typedef void ScanProgressCallbackType(_Inout_opt_ void* context, _In_z_ 
const std::string location);
// Notification that a scan has started.
typedef void ScanStartedCallbackType(_Inout_opt_ void* context);
// Notification that a scan has ended; `scanFailed` distinguishes failure from success.
typedef void ScanFinishedCallbackType(_Inout_opt_ void* context, _In_ bool 
scanFailed);

// Stand-in for a real scanner. A "scan" runs on a background thread for 100
// steps, 100 ms apart, and reports through the registered C-style callbacks:
// started once, progress once per step (the step number as text), and
// finished once (always reporting success).
//
// Thread-safety:
//  * StartScan() may be called while a scan is running; it then returns false.
//  * Callbacks are invoked on the scan thread while the registration lock is
//    held, so RegisterScanProgressCallbacks() does not return while a callback
//    is executing. Callbacks must therefore not call
//    RegisterScanProgressCallbacks() themselves.
class DummyScanner
{
public:
    DummyScanner()
        : scanContext_(nullptr),
        scanProgressCallback_(nullptr),
        scanStartedCallback_(nullptr),
        scanFinishedCallback_(nullptr),
        scanInProgress_(false)
    {
    }

    // Waits for a running scan thread to finish before the object goes away.
    ~DummyScanner()
    {
        if (dummyScanThread_.joinable())
        {
            dummyScanThread_.join();
        }
    }

    // Replaces the registered callbacks and the context pointer handed to them.
    // Any of the callbacks may be null, in which case that event is not reported.
    void RegisterScanProgressCallbacks(
        ScanProgressCallbackType* onScanProgress,
        ScanStartedCallbackType* onScanStarted,
        ScanFinishedCallbackType* onScanFinished,
        _In_opt_ void* context)
    {
        const std::lock_guard<std::mutex> lock(callbackMutex_);
        scanProgressCallback_ = onScanProgress;
        scanStartedCallback_ = onScanStarted;
        scanFinishedCallback_ = onScanFinished;
        scanContext_ = context;
    }

    // Starts a scan on a background thread.
    // Returns true if a scan was started, false if one is already running.
    bool StartScan()
    {
        // exchange() tests and claims the "running" flag in one atomic step, so
        // a second call made right after the first cannot slip through (the old
        // counter-based check stayed at 0 until the worker had run one step).
        if (scanInProgress_.exchange(true))
        {
            return false;
        }

        try
        {
            // A previous scan has cleared the flag but its thread object may
            // not have been joined yet.
            if (dummyScanThread_.joinable())
            {
                dummyScanThread_.join();
            }
            dummyScanThread_ = std::thread(&DummyScanner::dummyScanAndReport, this);
        }
        catch (...)
        {
            // Do not leave the scanner permanently "busy" if the thread could
            // not be created.
            scanInProgress_ = false;
            throw;
        }
        return true;
    }

private:
    // Body of the scan thread.
    void dummyScanAndReport()
    {
        const int totalSteps = 100;
        const std::chrono::milliseconds stepDelay(100);

        for (int step = 0; step < totalSteps; ++step)
        {
            {
                const std::lock_guard<std::mutex> lock(callbackMutex_);
                // "Started" must precede the first progress report.
                if (step == 0 && scanStartedCallback_ != nullptr)
                {
                    scanStartedCallback_(scanContext_);
                }
                if (scanProgressCallback_ != nullptr)
                {
                    scanProgressCallback_(scanContext_, std::to_string(step));
                }
            }
            // Sleep outside the lock so registration is never blocked for long.
            std::this_thread::sleep_for(stepDelay);
        }

        {
            const std::lock_guard<std::mutex> lock(callbackMutex_);
            if (scanFinishedCallback_ != nullptr)
            {
                scanFinishedCallback_(scanContext_, false);
            }
        }
        // Only now may another scan be started.
        scanInProgress_ = false;
    }


private:
    void* scanContext_;                              // guarded by callbackMutex_
    ScanProgressCallbackType* scanProgressCallback_; // guarded by callbackMutex_
    ScanStartedCallbackType* scanStartedCallback_;   // guarded by callbackMutex_
    ScanFinishedCallbackType* scanFinishedCallback_; // guarded by callbackMutex_
    std::mutex callbackMutex_;
    std::atomic<bool> scanInProgress_;
    std::thread dummyScanThread_;
};

// Serving state of one asynchronous RPC handler ("call data" object).
enum CallStatus
{
    CREATE = 0,       // Freshly constructed; the RPC has not been requested yet.
    PROCESS = 1,      // Requested; the next completion means there is work to do.
    FINISH = 2,       // Final response issued; the next completion deletes the handler.
    PUSH_TO_BACK = 3  // A streamed write was issued; the next completion re-queues the handler.
};

// interface for service to handle the async RPC
// every RPC will need to implement this interface, so that GRPC server can
// call RPC without knowing its type.
class IAsyncRpcDataAdapter
{
public:
    virtual void Proceed() = 0;
    virtual std::string Name() = 0;
    virtual ~IAsyncRpcDataAdapter() = default;

protected:
    IAsyncRpcDataAdapter(ping_pong::PingPong::AsyncService* service, 
ServerCompletionQueue* cq)
        : pingPongAsync_(service)
        , cq_(cq)
        , status_(CREATE)
    {
    }
    ping_pong::PingPong::AsyncService* pingPongAsync_;
    ServerCompletionQueue* cq_;
    ServerContext ctx_;
    CallStatus status_; // The current serving state.
    grpc::Alarm alarm_;
};

class StartScanCallData : public IAsyncRpcDataAdapter
{
public:
    StartScanCallData(ping_pong::PingPong::AsyncService* service, 
ServerCompletionQueue* cq)
    {
        Proceed();
    }

    void Proceed() override
    {
        std::cout << __FUNCTION__ << std::endl;
        if (status_ == CREATE)
        {
            status_ = PROCESS;
            std::cout << __FUNCTION__ << ":- " << "CREATE this - " << this 
<< std::endl;
            pingPongAsync_->RequestStartScan(&ctx_, &request_, &responder_, 
cq_, cq_, static_cast<IAsyncRpcDataAdapter*>(this));
        }
        else if (status_ == PROCESS)
        {
            std::cout << __FUNCTION__ << ":- " << "PROCESS this - " << this 
<< std::endl;
            // we need to keep an instance of api call data in the queue so 
that it keeps waiting in HandleRpcs.
            new StartScanCallData(pingPongAsync_, cq_);
            const Status status = StartScan(&ctx_, &request_, &reply_);
            if (status.ok())
            {
                status_ = FINISH;
                responder_.Finish(reply_, Status::OK, this);
            }
            else
            {
                responder_.FinishWithError(status, this);
            }
        }
        else
        {
            GPR_ASSERT(status_ == FINISH);
            std::cout << __FUNCTION__ << ":- " << "FINISH this - " << this 
<< std::endl;
            delete this;
        }
    }

    DummyScanner scanner_;
    std::string Name() override
    {
        return "StartScanCallData";
    }

private:
    grpc::Status StartScan(grpc::ServerContext* context, const 
ping_pong::ScanRequest* request, ping_pong::ScanReply* reply)
    {
        std::cout << __FUNCTION__ << ":- " << 
"\n\n----------------------------------------" << 
std::this_thread::get_id() << std::endl;
        // start dummy thread callback scan
        scanner_.StartScan();
        reply->set_scanalreadyinprogress(true);
        return Status::OK;
    }
    ping_pong::ScanRequest request_;
    ping_pong::ScanReply reply_;

    ServerAsyncResponseWriter<ping_pong::ScanReply> responder_;
};

class RegisterScanProgressCallData : public IAsyncRpcDataAdapter
{
public:
    RegisterScanProgressCallData(ping_pong::PingPong::AsyncService* 
service, ServerCompletionQueue* cq)
    {
        Proceed();
    }

    void Proceed() override
    {
        std::cout << __FUNCTION__ << ":- " << " thread - " << 
std::this_thread::get_id() << std::endl;
        if (status_ == CREATE)
        {
            status_ = PROCESS;
            std::cout << __FUNCTION__ << ":- " << " thread - " << 
std::this_thread::get_id() << " CREATE this - " << this << std::endl;
            pingPongAsync_->RequestRegisterScanProgress(&ctx_, &request_, 
&responder_, cq_, cq_, static_cast<IAsyncRpcDataAdapter*>(this));
        }
        else if (status_ == PROCESS)
        {
            std::cout << __FUNCTION__ << ":- " << " thread - " << 
std::this_thread::get_id() << " PROCESS this - " << this << std::endl;
            if (!callBackTriggered_)
            {
                // we need to keep an instance of api call data in the 
queue so that it keeps waiting in HandleRpcs.
                new RegisterScanProgressCallData(pingPongAsync_, cq_);
                const Status status = RegisterScanProgress();
                if (!status.ok())
                {
                    status_ = FINISH;
                    responder_.Finish(Status::OK, this);
                    return;
                }
                else
                {
                    scanStatus_ = ScanStatus::NOT_STARTED;
                }
                callBackTriggered_ = true;
            }
            else
            {
                waitForScanEvent();
            }
            std::cout << __FUNCTION__ << ":- " << " thread - " << 
std::this_thread::get_id() << " Calling ProgressReport" << std::endl;
            ProgressReport();
        }
        else if (status_ == PUSH_TO_BACK)
        {
            status_ = PROCESS;
            std::cout << __FUNCTION__ << ":- " << " thread - " << 
std::this_thread::get_id() << " ********************************* Setting 
alarm" << std::endl;
            alarm_.Set(cq_, gpr_now(gpr_clock_type::GPR_CLOCK_REALTIME), 
this);
        }
        else
        {
            GPR_ASSERT(status_ == FINISH);
            std::cout << __FUNCTION__ << ":- " << " thread - " << 
std::this_thread::get_id() << " FINISH this - " << this << std::endl;
            delete this;
        }
    }

    std::string Name() override
    {
        return "RegisterScanProgressCallData****";
    }

private:
    grpc::Status RegisterScanProgress()
    {
        std::cout << __FUNCTION__ << ":- " << 
"---------------------------------------- thread - " << 
std::this_thread::get_id() << std::endl;
        
scanner_.RegisterScanProgressCallbacks(RegisterScanProgressCallData::OnScanProgress,
            RegisterScanProgressCallData::OnScanStarted,
            RegisterScanProgressCallData::OnScanFinished,
            this);
        return Status::OK;
    }

    grpc::Status UnRegisterScanProgress()
    {
        return Status::OK;
    }

    void UpdateReply()
    {
        reply_.set_status(scanStatus_);
        reply_.set_location(scanLocation_);
    }

    void ProgressReport()
    {
        UpdateReply();
        std::cout << __FUNCTION__ << ":- " << "reporting ScanStatus - " << 
scanStatus_ << " thread - " << std::this_thread::get_id() << std::endl;
        if ((scanStatus_ == ScanStatus::FINISHED_SUCCESS)
            || (scanStatus_ == ScanStatus::FINISHED_FAILED))
        {
            status_ = FINISH;
            std::cout << __FUNCTION__ << ":- " << "writing to responder_ 
this- " << this << " thread -" << std::this_thread::get_id() << std::endl;
            responder_.Finish(Status::OK, this);
        }
        else
        {
            status_ = PUSH_TO_BACK;
            std::cout << __FUNCTION__ << ":- " << "writing to responder_ 
this- " << this << " thread -" << std::this_thread::get_id() << std::endl;
            responder_.Write(std::move(reply_), this);
        }
    }

    static void OnScanProgress(void* context, const std::string location)
    {
        try
        {
            auto _this = 
reinterpret_cast<RegisterScanProgressCallData*>(context);
            std::cout << __FUNCTION__ << ":- " << "thread - " << 
std::this_thread::get_id() << " this " << _this << std::endl;
            _this->scanStatus_ = ScanStatus::IN_PROGRESS;
            _this->scanLocation_.assign(location);
            std::cout << __FUNCTION__ << ":- " << "triggering progress 
event thread - " << std::this_thread::get_id() << " this " << _this << 
std::endl;
            _this->scanPromise_->set_value(ScanStatus::IN_PROGRESS);
        }
        catch (...)
        {
            // Avoid exception being thrown through DLL boundary.
        }
    }

    static void OnScanStarted(void* context)
    {
        try
        {
            auto _this = 
reinterpret_cast<RegisterScanProgressCallData*>(context);
            std::cout << __FUNCTION__ << ":- " << "thread - " << 
std::this_thread::get_id() << " this " << _this << std::endl;
            _this->scanStatus_ = ScanStatus::STARTED;
            std::cout << __FUNCTION__ << ":- " << "triggering started event 
thread - " << std::this_thread::get_id() << " this " << _this << std::endl;
            _this->scanPromise_->set_value(ScanStatus::STARTED);
        }
        catch (...)
        {
            // Avoid exception being thrown through DLL boundary.
        }
    }

    static void OnScanFinished(void* context, bool scanFailed)
    {
        try
        {
            auto _this = 
reinterpret_cast<RegisterScanProgressCallData*>(context);
            if ((_this->scanStatus_ != ScanStatus::FINISHED_FAILED) && 
(_this->scanStatus_ != ScanStatus::FINISHED_SUCCESS))
            {
                std::cout << __FUNCTION__ << ":- " << "thread - " << 
std::this_thread::get_id() << " this " << _this << std::endl;
                _this->scanStatus_ = (scanFailed == true) ? 
ScanStatus::FINISHED_FAILED : ScanStatus::FINISHED_SUCCESS;
                std::cout << __FUNCTION__ << ":- " << "triggering finished 
event thread - " << std::this_thread::get_id() << " this " << _this << 
std::endl;
                _this->scanPromise_->set_value(_this->scanStatus_);
            }
        }
        catch (...)
        {
            // Avoid exception being thrown through DLL boundary.
        }
    }


    void waitForScanEvent()
    {
        if (scanPromise_ != nullptr)
        {
            scanPromise_.release();
        }
        scanPromise_ = std::make_unique<std::promise<ScanStatus>>();
        auto scanFuture = scanPromise_->get_future();
        std::cout << __FUNCTION__ << ":- " << "waiting for started event 
with scanStatus_ - " << scanStatus_ << std::endl;
        auto eventData = scanFuture.get();
        scanStatus_ = eventData;
        std::cout << __FUNCTION__ << ":- " << "got started event now 
scanStatus_ - " << scanStatus_ << std::endl;
    }

    ping_pong::ScanProgressRequest request_;
    ping_pong::ScanProgressReply reply_;
    ServerAsyncWriter<ping_pong::ScanProgressReply> responder_;

    ping_pong::ScanStatus scanStatus_;
    std::string scanLocation_;
    bool callBackTriggered_ = false;
    std::unique_ptr<std::promise<ping_pong::ScanStatus>> scanPromise_;
};

class ServerImpl
{
public:
    ~ServerImpl()
    {
        server_->Shutdown();
        // Always shutdown the completion queue after the server.
        cq_->Shutdown();
    }

    void Run()
    {
        rapidjson::Document napaPluginConfig;
        napaPluginConfig.Parse(napa_plugin_config_json.c_str());
        int port(napaPluginConfig["port"].GetInt());
        std::string hostname(napaPluginConfig["hostname"].GetString());
        std::string server_address(hostname + ":" + std::to_string(port));

        // Check for Server Credentials
        // Comes as a string, we want a boolean.
        auto strGrpcUseInsecureCreds = 
getenv("NAPA_GRPC_USE_INSECURE_CREDS");
        // By default we will *not* use SSL for HTTP2.
        // TODO(Keith): Once Envoy understands SSL, revert back to false.
        bool bGrpcUseInsecureCreds = true;
        if (strGrpcUseInsecureCreds != NULL)
        {
            std::istringstream(strGrpcUseInsecureCreds) >> std::boolalpha 
>> bGrpcUseInsecureCreds;
        }

        std::cout
            << "Environment variable NAPA_GRPC_USE_INSECURE_CREDS = "
            << bGrpcUseInsecureCreds
            << std::endl;

        grpc::EnableDefaultHealthCheckService(true);
        grpc::reflection::InitProtoReflectionServerBuilderPlugin();
        grpc::ServerBuilder builder;

        std::shared_ptr<grpc::ServerCredentials> serverCredentials;

        if (bGrpcUseInsecureCreds)
        {
            // Listen on the given address without any authentication 
mechanism.
            serverCredentials = grpc::InsecureServerCredentials();
            std::cout
                << "NOTE: We are using gRPC insecure server credentials, 
without SSL."
                << "This means you are not using HTTP2"
                << std::endl;
        }
        else
        {
            grpc::SslServerCredentialsOptions sslOptions;
            sslOptions.pem_root_certs = ssl_credentials_ca_cert.c_str();
            sslOptions.pem_key_cert_pairs.push_back({ 
ssl_credentials_server_key.c_str(),
                                                    
 ssl_credentials_server_cert.c_str() });

            serverCredentials = grpc::SslServerCredentials(sslOptions);
            std::cout
                << "NOTE: We are using gRPC secure server credentials, with 
SSL."
                << "This means you are using HTTP2"
                << std::endl;
        }

        builder.AddListeningPort(server_address, serverCredentials);

        builder.RegisterService(&pingPongAsync_);

        cq_ = builder.AddCompletionQueue();
        cqProgress_ = builder.AddCompletionQueue();

        // Use Keep-alive to stop initial slow calls
        builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_TIME_MS, 10000);
        builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 10000);
        builder.AddChannelArgument(GRPC_ARG_MAX_CONNECTION_IDLE_MS, 10000);
        builder.AddChannelArgument(GRPC_ARG_HTTP2_BDP_PROBE, 1);
        builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 
1);

        // Finally assemble the server.
        server_ = (builder.BuildAndStart());
        std::cout
            << "Server listening on "
            << server_address
            << std::endl;

        // kick start the queues for async RPCs
        HandleStartScan();
        HandleRegisterScanProgress();
        progressOtherRpcs_ = std::thread(&ServerImpl::EventLoop, this, 
cq_.get());
        progressReportThread_ = std::thread(&ServerImpl::EventLoop, this, 
cqProgress_.get());
        // Wait for the server to shutdown. Note that some other thread 
must be
        // responsible for shutting down the server for this call to ever 
return.
        server_->Wait();
    }

private:
    void HandleStartScan()
    {
        new StartScanCallData(&pingPongAsync_, cq_.get());
    }

    void HandleRegisterScanProgress()
    {
        new RegisterScanProgressCallData(&pingPongAsync_, 
cqProgress_.get());
    }

    // TODO - we'll need to have more than one event loops. running in 
thread for the RPC
    void EventLoop(ServerCompletionQueue* event_queue)
    {
        void* tag; // uniquely identifies a request.
        bool ok;

        while (event_queue->Next(&tag, &ok))
        {
            IAsyncRpcDataAdapter* adapter = 
static_cast<IAsyncRpcDataAdapter*>(tag);
            if (ok)
            {
                std::cout << __FUNCTION__ << ":- EventLoop event for  " << 
" thread - " << std::this_thread::get_id() << " queue " << event_queue << " 
tag " << adapter->Name() << std::endl;
                adapter->Proceed();
            }
            else
            {
                std::cout << __FUNCTION__ << ":- Need to see why this 
failed for " << " thread - " << std::this_thread::get_id() << " queue " << 
event_queue << " tag " << adapter->Name() << std::endl;
                adapter->Proceed();
                continue;
            }
        }
    }

    std::unique_ptr<Server> server_;
    std::unique_ptr<ServerCompletionQueue> cq_;
    std::unique_ptr<ServerCompletionQueue> cqProgress_;
    ping_pong::PingPong::AsyncService pingPongAsync_;
    std::thread progressReportThread_;
    std::thread progressOtherRpcs_;
};

int main(int argc, char** argv)
{
    ServerImpl server;
    server.Run();
    return 0;
}
// -------------------------------------------------------------------------
// proto file
// Service and message definitions for the PingPong demo server.
syntax = "proto3";

package ping_pong;

service PingPong {
  // Unary ping: one request, one reply.
  rpc PingUnary (PingPongRequest) returns (PingPongReply) {}
  // Unary request that triggers a scan.
  rpc StartScan (ScanRequest) returns (ScanReply) {}
  // Server-streaming RPC: the reply type is declared as a stream of
  // ScanProgressReply messages.
  rpc RegisterScanProgress (ScanProgressRequest) returns (stream 
ScanProgressReply) {}
}

// Request for PingUnary.
message PingPongRequest {
    string input_msg = 1;
}

// Reply for PingUnary.
message PingPongReply {
    string output_msg = 1;
}

// Request for StartScan.
message ScanRequest{
  // NOTE(review): presumably selects the kind of scan; the server code shown
  // alongside this file does not read it — confirm the intended use.
  string scanType = 1;
}

// Reply for StartScan.
message ScanReply{
  // Whether a scan was already running when StartScan was called.
  bool scanAlreadyInProgress  = 1;
}

// Request for RegisterScanProgress; carries no fields.
message ScanProgressRequest {
}

// State of a scan as reported in ScanProgressReply.
enum ScanStatus
{
  NOT_STARTED = 0;
  STARTED     = 1;
  IN_PROGRESS = 2;
  FINISHED_SUCCESS  = 3;
  FINISHED_FAILED   = 4;
}

// One progress update streamed by RegisterScanProgress.
message ScanProgressReply {
    // Scan state at the time of the update.
    ScanStatus status = 1;
    // Text describing the scan position at the time of the update.
    string location = 2;
}



On Tuesday, June 29, 2021 at 2:30:44 AM UTC+5:30 Vijay Pai wrote:

> It's not really possible to say anything about this without knowing what's 
> in "Proceed" .
>
> On Monday, June 21, 2021 at 9:03:15 PM UTC-7 [email protected] wrote:
>
>> We are facing an issue where Next of a completion queue returns ok as 
>> false, if any other RPC is called.
>> We need to implement a simple functionality of progress reporting of a 
>> long running RPC. So we need two RPCs
>> 1. StartScan - to trigger the long running scan - runs in separate thread 
>> and its own completion queue
>> 2. RegisterScanProgress  - to register for the callback that reports 
>> status of the long running scan.  - runs in separate thread and its own 
>> completion queue
>>
>> If we call StartScan first, then cqProgress_ (the ServerCompletionQueue 
>> for RegisterScanProgress) behaves as expected. However, if we call 
>> RegisterScanProgress first, then immediately after the call to StartScan 
>> the Next of cqProgress_ returns ok as false.
>> We need help understanding this behavior: after RegisterScanProgress is 
>> called there could be a call to any other RPC, so we need its queue to 
>> keep functioning for progress reporting.
>> It would be a great help if someone could explain what we are missing.
>>
>> Proto looks like following
>> service ScanService{
>>     rpc StartScan (ScanRequest) returns (ScanReply) {}
>>     rpc RegisterScanProgress (ScanProgressRequest) returns (stream 
>> ScanProgressReply) {}
>> }
>> message ScanRequest{
>>   string scanType = 1;
>> }
>> message ScanReply{
>>   bool scanAlreadyInProgress  = 1;
>> }
>> message ScanProgressRequest {
>> }
>> enum ScanStatus
>> {
>>   NOT_STARTED = 0;
>>   STARTED     = 1;
>>   IN_PROGRESS = 2;
>>   FINISHED_SUCCESS  = 3;
>>   FINISHED_FAILED   = 4;
>> }
>> message ScanProgressReply {
>>     ScanStatus status = 1;
>>     string details = 2;
>> }
>>
>> this is how the server code looks
>>
>> class ServerImpl
>> {
>> public:
>>   ~ServerImpl()
>>   {
>>     server_->Shutdown();
>>     // Always shutdown the completion queue after the server.
>>     cq_->Shutdown();
>>   }
>>
>>   void Run()
>>   {
>>     //... other server config code
>>     grpc::EnableDefaultHealthCheckService(true);
>>     grpc::reflection::InitProtoReflectionServerBuilderPlugin();
>>     grpc::ServerBuilder builder;
>>
>>     builder.AddListeningPort(server_address, serverCredentials);
>>
>>     cq_ = builder.AddCompletionQueue();
>>     cqProgress_ = builder.AddCompletionQueue();
>>
>>     // Use Keep-alive to stop initial slow calls
>>     builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_TIME_MS, 10000);
>>     builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 100000);
>>     builder.AddChannelArgument(GRPC_ARG_MAX_CONNECTION_IDLE_MS, 10000);
>>     builder.AddChannelArgument(GRPC_ARG_HTTP2_BDP_PROBE, 1);
>>     builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 
>> 1);
>>
>>     // Finally assemble the server.
>>     server_ = (builder.BuildAndStart());
>>
>>     // kick start the queues for async RPCs
>>     HandleStartScan(cq_.get());
>>     HandleRegisterScanProgress(cqProgress_.get());
>>     progressOtherRpcs_ = std::thread(&ServerImpl::EventLoop, this, 
>> cq_.get());
>>     progressReportThread_ = std::thread(&ServerImpl::EventLoop, this, 
>> cqProgress_.get());
>>     server_->Wait();
>>   }
>>
>> private:
>>   void HandleStartScan(ServerCompletionQueue* event_queue)
>>   {
>>     new StartScanCallData(&scanAsync_, event_queue);
>>   }
>>   void HandleRegisterScanProgress(ServerCompletionQueue* event_queue)
>>   {
>>     new RegisterScanProgressCallData(&scanAsync_, event_queue);
>>   }
>>   void EventLoop(ServerCompletionQueue* event_queue)
>>   {
>>     void* tag; // uniquely identifies a request.
>>     bool ok;
>>     while (event_queue->Next(&tag, &ok))
>>     {
>>       IAsyncRpcDataAdapter* adapter = 
>> static_cast<IAsyncRpcDataAdapter*>(tag);
>>       if (ok)
>>       {
>>         adapter->Proceed();
>>       }
>>       else
>>       {
>>         std::cout << "OK is false" << std::endl;
>>         continue;
>>       }
>>     }
>>   }
>>
>>   std::unique_ptr<ServerCompletionQueue> cq_;
>>   std::unique_ptr<ServerCompletionQueue> cqProgress_;
>>   napa::Nvbackend::AsyncService scanAsync_;
>>   std::unique_ptr<Server> server_;
>>   std::thread progressReportThread_;
>>   std::thread progressOtherRpcs_;
>> };
>>
>> int main(int argc, char** argv)
>> {
>>   ServerImpl server;
>>   server.Run();
>>   return 0;
>> }
>>
>> and this is how the service code looks
>>
>> enum CallStatus
>> {
>>   CREATE,
>>   PROCESS,
>>   FINISH,
>>   PUSH_TO_BACK
>> };
>>
>> // interface for service to handle the async RPC
>> // every RPC will need to implement this interface, so that GRPC server 
>> can
>> // call RPC without knowing its type.
>> class IAsyncRpcDataAdapter
>> {
>> public:
>>   virtual void Proceed() = 0;
>>   virtual ~IAsyncRpcDataAdapter() = default;
>>
>>   protected:
>>     IAsyncRpcDataAdapter(ScanService* service, ServerCompletionQueue* cq)
>>       : scanAsync_(service)
>>       , cq_(cq)
>>       , status_(CREATE)
>>     {
>>     }
>>     ScanService* scanAsync_;
>>     ServerCompletionQueue* cq_;
>>     ServerContext ctx_;
>>     CallStatus status_; // The current serving state.
>>     grpc::Alarm alarm_;
>> };
>>
>> class StartScanCallData : public IAsyncRpcDataAdapter
>> {
>> public:
>>   StartScanCallData(ScanService* service, ServerCompletionQueue* cq);
>>   void Proceed() override;
>>
>> private:
>>   ScanRequest request_;
>>   ScanReply reply_;
>>
>>   ServerAsyncResponseWriter<ScanReply> responder_;
>>   grpc::Status StartScan(grpc::ServerContext *context, const ScanRequest 
>> *request, ScanReply *reply);
>> };
>>
>> class RegisterScanProgressCallData : public IAsyncRpcDataAdapter
>> {
>> public:
>>   RegisterScanProgressCallData(ScanService* service, 
>> ServerCompletionQueue* cq);
>>   void Proceed() override;
>>
>> private:
>>   ScanProgressRequest request_;
>>   ScanProgressReply reply_;
>>   ServerAsyncWriter<ScanProgressReply> responder_;
>>
>>   void ProgressReport();
>>   void waitForScanEvent();
>>   std::unique_ptr<std::promise<AppScanStatus>> scanPromise_;
>> };
>>
>>

-- 
You received this message because you are subscribed to the Google Groups 
"grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To view this discussion on the web visit 
https://groups.google.com/d/msgid/grpc-io/57d8322f-09f9-4fa0-8501-2a257bc3f0d2n%40googlegroups.com.

Reply via email to