here is the complete code:
#include <stdlib.h>
#pragma region gRPC
#include <grpcpp/grpcpp.h>
#include <grpcpp/health_check_service_interface.h>
#include <grpcpp/ext/proto_server_reflection_plugin.h>
#pragma endregion gRPC
#include <iostream>
#include <memory>
#include <string>
#include <sstream>
#include <iomanip>
#include <thread>
#pragma region Generated Files
#include "napa_plugins_config.hpp"
#include "napa_plugins_core_ssl.hpp"
#pragma endregion Generated Files
#pragma region
#include "rapidjson/document.h"
#pragma endregion
#include <future>
#include <grpcpp/impl/codegen/service_type.h>
#include <grpcpp/alarm.h>
#include "protos/PingPong.grpc.pb.h"
#include "protos/PingPong.pb.h"
using grpc::Server;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::Status;
using grpc::ServerCompletionQueue;
using grpc::ServerAsyncResponseWriter;
class PingPongServiceImpl final
: public ping_pong::PingPong::Service {
public:
~PingPongServiceImpl() {
std::cout << "Destructor of PingPongServiceImpl" << std::endl;
}
private:
grpc::Status PingUnary(
grpc::ServerContext* context,
const ping_pong::PingPongRequest* request,
ping_pong::PingPongReply* reply) override
{
std::cout << "PingPong" << std::endl;
std::cout
<< "PingPong - input_msg = "
<< request->input_msg()
<< std::endl;
if (request->input_msg() == "hello") {
reply->set_output_msg("world");
}
else {
reply->set_output_msg("I can't pong unless you ping me
'hello'!");
}
std::cout << "Replying with " << reply->output_msg() << std::endl;
return grpc::Status::OK;
}
};
typedef void ScanProgressCallbackType(_Inout_opt_ void* context, _In_z_
const std::string location);
typedef void ScanStartedCallbackType(_Inout_opt_ void* context);
typedef void ScanFinishedCallbackType(_Inout_opt_ void* context, _In_ bool
scanFailed);
class DummyScanner
{
public:
DummyScanner()
:scanContext_(NULL),
scanCount_(0),
scanProgressCallback_(NULL),
scanStartedCallback_(NULL),
scanFinishedCallback_(NULL)
{
}
~DummyScanner()
{
if (dummyScanThread_.joinable())
{
dummyScanThread_.join();
}
}
void RegisterScanProgressCallbacks(
ScanProgressCallbackType* onScanProgress,
ScanStartedCallbackType* onScanStarted,
ScanFinishedCallbackType* onScanFinished,
_In_opt_ void* context)
{
scanProgressCallback_ = onScanProgress;
scanStartedCallback_ = onScanStarted;
scanFinishedCallback_ = onScanFinished;
scanContext_ = context;
}
bool StartScan()
{
if (scanCount_ > 0)
{
return false;
}
if (dummyScanThread_.joinable())
{
dummyScanThread_.join();
}
dummyScanThread_ = std::thread(&DummyScanner::dummyScanAndReport,
this);
return true;
}
private:
void dummyScanAndReport()
{
while (true)
{
if (scanCount_ == 100)
{
if (scanFinishedCallback_ != NULL)
{
scanFinishedCallback_(scanContext_, false);
}
break;
}
if (scanCount_ == 1)
{
if (scanStartedCallback_ != NULL)
{
scanStartedCallback_(scanContext_);
}
}
if (scanCount_ < 100)
{
if (scanProgressCallback_ != NULL)
{
std::stringstream ss;
ss << scanCount_;
scanProgressCallback_(scanContext_, ss.str());
}
}
scanCount_++;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
scanCount_ = 0;
}
private:
void* scanContext_;
int scanCount_;
ScanProgressCallbackType* scanProgressCallback_;
ScanStartedCallbackType* scanStartedCallback_;
ScanFinishedCallbackType* scanFinishedCallback_;
std::thread dummyScanThread_;
};
enum CallStatus
{
CREATE,
PROCESS,
FINISH,
PUSH_TO_BACK
};
// interface for service to handle the async RPC
// every RPC will need to implement this interface, so that GRPC server can
// call RPC without knowing its type.
class IAsyncRpcDataAdapter
{
public:
virtual void Proceed() = 0;
virtual std::string Name() = 0;
virtual ~IAsyncRpcDataAdapter() = default;
protected:
IAsyncRpcDataAdapter(ping_pong::PingPong::AsyncService* service,
ServerCompletionQueue* cq)
: pingPongAsync_(service)
, cq_(cq)
, status_(CREATE)
{
}
ping_pong::PingPong::AsyncService* pingPongAsync_;
ServerCompletionQueue* cq_;
ServerContext ctx_;
CallStatus status_; // The current serving state.
grpc::Alarm alarm_;
};
class StartScanCallData : public IAsyncRpcDataAdapter
{
public:
StartScanCallData(ping_pong::PingPong::AsyncService* service,
ServerCompletionQueue* cq)
{
Proceed();
}
void Proceed() override
{
std::cout << __FUNCTION__ << std::endl;
if (status_ == CREATE)
{
status_ = PROCESS;
std::cout << __FUNCTION__ << ":- " << "CREATE this - " << this
<< std::endl;
pingPongAsync_->RequestStartScan(&ctx_, &request_, &responder_,
cq_, cq_, static_cast<IAsyncRpcDataAdapter*>(this));
}
else if (status_ == PROCESS)
{
std::cout << __FUNCTION__ << ":- " << "PROCESS this - " << this
<< std::endl;
// we need to keep an instance of api call data in the queue so
that it keeps waiting in HandleRpcs.
new StartScanCallData(pingPongAsync_, cq_);
const Status status = StartScan(&ctx_, &request_, &reply_);
if (status.ok())
{
status_ = FINISH;
responder_.Finish(reply_, Status::OK, this);
}
else
{
responder_.FinishWithError(status, this);
}
}
else
{
GPR_ASSERT(status_ == FINISH);
std::cout << __FUNCTION__ << ":- " << "FINISH this - " << this
<< std::endl;
delete this;
}
}
DummyScanner scanner_;
std::string Name() override
{
return "StartScanCallData";
}
private:
grpc::Status StartScan(grpc::ServerContext* context, const
ping_pong::ScanRequest* request, ping_pong::ScanReply* reply)
{
std::cout << __FUNCTION__ << ":- " <<
"\n\n----------------------------------------" <<
std::this_thread::get_id() << std::endl;
// start dummy thread callback scan
scanner_.StartScan();
reply->set_scanalreadyinprogress(true);
return Status::OK;
}
ping_pong::ScanRequest request_;
ping_pong::ScanReply reply_;
ServerAsyncResponseWriter<ping_pong::ScanReply> responder_;
};
class RegisterScanProgressCallData : public IAsyncRpcDataAdapter
{
public:
RegisterScanProgressCallData(ping_pong::PingPong::AsyncService*
service, ServerCompletionQueue* cq)
{
Proceed();
}
void Proceed() override
{
std::cout << __FUNCTION__ << ":- " << " thread - " <<
std::this_thread::get_id() << std::endl;
if (status_ == CREATE)
{
status_ = PROCESS;
std::cout << __FUNCTION__ << ":- " << " thread - " <<
std::this_thread::get_id() << " CREATE this - " << this << std::endl;
pingPongAsync_->RequestRegisterScanProgress(&ctx_, &request_,
&responder_, cq_, cq_, static_cast<IAsyncRpcDataAdapter*>(this));
}
else if (status_ == PROCESS)
{
std::cout << __FUNCTION__ << ":- " << " thread - " <<
std::this_thread::get_id() << " PROCESS this - " << this << std::endl;
if (!callBackTriggered_)
{
// we need to keep an instance of api call data in the
queue so that it keeps waiting in HandleRpcs.
new RegisterScanProgressCallData(pingPongAsync_, cq_);
const Status status = RegisterScanProgress();
if (!status.ok())
{
status_ = FINISH;
responder_.Finish(Status::OK, this);
return;
}
else
{
scanStatus_ = ScanStatus::NOT_STARTED;
}
callBackTriggered_ = true;
}
else
{
waitForScanEvent();
}
std::cout << __FUNCTION__ << ":- " << " thread - " <<
std::this_thread::get_id() << " Calling ProgressReport" << std::endl;
ProgressReport();
}
else if (status_ == PUSH_TO_BACK)
{
status_ = PROCESS;
std::cout << __FUNCTION__ << ":- " << " thread - " <<
std::this_thread::get_id() << " ********************************* Setting
alarm" << std::endl;
alarm_.Set(cq_, gpr_now(gpr_clock_type::GPR_CLOCK_REALTIME),
this);
}
else
{
GPR_ASSERT(status_ == FINISH);
std::cout << __FUNCTION__ << ":- " << " thread - " <<
std::this_thread::get_id() << " FINISH this - " << this << std::endl;
delete this;
}
}
std::string Name() override
{
return "RegisterScanProgressCallData****";
}
private:
grpc::Status RegisterScanProgress()
{
std::cout << __FUNCTION__ << ":- " <<
"---------------------------------------- thread - " <<
std::this_thread::get_id() << std::endl;
scanner_.RegisterScanProgressCallbacks(RegisterScanProgressCallData::OnScanProgress,
RegisterScanProgressCallData::OnScanStarted,
RegisterScanProgressCallData::OnScanFinished,
this);
return Status::OK;
}
grpc::Status UnRegisterScanProgress()
{
return Status::OK;
}
void UpdateReply()
{
reply_.set_status(scanStatus_);
reply_.set_location(scanLocation_);
}
void ProgressReport()
{
UpdateReply();
std::cout << __FUNCTION__ << ":- " << "reporting ScanStatus - " <<
scanStatus_ << " thread - " << std::this_thread::get_id() << std::endl;
if ((scanStatus_ == ScanStatus::FINISHED_SUCCESS)
|| (scanStatus_ == ScanStatus::FINISHED_FAILED))
{
status_ = FINISH;
std::cout << __FUNCTION__ << ":- " << "writing to responder_
this- " << this << " thread -" << std::this_thread::get_id() << std::endl;
responder_.Finish(Status::OK, this);
}
else
{
status_ = PUSH_TO_BACK;
std::cout << __FUNCTION__ << ":- " << "writing to responder_
this- " << this << " thread -" << std::this_thread::get_id() << std::endl;
responder_.Write(std::move(reply_), this);
}
}
static void OnScanProgress(void* context, const std::string location)
{
try
{
auto _this =
reinterpret_cast<RegisterScanProgressCallData*>(context);
std::cout << __FUNCTION__ << ":- " << "thread - " <<
std::this_thread::get_id() << " this " << _this << std::endl;
_this->scanStatus_ = ScanStatus::IN_PROGRESS;
_this->scanLocation_.assign(location);
std::cout << __FUNCTION__ << ":- " << "triggering progress
event thread - " << std::this_thread::get_id() << " this " << _this <<
std::endl;
_this->scanPromise_->set_value(ScanStatus::IN_PROGRESS);
}
catch (...)
{
// Avoid exception being thrown through DLL boundary.
}
}
static void OnScanStarted(void* context)
{
try
{
auto _this =
reinterpret_cast<RegisterScanProgressCallData*>(context);
std::cout << __FUNCTION__ << ":- " << "thread - " <<
std::this_thread::get_id() << " this " << _this << std::endl;
_this->scanStatus_ = ScanStatus::STARTED;
std::cout << __FUNCTION__ << ":- " << "triggering started event
thread - " << std::this_thread::get_id() << " this " << _this << std::endl;
_this->scanPromise_->set_value(ScanStatus::STARTED);
}
catch (...)
{
// Avoid exception being thrown through DLL boundary.
}
}
static void OnScanFinished(void* context, bool scanFailed)
{
try
{
auto _this =
reinterpret_cast<RegisterScanProgressCallData*>(context);
if ((_this->scanStatus_ != ScanStatus::FINISHED_FAILED) &&
(_this->scanStatus_ != ScanStatus::FINISHED_SUCCESS))
{
std::cout << __FUNCTION__ << ":- " << "thread - " <<
std::this_thread::get_id() << " this " << _this << std::endl;
_this->scanStatus_ = (scanFailed == true) ?
ScanStatus::FINISHED_FAILED : ScanStatus::FINISHED_SUCCESS;
std::cout << __FUNCTION__ << ":- " << "triggering finished
event thread - " << std::this_thread::get_id() << " this " << _this <<
std::endl;
_this->scanPromise_->set_value(_this->scanStatus_);
}
}
catch (...)
{
// Avoid exception being thrown through DLL boundary.
}
}
void waitForScanEvent()
{
if (scanPromise_ != nullptr)
{
scanPromise_.release();
}
scanPromise_ = std::make_unique<std::promise<ScanStatus>>();
auto scanFuture = scanPromise_->get_future();
std::cout << __FUNCTION__ << ":- " << "waiting for started event
with scanStatus_ - " << scanStatus_ << std::endl;
auto eventData = scanFuture.get();
scanStatus_ = eventData;
std::cout << __FUNCTION__ << ":- " << "got started event now
scanStatus_ - " << scanStatus_ << std::endl;
}
ping_pong::ScanProgressRequest request_;
ping_pong::ScanProgressReply reply_;
ServerAsyncWriter<ping_pong::ScanProgressReply> responder_;
ping_pong::ScanStatus scanStatus_;
std::string scanLocation_;
bool callBackTriggered_ = false;
std::unique_ptr<std::promise<ping_pong::ScanStatus>> scanPromise_;
};
class ServerImpl
{
public:
~ServerImpl()
{
server_->Shutdown();
// Always shutdown the completion queue after the server.
cq_->Shutdown();
}
void Run()
{
rapidjson::Document napaPluginConfig;
napaPluginConfig.Parse(napa_plugin_config_json.c_str());
int port(napaPluginConfig["port"].GetInt());
std::string hostname(napaPluginConfig["hostname"].GetString());
std::string server_address(hostname + ":" + std::to_string(port));
// Check for Server Credentials
// Comes as a string, we want a boolean.
auto strGrpcUseInsecureCreds =
getenv("NAPA_GRPC_USE_INSECURE_CREDS");
// By default we will *not* use SSL for HTTP2.
// TODO(Keith): Once Envoy understands SSL, revert back to false.
bool bGrpcUseInsecureCreds = true;
if (strGrpcUseInsecureCreds != NULL)
{
std::istringstream(strGrpcUseInsecureCreds) >> std::boolalpha
>> bGrpcUseInsecureCreds;
}
std::cout
<< "Environment variable NAPA_GRPC_USE_INSECURE_CREDS = "
<< bGrpcUseInsecureCreds
<< std::endl;
grpc::EnableDefaultHealthCheckService(true);
grpc::reflection::InitProtoReflectionServerBuilderPlugin();
grpc::ServerBuilder builder;
std::shared_ptr<grpc::ServerCredentials> serverCredentials;
if (bGrpcUseInsecureCreds)
{
// Listen on the given address without any authentication
mechanism.
serverCredentials = grpc::InsecureServerCredentials();
std::cout
<< "NOTE: We are using gRPC insecure server credentials,
without SSL."
<< "This means you are not using HTTP2"
<< std::endl;
}
else
{
grpc::SslServerCredentialsOptions sslOptions;
sslOptions.pem_root_certs = ssl_credentials_ca_cert.c_str();
sslOptions.pem_key_cert_pairs.push_back({
ssl_credentials_server_key.c_str(),
ssl_credentials_server_cert.c_str() });
serverCredentials = grpc::SslServerCredentials(sslOptions);
std::cout
<< "NOTE: We are using gRPC secure server credentials, with
SSL."
<< "This means you are using HTTP2"
<< std::endl;
}
builder.AddListeningPort(server_address, serverCredentials);
builder.RegisterService(&pingPongAsync_);
cq_ = builder.AddCompletionQueue();
cqProgress_ = builder.AddCompletionQueue();
// Use Keep-alive to stop initial slow calls
builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_TIME_MS, 10000);
builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 10000);
builder.AddChannelArgument(GRPC_ARG_MAX_CONNECTION_IDLE_MS, 10000);
builder.AddChannelArgument(GRPC_ARG_HTTP2_BDP_PROBE, 1);
builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS,
1);
// Finally assemble the server.
server_ = (builder.BuildAndStart());
std::cout
<< "Server listening on "
<< server_address
<< std::endl;
// kick start the queues for async RPCs
HandleStartScan();
HandleRegisterScanProgress();
progressOtherRpcs_ = std::thread(&ServerImpl::EventLoop, this,
cq_.get());
progressReportThread_ = std::thread(&ServerImpl::EventLoop, this,
cqProgress_.get());
// Wait for the server to shutdown. Note that some other thread
must be
// responsible for shutting down the server for this call to ever
return.
server_->Wait();
}
private:
void HandleStartScan()
{
new StartScanCallData(&pingPongAsync_, cq_.get());
}
void HandleRegisterScanProgress()
{
new RegisterScanProgressCallData(&pingPongAsync_,
cqProgress_.get());
}
// TODO - we'll need to have more than one event loops. running in
thread for the RPC
void EventLoop(ServerCompletionQueue* event_queue)
{
void* tag; // uniquely identifies a request.
bool ok;
while (event_queue->Next(&tag, &ok))
{
IAsyncRpcDataAdapter* adapter =
static_cast<IAsyncRpcDataAdapter*>(tag);
if (ok)
{
std::cout << __FUNCTION__ << ":- EventLoop event for " <<
" thread - " << std::this_thread::get_id() << " queue " << event_queue << "
tag " << adapter->Name() << std::endl;
adapter->Proceed();
}
else
{
std::cout << __FUNCTION__ << ":- Need to see why this
failed for " << " thread - " << std::this_thread::get_id() << " queue " <<
event_queue << " tag " << adapter->Name() << std::endl;
adapter->Proceed();
continue;
}
}
}
std::unique_ptr<Server> server_;
std::unique_ptr<ServerCompletionQueue> cq_;
std::unique_ptr<ServerCompletionQueue> cqProgress_;
ping_pong::PingPong::AsyncService pingPongAsync_;
std::thread progressReportThread_;
std::thread progressOtherRpcs_;
};
int main(int argc, char** argv)
{
ServerImpl server;
server.Run();
return 0;
}
// -------------------------------------------------------------------------
// proto file
syntax = "proto3";
package ping_pong;
service PingPong {
rpc PingUnary (PingPongRequest) returns (PingPongReply) {}
rpc StartScan (ScanRequest) returns (ScanReply) {}
rpc RegisterScanProgress (ScanProgressRequest) returns (stream
ScanProgressReply) {}
}
message PingPongRequest {
string input_msg = 1;
}
message PingPongReply {
string output_msg = 1;
}
message ScanRequest{
string scanType = 1;
}
message ScanReply{
bool scanAlreadyInProgress = 1;
}
message ScanProgressRequest {
}
enum ScanStatus
{
NOT_STARTED = 0;
STARTED = 1;
IN_PROGRESS = 2;
FINISHED_SUCCESS = 3;
FINISHED_FAILED = 4;
}
message ScanProgressReply {
ScanStatus status = 1;
string location = 2;
}
On Tuesday, June 29, 2021 at 2:30:44 AM UTC+5:30 Vijay Pai wrote:
> It's not really possible to say anything about this without knowing what's
> in "Proceed" .
>
> On Monday, June 21, 2021 at 9:03:15 PM UTC-7 [email protected] wrote:
>
>> We are facing an issue where Next of a completion queue returns ok as
>> false, if any other RPC is called.
>> We need to implement a simple functionality of progress reporting of a
>> long running RPC. So we need two RPCs
>> 1. StartScan - to trigger the long running scan - runs in separate thread
>> and its own completion queue
>> 2. RegisterScanProgress - to register for the callback that reports
>> status of the long running scan. - runs in separate thread and its own
>> completion queue
>>
>> If we call StartScan first, and then cqProgress_(ServerCompletionQueue
>> for RegisterScanProgress) behaves as expected. though, if we call
>> RegisterScanProgress first, then immediately after call to StartScan the
>> Next of cqProgress_ returns ok as false.
>> Need help to understand this behavior. As after calling
>> RegisterScanProgress there could be call to any other RPC, and so we'd need
>> its queue to keep functioning for progress reporting.
>> It'll be of great help if someone helps us understand what we are missing.
>>
>> Proto looks like following
>> service ScanService{
>> rpc StartScan (ScanRequest) returns (ScanReply) {}
>> rpc RegisterScanProgress (ScanProgressRequest) returns (stream
>> ScanProgressReply) {}
>> }
>> message ScanRequest{
>> string scanType = 1;
>> }
>> message ScanReply{
>> bool scanAlreadyInProgress = 1;
>> }
>> message ScanProgressRequest {
>> }
>> enum ScanStatus
>> {
>> NOT_STARTED = 0;
>> STARTED = 1;
>> IN_PROGRESS = 2;
>> FINISHED_SUCCESS = 3;
>> FINISHED_FAILED = 4;
>> }
>> message ScanProgressReply {
>> ScanStatus status = 1;
>> string details = 2;
>> }
>>
>> this is how the server code looks
>>
>> class ServerImpl
>> {
>> public:
>> ~ServerImpl()
>> {
>> server_->Shutdown();
>> // Always shutdown the completion queue after the server.
>> cq_->Shutdown();
>> }
>>
>> void Run()
>> {
>> //... other server config code
>> grpc::EnableDefaultHealthCheckService(true);
>> grpc::reflection::InitProtoReflectionServerBuilderPlugin();
>> grpc::ServerBuilder builder;
>>
>> builder.AddListeningPort(server_address, serverCredentials);
>>
>> cq_ = builder.AddCompletionQueue();
>> cqProgress_ = builder.AddCompletionQueue();
>>
>> // Use Keep-alive to stop initial slow calls
>> builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_TIME_MS, 10000);
>> builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 100000);
>> builder.AddChannelArgument(GRPC_ARG_MAX_CONNECTION_IDLE_MS, 10000);
>> builder.AddChannelArgument(GRPC_ARG_HTTP2_BDP_PROBE, 1);
>> builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS,
>> 1);
>>
>> // Finally assemble the server.
>> server_ = (builder.BuildAndStart());
>>
>> // kick start the queues for async RPCs
>> HandleStartScan(cq_.get());
>> HandleRegisterScanProgress(cqProgress_.get());
>> progressOtherRpcs_ = std::thread(&ServerImpl::EventLoop, this,
>> cq_.get());
>> progressReportThread_ = std::thread(&ServerImpl::EventLoop, this,
>> cqProgress_.get());
>> server_->Wait();
>> }
>>
>> private:
>> void HandleStartScan(ServerCompletionQueue* event_queue)
>> {
>> new StartScanCallData(&scanAsync_, event_queue);
>> }
>> void HandleRegisterScanProgress(ServerCompletionQueue* event_queue)
>> {
>> new RegisterScanProgressCallData(&scanAsync_, event_queue);
>> }
>> void EventLoop(ServerCompletionQueue* event_queue)
>> {
>> void* tag; // uniquely identifies a request.
>> bool ok;
>> while (event_queue->Next(&tag, &ok))
>> {
>> IAsyncRpcDataAdapter* adapter =
>> static_cast<IAsyncRpcDataAdapter*>(tag);
>> if (ok)
>> {
>> adapter->Proceed();
>> }
>> else
>> {
>> std::cout << "OK is false" << std::endl;
>> continue;
>> }
>> }
>> }
>>
>> std::unique_ptr<ServerCompletionQueue> cq_;
>> std::unique_ptr<ServerCompletionQueue> cqProgress_;
>> napa::Nvbackend::AsyncService scanAsync_;
>> std::unique_ptr<Server> server_;
>> std::thread progressReportThread_;
>> std::thread progressOtherRpcs_;
>> };
>>
>> int main(int argc, char** argv)
>> {
>> ServerImpl server;
>> server.Run();
>> return 0;
>> }
>>
>> and this is how the service code looks
>>
>> enum CallStatus
>> {
>> CREATE,
>> PROCESS,
>> FINISH,
>> PUSH_TO_BACK
>> };
>>
>> // interface for service to handle the async RPC
>> // every RPC will need to implement this interface, so that GRPC server
>> can
>> // call RPC without knowing its type.
>> class IAsyncRpcDataAdapter
>> {
>> public:
>> virtual void Proceed() = 0;
>> virtual ~IAsyncRpcDataAdapter() = default;
>>
>> protected:
>> IAsyncRpcDataAdapter(ScanService* service, ServerCompletionQueue* cq)
>> : scanAsync_(service)
>> , cq_(cq)
>> , status_(CREATE)
>> {
>> }
>> ScanService* scanAsync_;
>> ServerCompletionQueue* cq_;
>> ServerContext ctx_;
>> CallStatus status_; // The current serving state.
>> grpc::Alarm alarm_;
>> };
>>
>> class StartScanCallData : public IAsyncRpcDataAdapter
>> {
>> public:
>> StartScanCallData(ScanService* service, ServerCompletionQueue* cq);
>> void Proceed() override;
>>
>> private:
>> ScanRequest request_;
>> ScanReply reply_;
>>
>> ServerAsyncResponseWriter<ScanReply> responder_;
>> grpc::Status StartScan(grpc::ServerContext *context, const ScanRequest
>> *request, ScanReply *reply);
>> };
>>
>> class RegisterScanProgressCallData : public IAsyncRpcDataAdapter
>> {
>> public:
>> RegisterScanProgressCallData(ScanService* service,
>> ServerCompletionQueue* cq);
>> void Proceed() override;
>>
>> private:
>> ScanProgressRequest request_;
>> ScanProgressReply reply_;
>> ServerAsyncWriter<ScanProgressReply> responder_;
>>
>> void ProgressReport();
>> void waitForScanEvent();
>> std::unique_ptr<std::promise<AppScanStatus>> scanPromise_;
>> };
>>
>>
--
You received this message because you are subscribed to the Google Groups
"grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To view this discussion on the web visit
https://groups.google.com/d/msgid/grpc-io/57d8322f-09f9-4fa0-8501-2a257bc3f0d2n%40googlegroups.com.