FYI - not sure if it'd matter, though, we are testing this implementation
using Kreya <https://kreya.app/>tool & there is envoy proxy between client
and GRPC
On Tuesday, June 29, 2021 at 9:00:30 AM UTC+5:30 Avinash Jagtap IN wrote:
> here is the complete code:
> #include <stdlib.h>
>
> #pragma region gRPC
> #include <grpcpp/grpcpp.h>
> #include <grpcpp/health_check_service_interface.h>
> #include <grpcpp/ext/proto_server_reflection_plugin.h>
> #pragma endregion gRPC
>
> #include <iostream>
> #include <memory>
> #include <string>
> #include <sstream>
> #include <iomanip>
> #include <thread>
>
> #pragma region Generated Files
> #include "napa_plugins_config.hpp"
> #include "napa_plugins_core_ssl.hpp"
> #pragma endregion Generated Files
>
> #pragma region
> #include "rapidjson/document.h"
> #pragma endregion
>
> #include <future>
>
> #include <grpcpp/impl/codegen/service_type.h>
> #include <grpcpp/alarm.h>
>
> #include "protos/PingPong.grpc.pb.h"
> #include "protos/PingPong.pb.h"
>
> using grpc::Server;
> using grpc::ServerBuilder;
> using grpc::ServerContext;
> using grpc::Status;
> using grpc::ServerCompletionQueue;
> using grpc::ServerAsyncResponseWriter;
>
>
> class PingPongServiceImpl final
> : public ping_pong::PingPong::Service {
> public:
> ~PingPongServiceImpl() {
> std::cout << "Destructor of PingPongServiceImpl" << std::endl;
> }
> private:
> grpc::Status PingUnary(
> grpc::ServerContext* context,
> const ping_pong::PingPongRequest* request,
> ping_pong::PingPongReply* reply) override
> {
> std::cout << "PingPong" << std::endl;
>
> std::cout
> << "PingPong - input_msg = "
> << request->input_msg()
> << std::endl;
>
> if (request->input_msg() == "hello") {
> reply->set_output_msg("world");
> }
> else {
> reply->set_output_msg("I can't pong unless you ping me
> 'hello'!");
> }
>
> std::cout << "Replying with " << reply->output_msg() << std::endl;
>
> return grpc::Status::OK;
> }
>
> };
>
> typedef void ScanProgressCallbackType(_Inout_opt_ void* context, _In_z_
> const std::string location);
> typedef void ScanStartedCallbackType(_Inout_opt_ void* context);
> typedef void ScanFinishedCallbackType(_Inout_opt_ void* context, _In_ bool
> scanFailed);
>
> class DummyScanner
> {
> public:
> DummyScanner()
> :scanContext_(NULL),
> scanCount_(0),
> scanProgressCallback_(NULL),
> scanStartedCallback_(NULL),
> scanFinishedCallback_(NULL)
> {
> }
> ~DummyScanner()
> {
> if (dummyScanThread_.joinable())
> {
> dummyScanThread_.join();
> }
> }
> void RegisterScanProgressCallbacks(
> ScanProgressCallbackType* onScanProgress,
> ScanStartedCallbackType* onScanStarted,
> ScanFinishedCallbackType* onScanFinished,
> _In_opt_ void* context)
> {
> scanProgressCallback_ = onScanProgress;
> scanStartedCallback_ = onScanStarted;
> scanFinishedCallback_ = onScanFinished;
> scanContext_ = context;
> }
>
> bool StartScan()
> {
> if (scanCount_ > 0)
> {
> return false;
> }
> if (dummyScanThread_.joinable())
> {
> dummyScanThread_.join();
> }
>
> dummyScanThread_ = std::thread(&DummyScanner::dummyScanAndReport,
> this);
> return true;
> }
>
> private:
> void dummyScanAndReport()
> {
> while (true)
> {
> if (scanCount_ == 100)
> {
> if (scanFinishedCallback_ != NULL)
> {
> scanFinishedCallback_(scanContext_, false);
> }
> break;
> }
> if (scanCount_ == 1)
> {
> if (scanStartedCallback_ != NULL)
> {
> scanStartedCallback_(scanContext_);
> }
> }
> if (scanCount_ < 100)
> {
> if (scanProgressCallback_ != NULL)
> {
> std::stringstream ss;
> ss << scanCount_;
> scanProgressCallback_(scanContext_, ss.str());
> }
> }
> scanCount_++;
> std::this_thread::sleep_for(std::chrono::milliseconds(100));
> }
> scanCount_ = 0;
> }
>
>
> private:
> void* scanContext_;
> int scanCount_;
> ScanProgressCallbackType* scanProgressCallback_;
> ScanStartedCallbackType* scanStartedCallback_;
> ScanFinishedCallbackType* scanFinishedCallback_;
> std::thread dummyScanThread_;
> };
>
> enum CallStatus
> {
> CREATE,
> PROCESS,
> FINISH,
> PUSH_TO_BACK
> };
>
> // interface for service to handle the async RPC
> // every RPC will need to implement this interface, so that GRPC server can
> // call RPC without knowing its type.
> class IAsyncRpcDataAdapter
> {
> public:
> virtual void Proceed() = 0;
> virtual std::string Name() = 0;
> virtual ~IAsyncRpcDataAdapter() = default;
>
> protected:
> IAsyncRpcDataAdapter(ping_pong::PingPong::AsyncService* service,
> ServerCompletionQueue* cq)
> : pingPongAsync_(service)
> , cq_(cq)
> , status_(CREATE)
> {
> }
> ping_pong::PingPong::AsyncService* pingPongAsync_;
> ServerCompletionQueue* cq_;
> ServerContext ctx_;
> CallStatus status_; // The current serving state.
> grpc::Alarm alarm_;
> };
>
> class StartScanCallData : public IAsyncRpcDataAdapter
> {
> public:
> StartScanCallData(ping_pong::PingPong::AsyncService* service,
> ServerCompletionQueue* cq)
> {
> Proceed();
> }
>
> void Proceed() override
> {
> std::cout << __FUNCTION__ << std::endl;
> if (status_ == CREATE)
> {
> status_ = PROCESS;
> std::cout << __FUNCTION__ << ":- " << "CREATE this - " << this
> << std::endl;
> pingPongAsync_->RequestStartScan(&ctx_, &request_,
> &responder_, cq_, cq_, static_cast<IAsyncRpcDataAdapter*>(this));
> }
> else if (status_ == PROCESS)
> {
> std::cout << __FUNCTION__ << ":- " << "PROCESS this - " <<
> this << std::endl;
> // we need to keep an instance of api call data in the queue
> so that it keeps waiting in HandleRpcs.
> new StartScanCallData(pingPongAsync_, cq_);
> const Status status = StartScan(&ctx_, &request_, &reply_);
> if (status.ok())
> {
> status_ = FINISH;
> responder_.Finish(reply_, Status::OK, this);
> }
> else
> {
> responder_.FinishWithError(status, this);
> }
> }
> else
> {
> GPR_ASSERT(status_ == FINISH);
> std::cout << __FUNCTION__ << ":- " << "FINISH this - " << this
> << std::endl;
> delete this;
> }
> }
>
> DummyScanner scanner_;
> std::string Name() override
> {
> return "StartScanCallData";
> }
>
> private:
> grpc::Status StartScan(grpc::ServerContext* context, const
> ping_pong::ScanRequest* request, ping_pong::ScanReply* reply)
> {
> std::cout << __FUNCTION__ << ":- " <<
> "\n\n----------------------------------------" <<
> std::this_thread::get_id() << std::endl;
> // start dummy thread callback scan
> scanner_.StartScan();
> reply->set_scanalreadyinprogress(true);
> return Status::OK;
> }
> ping_pong::ScanRequest request_;
> ping_pong::ScanReply reply_;
>
> ServerAsyncResponseWriter<ping_pong::ScanReply> responder_;
> };
>
> class RegisterScanProgressCallData : public IAsyncRpcDataAdapter
> {
> public:
> RegisterScanProgressCallData(ping_pong::PingPong::AsyncService*
> service, ServerCompletionQueue* cq)
> {
> Proceed();
> }
>
> void Proceed() override
> {
> std::cout << __FUNCTION__ << ":- " << " thread - " <<
> std::this_thread::get_id() << std::endl;
> if (status_ == CREATE)
> {
> status_ = PROCESS;
> std::cout << __FUNCTION__ << ":- " << " thread - " <<
> std::this_thread::get_id() << " CREATE this - " << this << std::endl;
> pingPongAsync_->RequestRegisterScanProgress(&ctx_, &request_,
> &responder_, cq_, cq_, static_cast<IAsyncRpcDataAdapter*>(this));
> }
> else if (status_ == PROCESS)
> {
> std::cout << __FUNCTION__ << ":- " << " thread - " <<
> std::this_thread::get_id() << " PROCESS this - " << this << std::endl;
> if (!callBackTriggered_)
> {
> // we need to keep an instance of api call data in the
> queue so that it keeps waiting in HandleRpcs.
> new RegisterScanProgressCallData(pingPongAsync_, cq_);
> const Status status = RegisterScanProgress();
> if (!status.ok())
> {
> status_ = FINISH;
> responder_.Finish(Status::OK, this);
> return;
> }
> else
> {
> scanStatus_ = ScanStatus::NOT_STARTED;
> }
> callBackTriggered_ = true;
> }
> else
> {
> waitForScanEvent();
> }
> std::cout << __FUNCTION__ << ":- " << " thread - " <<
> std::this_thread::get_id() << " Calling ProgressReport" << std::endl;
> ProgressReport();
> }
> else if (status_ == PUSH_TO_BACK)
> {
> status_ = PROCESS;
> std::cout << __FUNCTION__ << ":- " << " thread - " <<
> std::this_thread::get_id() << " ********************************* Setting
> alarm" << std::endl;
> alarm_.Set(cq_, gpr_now(gpr_clock_type::GPR_CLOCK_REALTIME),
> this);
> }
> else
> {
> GPR_ASSERT(status_ == FINISH);
> std::cout << __FUNCTION__ << ":- " << " thread - " <<
> std::this_thread::get_id() << " FINISH this - " << this << std::endl;
> delete this;
> }
> }
>
> std::string Name() override
> {
> return "RegisterScanProgressCallData****";
> }
>
> private:
> grpc::Status RegisterScanProgress()
> {
> std::cout << __FUNCTION__ << ":- " <<
> "---------------------------------------- thread - " <<
> std::this_thread::get_id() << std::endl;
>
> scanner_.RegisterScanProgressCallbacks(RegisterScanProgressCallData::OnScanProgress,
> RegisterScanProgressCallData::OnScanStarted,
> RegisterScanProgressCallData::OnScanFinished,
> this);
> return Status::OK;
> }
>
> grpc::Status UnRegisterScanProgress()
> {
> return Status::OK;
> }
>
> void UpdateReply()
> {
> reply_.set_status(scanStatus_);
> reply_.set_location(scanLocation_);
> }
>
> void ProgressReport()
> {
> UpdateReply();
> std::cout << __FUNCTION__ << ":- " << "reporting ScanStatus - " <<
> scanStatus_ << " thread - " << std::this_thread::get_id() << std::endl;
> if ((scanStatus_ == ScanStatus::FINISHED_SUCCESS)
> || (scanStatus_ == ScanStatus::FINISHED_FAILED))
> {
> status_ = FINISH;
> std::cout << __FUNCTION__ << ":- " << "writing to responder_
> this- " << this << " thread -" << std::this_thread::get_id() << std::endl;
> responder_.Finish(Status::OK, this);
> }
> else
> {
> status_ = PUSH_TO_BACK;
> std::cout << __FUNCTION__ << ":- " << "writing to responder_
> this- " << this << " thread -" << std::this_thread::get_id() << std::endl;
> responder_.Write(std::move(reply_), this);
> }
> }
>
> static void OnScanProgress(void* context, const std::string location)
> {
> try
> {
> auto _this =
> reinterpret_cast<RegisterScanProgressCallData*>(context);
> std::cout << __FUNCTION__ << ":- " << "thread - " <<
> std::this_thread::get_id() << " this " << _this << std::endl;
> _this->scanStatus_ = ScanStatus::IN_PROGRESS;
> _this->scanLocation_.assign(location);
> std::cout << __FUNCTION__ << ":- " << "triggering progress
> event thread - " << std::this_thread::get_id() << " this " << _this <<
> std::endl;
> _this->scanPromise_->set_value(ScanStatus::IN_PROGRESS);
> }
> catch (...)
> {
> // Avoid exception being thrown through DLL boundary.
> }
> }
>
> static void OnScanStarted(void* context)
> {
> try
> {
> auto _this =
> reinterpret_cast<RegisterScanProgressCallData*>(context);
> std::cout << __FUNCTION__ << ":- " << "thread - " <<
> std::this_thread::get_id() << " this " << _this << std::endl;
> _this->scanStatus_ = ScanStatus::STARTED;
> std::cout << __FUNCTION__ << ":- " << "triggering started
> event thread - " << std::this_thread::get_id() << " this " << _this <<
> std::endl;
> _this->scanPromise_->set_value(ScanStatus::STARTED);
> }
> catch (...)
> {
> // Avoid exception being thrown through DLL boundary.
> }
> }
>
> static void OnScanFinished(void* context, bool scanFailed)
> {
> try
> {
> auto _this =
> reinterpret_cast<RegisterScanProgressCallData*>(context);
> if ((_this->scanStatus_ != ScanStatus::FINISHED_FAILED) &&
> (_this->scanStatus_ != ScanStatus::FINISHED_SUCCESS))
> {
> std::cout << __FUNCTION__ << ":- " << "thread - " <<
> std::this_thread::get_id() << " this " << _this << std::endl;
> _this->scanStatus_ = (scanFailed == true) ?
> ScanStatus::FINISHED_FAILED : ScanStatus::FINISHED_SUCCESS;
> std::cout << __FUNCTION__ << ":- " << "triggering finished
> event thread - " << std::this_thread::get_id() << " this " << _this <<
> std::endl;
> _this->scanPromise_->set_value(_this->scanStatus_);
> }
> }
> catch (...)
> {
> // Avoid exception being thrown through DLL boundary.
> }
> }
>
>
> void waitForScanEvent()
> {
> if (scanPromise_ != nullptr)
> {
> scanPromise_.release();
> }
> scanPromise_ = std::make_unique<std::promise<ScanStatus>>();
> auto scanFuture = scanPromise_->get_future();
> std::cout << __FUNCTION__ << ":- " << "waiting for started event
> with scanStatus_ - " << scanStatus_ << std::endl;
> auto eventData = scanFuture.get();
> scanStatus_ = eventData;
> std::cout << __FUNCTION__ << ":- " << "got started event now
> scanStatus_ - " << scanStatus_ << std::endl;
> }
>
> ping_pong::ScanProgressRequest request_;
> ping_pong::ScanProgressReply reply_;
> ServerAsyncWriter<ping_pong::ScanProgressReply> responder_;
>
> ping_pong::ScanStatus scanStatus_;
> std::string scanLocation_;
> bool callBackTriggered_ = false;
> std::unique_ptr<std::promise<ping_pong::ScanStatus>> scanPromise_;
> };
>
> class ServerImpl
> {
> public:
> ~ServerImpl()
> {
> server_->Shutdown();
> // Always shutdown the completion queue after the server.
> cq_->Shutdown();
> }
>
> void Run()
> {
> rapidjson::Document napaPluginConfig;
> napaPluginConfig.Parse(napa_plugin_config_json.c_str());
> int port(napaPluginConfig["port"].GetInt());
> std::string hostname(napaPluginConfig["hostname"].GetString());
> std::string server_address(hostname + ":" + std::to_string(port));
>
> // Check for Server Credentials
> // Comes as a string, we want a boolean.
> auto strGrpcUseInsecureCreds =
> getenv("NAPA_GRPC_USE_INSECURE_CREDS");
> // By default we will *not* use SSL for HTTP2.
> // TODO(Keith): Once Envoy understands SSL, revert back to false.
> bool bGrpcUseInsecureCreds = true;
> if (strGrpcUseInsecureCreds != NULL)
> {
> std::istringstream(strGrpcUseInsecureCreds) >> std::boolalpha
> >> bGrpcUseInsecureCreds;
> }
>
> std::cout
> << "Environment variable NAPA_GRPC_USE_INSECURE_CREDS = "
> << bGrpcUseInsecureCreds
> << std::endl;
>
> grpc::EnableDefaultHealthCheckService(true);
> grpc::reflection::InitProtoReflectionServerBuilderPlugin();
> grpc::ServerBuilder builder;
>
> std::shared_ptr<grpc::ServerCredentials> serverCredentials;
>
> if (bGrpcUseInsecureCreds)
> {
> // Listen on the given address without any authentication
> mechanism.
> serverCredentials = grpc::InsecureServerCredentials();
> std::cout
> << "NOTE: We are using gRPC insecure server credentials,
> without SSL."
> << "This means you are not using HTTP2"
> << std::endl;
> }
> else
> {
> grpc::SslServerCredentialsOptions sslOptions;
> sslOptions.pem_root_certs = ssl_credentials_ca_cert.c_str();
> sslOptions.pem_key_cert_pairs.push_back({
> ssl_credentials_server_key.c_str(),
>
> ssl_credentials_server_cert.c_str() });
>
> serverCredentials = grpc::SslServerCredentials(sslOptions);
> std::cout
> << "NOTE: We are using gRPC secure server credentials,
> with SSL."
> << "This means you are using HTTP2"
> << std::endl;
> }
>
> builder.AddListeningPort(server_address, serverCredentials);
>
> builder.RegisterService(&pingPongAsync_);
>
> cq_ = builder.AddCompletionQueue();
> cqProgress_ = builder.AddCompletionQueue();
>
> // Use Keep-alive to stop initial slow calls
> builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_TIME_MS, 10000);
> builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 10000);
> builder.AddChannelArgument(GRPC_ARG_MAX_CONNECTION_IDLE_MS, 10000);
> builder.AddChannelArgument(GRPC_ARG_HTTP2_BDP_PROBE, 1);
>
> builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1);
>
> // Finally assemble the server.
> server_ = (builder.BuildAndStart());
> std::cout
> << "Server listening on "
> << server_address
> << std::endl;
>
> // kick start the queues for async RPCs
> HandleStartScan();
> HandleRegisterScanProgress();
> progressOtherRpcs_ = std::thread(&ServerImpl::EventLoop, this,
> cq_.get());
> progressReportThread_ = std::thread(&ServerImpl::EventLoop, this,
> cqProgress_.get());
> // Wait for the server to shutdown. Note that some other thread
> must be
> // responsible for shutting down the server for this call to ever
> return.
> server_->Wait();
> }
>
> private:
> void HandleStartScan()
> {
> new StartScanCallData(&pingPongAsync_, cq_.get());
> }
>
> void HandleRegisterScanProgress()
> {
> new RegisterScanProgressCallData(&pingPongAsync_,
> cqProgress_.get());
> }
>
> // TODO - we'll need to have more than one event loops. running in
> thread for the RPC
> void EventLoop(ServerCompletionQueue* event_queue)
> {
> void* tag; // uniquely identifies a request.
> bool ok;
>
> while (event_queue->Next(&tag, &ok))
> {
> IAsyncRpcDataAdapter* adapter =
> static_cast<IAsyncRpcDataAdapter*>(tag);
> if (ok)
> {
> std::cout << __FUNCTION__ << ":- EventLoop event for " <<
> " thread - " << std::this_thread::get_id() << " queue " << event_queue << "
> tag " << adapter->Name() << std::endl;
> adapter->Proceed();
> }
> else
> {
> std::cout << __FUNCTION__ << ":- Need to see why this
> failed for " << " thread - " << std::this_thread::get_id() << " queue " <<
> event_queue << " tag " << adapter->Name() << std::endl;
> adapter->Proceed();
> continue;
> }
> }
> }
>
> std::unique_ptr<Server> server_;
> std::unique_ptr<ServerCompletionQueue> cq_;
> std::unique_ptr<ServerCompletionQueue> cqProgress_;
> ping_pong::PingPong::AsyncService pingPongAsync_;
> std::thread progressReportThread_;
> std::thread progressOtherRpcs_;
> };
>
> int main(int argc, char** argv)
> {
> ServerImpl server;
> server.Run();
> return 0;
> }
> //
> -------------------------------------------------------------------------
> // proto file
> syntax = "proto3";
>
> package ping_pong;
>
> service PingPong {
> rpc PingUnary (PingPongRequest) returns (PingPongReply) {}
> rpc StartScan (ScanRequest) returns (ScanReply) {}
> rpc RegisterScanProgress (ScanProgressRequest) returns (stream
> ScanProgressReply) {}
> }
>
> message PingPongRequest {
> string input_msg = 1;
> }
>
> message PingPongReply {
> string output_msg = 1;
> }
>
> message ScanRequest{
> string scanType = 1;
> }
>
> message ScanReply{
> bool scanAlreadyInProgress = 1;
> }
>
> message ScanProgressRequest {
> }
>
> enum ScanStatus
> {
> NOT_STARTED = 0;
> STARTED = 1;
> IN_PROGRESS = 2;
> FINISHED_SUCCESS = 3;
> FINISHED_FAILED = 4;
> }
>
> message ScanProgressReply {
> ScanStatus status = 1;
> string location = 2;
> }
>
>
>
> On Tuesday, June 29, 2021 at 2:30:44 AM UTC+5:30 Vijay Pai wrote:
>
>> It's not really possible to say anything about this without knowing
>> what's in "Proceed" .
>>
>> On Monday, June 21, 2021 at 9:03:15 PM UTC-7 [email protected] wrote:
>>
>>> We are facing an issue where Next of a completion queue returns ok as
>>> false, if any other RPC is called.
>>> We need to implement a simple functionality of progress reporting of a
>>> long running RPC. So we need two RPCs
>>> 1. StartScan - to trigger the long running scan - runs in separate
>>> thread and its own completion queue
>>> 2. RegisterScanProgress - to register for the callback that reports
>>> status of the long running scan. - runs in separate thread and its own
>>> completion queue
>>>
>>> If we call StartScan first, and then cqProgress_(ServerCompletionQueue
>>> for RegisterScanProgress) behaves as expected. though, if we call
>>> RegisterScanProgress first, then immediately after call to StartScan the
>>> Next of cqProgress_ returns ok as false.
>>> Need help to understand this behavior. As after calling
>>> RegisterScanProgress there could be call to any other RPC, and so we'd need
>>> its queue to keep functioning for progress reporting.
>>> It'll be of great help if someone helps us understand what we are
>>> missing.
>>>
>>> Proto looks like following
>>> service ScanService{
>>> rpc StartScan (ScanRequest) returns (ScanReply) {}
>>> rpc RegisterScanProgress (ScanProgressRequest) returns (stream
>>> ScanProgressReply) {}
>>> }
>>> message ScanRequest{
>>> string scanType = 1;
>>> }
>>> message ScanReply{
>>> bool scanAlreadyInProgress = 1;
>>> }
>>> message ScanProgressRequest {
>>> }
>>> enum ScanStatus
>>> {
>>> NOT_STARTED = 0;
>>> STARTED = 1;
>>> IN_PROGRESS = 2;
>>> FINISHED_SUCCESS = 3;
>>> FINISHED_FAILED = 4;
>>> }
>>> message ScanProgressReply {
>>> ScanStatus status = 1;
>>> string details = 2;
>>> }
>>>
>>> this is how the server code looks
>>>
>>> class ServerImpl
>>> {
>>> public:
>>> ~ServerImpl()
>>> {
>>> server_->Shutdown();
>>> // Always shutdown the completion queue after the server.
>>> cq_->Shutdown();
>>> }
>>>
>>> void Run()
>>> {
>>> //... other server config code
>>> grpc::EnableDefaultHealthCheckService(true);
>>> grpc::reflection::InitProtoReflectionServerBuilderPlugin();
>>> grpc::ServerBuilder builder;
>>>
>>> builder.AddListeningPort(server_address, serverCredentials);
>>>
>>> cq_ = builder.AddCompletionQueue();
>>> cqProgress_ = builder.AddCompletionQueue();
>>>
>>> // Use Keep-alive to stop initial slow calls
>>> builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_TIME_MS, 10000);
>>> builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 100000);
>>> builder.AddChannelArgument(GRPC_ARG_MAX_CONNECTION_IDLE_MS, 10000);
>>> builder.AddChannelArgument(GRPC_ARG_HTTP2_BDP_PROBE, 1);
>>> builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS,
>>> 1);
>>>
>>> // Finally assemble the server.
>>> server_ = (builder.BuildAndStart());
>>>
>>> // kick start the queues for async RPCs
>>> HandleStartScan(cq_.get());
>>> HandleRegisterScanProgress(cqProgress_.get());
>>> progressOtherRpcs_ = std::thread(&ServerImpl::EventLoop, this,
>>> cq_.get());
>>> progressReportThread_ = std::thread(&ServerImpl::EventLoop, this,
>>> cqProgress_.get());
>>> server_->Wait();
>>> }
>>>
>>> private:
>>> void HandleStartScan(ServerCompletionQueue* event_queue)
>>> {
>>> new StartScanCallData(&scanAsync_, event_queue);
>>> }
>>> void HandleRegisterScanProgress(ServerCompletionQueue* event_queue)
>>> {
>>> new RegisterScanProgressCallData(&scanAsync_, event_queue);
>>> }
>>> void EventLoop(ServerCompletionQueue* event_queue)
>>> {
>>> void* tag; // uniquely identifies a request.
>>> bool ok;
>>> while (event_queue->Next(&tag, &ok))
>>> {
>>> IAsyncRpcDataAdapter* adapter =
>>> static_cast<IAsyncRpcDataAdapter*>(tag);
>>> if (ok)
>>> {
>>> adapter->Proceed();
>>> }
>>> else
>>> {
>>> std::cout << "OK is false" << std::endl;
>>> continue;
>>> }
>>> }
>>> }
>>>
>>> std::unique_ptr<ServerCompletionQueue> cq_;
>>> std::unique_ptr<ServerCompletionQueue> cqProgress_;
>>> napa::Nvbackend::AsyncService scanAsync_;
>>> std::unique_ptr<Server> server_;
>>> std::thread progressReportThread_;
>>> std::thread progressOtherRpcs_;
>>> };
>>>
>>> int main(int argc, char** argv)
>>> {
>>> ServerImpl server;
>>> server.Run();
>>> return 0;
>>> }
>>>
>>> and this is how the service code looks
>>>
>>> enum CallStatus
>>> {
>>> CREATE,
>>> PROCESS,
>>> FINISH,
>>> PUSH_TO_BACK
>>> };
>>>
>>> // interface for service to handle the async RPC
>>> // every RPC will need to implement this interface, so that GRPC server
>>> can
>>> // call RPC without knowing its type.
>>> class IAsyncRpcDataAdapter
>>> {
>>> public:
>>> virtual void Proceed() = 0;
>>> virtual ~IAsyncRpcDataAdapter() = default;
>>>
>>> protected:
>>> IAsyncRpcDataAdapter(ScanService* service, ServerCompletionQueue* cq)
>>> : scanAsync_(service)
>>> , cq_(cq)
>>> , status_(CREATE)
>>> {
>>> }
>>> ScanService* scanAsync_;
>>> ServerCompletionQueue* cq_;
>>> ServerContext ctx_;
>>> CallStatus status_; // The current serving state.
>>> grpc::Alarm alarm_;
>>> };
>>>
>>> class StartScanCallData : public IAsyncRpcDataAdapter
>>> {
>>> public:
>>> StartScanCallData(ScanService* service, ServerCompletionQueue* cq);
>>> void Proceed() override;
>>>
>>> private:
>>> ScanRequest request_;
>>> ScanReply reply_;
>>>
>>> ServerAsyncResponseWriter<ScanReply> responder_;
>>> grpc::Status StartScan(grpc::ServerContext *context, const ScanRequest
>>> *request, ScanReply *reply);
>>> };
>>>
>>> class RegisterScanProgressCallData : public IAsyncRpcDataAdapter
>>> {
>>> public:
>>> RegisterScanProgressCallData(ScanService* service,
>>> ServerCompletionQueue* cq);
>>> void Proceed() override;
>>>
>>> private:
>>> ScanProgressRequest request_;
>>> ScanProgressReply reply_;
>>> ServerAsyncWriter<ScanProgressReply> responder_;
>>>
>>> void ProgressReport();
>>> void waitForScanEvent();
>>> std::unique_ptr<std::promise<AppScanStatus>> scanPromise_;
>>> };
>>>
>>>
--
You received this message because you are subscribed to the Google Groups
"grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To view this discussion on the web visit
https://groups.google.com/d/msgid/grpc-io/ebb4a401-0763-41a7-bc29-6249486974c8n%40googlegroups.com.