We are facing an issue where Next of a completion queue returns ok as
false, if any other RPC is called.
We need to implement a simple functionality of progress reporting of a long
running RPC. So we need two RPCs
1. StartScan - to trigger the long running scan - runs in separate thread
and its own completion queue
2. RegisterScanProgress - to register for the callback that reports status
of the long running scan. - runs in separate thread and its own completion
queue
If we call StartScan first, and then cqProgress_(ServerCompletionQueue
for RegisterScanProgress) behaves as expected. though, if we call
RegisterScanProgress first, then immediately after call to StartScan the
Next of cqProgress_ returns ok as false.
Need help to understand this behavior. As after calling
RegisterScanProgress there could be call to any other RPC, and so we'd need
its queue to keep functioning for progress reporting.
It'll be of great help if someone helps us understand what we are missing.
Proto looks like following
service ScanService{
rpc StartScan (ScanRequest) returns (ScanReply) {}
rpc RegisterScanProgress (ScanProgressRequest) returns (stream
ScanProgressReply) {}
}
message ScanRequest{
string scanType = 1;
}
message ScanReply{
bool scanAlreadyInProgress = 1;
}
message ScanProgressRequest {
}
enum ScanStatus
{
NOT_STARTED = 0;
STARTED = 1;
IN_PROGRESS = 2;
FINISHED_SUCCESS = 3;
FINISHED_FAILED = 4;
}
message ScanProgressReply {
ScanStatus status = 1;
string details = 2;
}
this is how the server code looks
class ServerImpl
{
public:
~ServerImpl()
{
server_->Shutdown();
// Always shutdown the completion queue after the server.
cq_->Shutdown();
}
void Run()
{
//... other server config code
grpc::EnableDefaultHealthCheckService(true);
grpc::reflection::InitProtoReflectionServerBuilderPlugin();
grpc::ServerBuilder builder;
builder.AddListeningPort(server_address, serverCredentials);
cq_ = builder.AddCompletionQueue();
cqProgress_ = builder.AddCompletionQueue();
// Use Keep-alive to stop initial slow calls
builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_TIME_MS, 10000);
builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 100000);
builder.AddChannelArgument(GRPC_ARG_MAX_CONNECTION_IDLE_MS, 10000);
builder.AddChannelArgument(GRPC_ARG_HTTP2_BDP_PROBE, 1);
builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1);
// Finally assemble the server.
server_ = (builder.BuildAndStart());
// kick start the queues for async RPCs
HandleStartScan(cq_.get());
HandleRegisterScanProgress(cqProgress_.get());
progressOtherRpcs_ = std::thread(&ServerImpl::EventLoop, this,
cq_.get());
progressReportThread_ = std::thread(&ServerImpl::EventLoop, this,
cqProgress_.get());
server_->Wait();
}
private:
void HandleStartScan(ServerCompletionQueue* event_queue)
{
new StartScanCallData(&scanAsync_, event_queue);
}
void HandleRegisterScanProgress(ServerCompletionQueue* event_queue)
{
new RegisterScanProgressCallData(&scanAsync_, event_queue);
}
void EventLoop(ServerCompletionQueue* event_queue)
{
void* tag; // uniquely identifies a request.
bool ok;
while (event_queue->Next(&tag, &ok))
{
IAsyncRpcDataAdapter* adapter =
static_cast<IAsyncRpcDataAdapter*>(tag);
if (ok)
{
adapter->Proceed();
}
else
{
std::cout << "OK is false" << std::endl;
continue;
}
}
}
std::unique_ptr<ServerCompletionQueue> cq_;
std::unique_ptr<ServerCompletionQueue> cqProgress_;
napa::Nvbackend::AsyncService scanAsync_;
std::unique_ptr<Server> server_;
std::thread progressReportThread_;
std::thread progressOtherRpcs_;
};
int main(int argc, char** argv)
{
ServerImpl server;
server.Run();
return 0;
}
and this is how the service code looks
enum CallStatus
{
CREATE,
PROCESS,
FINISH,
PUSH_TO_BACK
};
// interface for service to handle the async RPC
// every RPC will need to implement this interface, so that GRPC server can
// call RPC without knowing its type.
class IAsyncRpcDataAdapter
{
public:
virtual void Proceed() = 0;
virtual ~IAsyncRpcDataAdapter() = default;
protected:
IAsyncRpcDataAdapter(ScanService* service, ServerCompletionQueue* cq)
: scanAsync_(service)
, cq_(cq)
, status_(CREATE)
{
}
ScanService* scanAsync_;
ServerCompletionQueue* cq_;
ServerContext ctx_;
CallStatus status_; // The current serving state.
grpc::Alarm alarm_;
};
class StartScanCallData : public IAsyncRpcDataAdapter
{
public:
StartScanCallData(ScanService* service, ServerCompletionQueue* cq);
void Proceed() override;
private:
ScanRequest request_;
ScanReply reply_;
ServerAsyncResponseWriter<ScanReply> responder_;
grpc::Status StartScan(grpc::ServerContext *context, const ScanRequest
*request, ScanReply *reply);
};
class RegisterScanProgressCallData : public IAsyncRpcDataAdapter
{
public:
RegisterScanProgressCallData(ScanService* service, ServerCompletionQueue*
cq);
void Proceed() override;
private:
ScanProgressRequest request_;
ScanProgressReply reply_;
ServerAsyncWriter<ScanProgressReply> responder_;
void ProgressReport();
void waitForScanEvent();
std::unique_ptr<std::promise<AppScanStatus>> scanPromise_;
};
--
You received this message because you are subscribed to the Google Groups
"grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To view this discussion on the web visit
https://groups.google.com/d/msgid/grpc-io/46b73b0d-ac81-4278-aa5f-8b721a86b0d8n%40googlegroups.com.