The ClientContext object needs to stay alive for the entire duration of the 
RPC. In your code, context_retry is a local variable inside the if block, so 
it is destroyed while the new writer is still using it. I suggest 
restructuring your code so that the function that invokes the RPC is 
separate from the channel state checking. Also, gRPC automatically tries to 
connect the channel when there are pending calls, so the state checking is 
unnecessary.
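
Here is a minimal sketch of that lifetime rule. So that it compiles on its 
own, it uses hypothetical stand-in types (FakeContext, FakeWriter) instead of 
the real grpc::ClientContext and ClientWriter<Message>; Session, StartSession 
and BrokenRestart are also names made up for illustration. The stand-in 
writer just reports failure when its context is gone, whereas real gRPC has 
undefined behavior in that situation, which is likely what your mutex 
assertion is showing.

```cpp
#include <memory>
#include <string>

// Hypothetical stand-in for grpc::ClientContext: it only tracks whether it
// has been destroyed, through a flag shared with any writer created from it.
class FakeContext {
 public:
  FakeContext() : alive_(std::make_shared<bool>(true)) {}
  ~FakeContext() { *alive_ = false; }
  FakeContext(const FakeContext&) = delete;
  FakeContext& operator=(const FakeContext&) = delete;
  std::shared_ptr<const bool> alive() const { return alive_; }

 private:
  std::shared_ptr<bool> alive_;
};

// Hypothetical stand-in for grpc::ClientWriter<Message>. Write() fails once
// the context it was created from has been destroyed.
class FakeWriter {
 public:
  explicit FakeWriter(const FakeContext& context)
      : context_alive_(context.alive()) {}
  bool Write(const std::string& /*message*/) { return *context_alive_; }

 private:
  std::shared_ptr<const bool> context_alive_;
};

// Mirrors the pattern in the question: the context is a local variable, so
// it is destroyed before the returned writer is ever used.
std::unique_ptr<FakeWriter> BrokenRestart() {
  FakeContext context_retry;
  return std::make_unique<FakeWriter>(context_retry);
}

// Bundles everything one RPC needs. Members are destroyed in reverse
// declaration order, so the writer goes away before the context.
struct Session {
  FakeContext context;
  std::unique_ptr<FakeWriter> writer;
};

// Heap-allocates the session so the context keeps a stable address for as
// long as the caller holds the returned pointer.
std::unique_ptr<Session> StartSession() {
  auto session = std::make_unique<Session>();
  session->writer = std::make_unique<FakeWriter>(session->context);
  return session;
}
```

In real code, a struct like Session would hold the ClientContext, the 
ReplyMessage and the writer together, and you would create a fresh one each 
time Write() fails, keeping it until after Finish() returns.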

On Wednesday, March 6, 2019 at 6:18:42 PM UTC-8, Sidhartha Thota wrote:
>
> Hi,
>
> I am new to gRPC and am using gRPC C++ for my project. I am using a 
> client-streaming RPC, where the client streams its messages and the server 
> reads them. While streaming is in progress (say, after the client has sent 
> a couple of messages), I restart the server (Ctrl+C, then start it again), 
> and the client is then unable to establish a new session with the server.
>
> If I am not mistaken, a writer is created per session, so I try to create 
> a new session by creating a new writer instance from the stub. I see the 
> following error on the client:
>
> E0307 02:09:43.854424957   22379 sync_posix.cc:85]           assertion 
> failed: pthread_mutex_lock(mu) == 0     [Client output attached below]
>
> gRPC version in use: v1.18
>
> -----------------
> Client code:
> -------------------
>
> Message getMessage(int i) {
>   Message mesg;
>   mesg.set_protocol_version(i);
>   return mesg;
> }
>
> class Client {
>  public:
>   Client(){
>   }
>
>   void sendMessages(std::shared_ptr<Channel> channel,
>                     std::unique_ptr<Listener::Stub> &stub_,
>                     std::unique_ptr<ClientWriter<Message> > &writer) {
>     Message message;
>     const int kPoints = 10;
>
>     for (int i = 0; i < kPoints; i++) {
>       Message message = getMessage(i);
>       std::cout << "Waiting to write " << std::endl;
>       if (!writer->Write(message)) {
>         std::cout << "broken stream. Will create new session" << "\n";
>         while (channel->GetState(true) != GRPC_CHANNEL_READY) {
>           std::cout << "Retry connection... after 1 sec" << std::endl;
>           std::this_thread::sleep_for(std::chrono::milliseconds(1000));
>         }
>         ClientContext context_retry;
>         ReplyMessage reply_message_retry;
>         writer = std::move(stub_->receive_resources(&context_retry,
>             &reply_message_retry));
>         std::cout << "created new session successfully" << std::endl;
>       }
>       std::this_thread::sleep_for(std::chrono::milliseconds(1000));
>     }
>   }
> };
>
> int main(int argc, char** argv) {
>   std::shared_ptr<Channel> channel;
>   channel = grpc::CreateChannel("localhost:50051",
>                       grpc::InsecureChannelCredentials());
>   std::unique_ptr<Listener::Stub> stub_(Listener::NewStub(channel));
>   ReplyMessage reply_message;
>   ClientContext context;
>   std::unique_ptr<ClientWriter<Message> > writer(
>       stub_->receive_resources(&context, &reply_message));
>   Client client;
>
>   std::cout << "-------------- SendMessages --------------" << std::endl;
>   client.sendMessages(channel, stub_, writer);
>   writer->WritesDone();
>   Status status = writer->Finish();
>   if (status.ok()) {
>     std::cout << "Close message " << reply_message.close_message()
>               << std::endl;
>   } else {
>     std::cout << "RecordRoute rpc failed." << std::endl;
>   }
>
>   return 0;
> }
>
> ---------------------
> RPC proto file:
> -------------------
>
> package listener;
>
> // Interface exported by the server.
> service Listener {
>   rpc receive_resources(stream Message) returns (ReplyMessage) {}
> }
>
> // Message and ReplyMessage are protos not defined here (let me know if
> // they are needed and I can add them).
>
> ------------------
> Server code
> -----------------
>
> class ListenerImpl final: public Listener::Service {
>   Status receive_resources(ServerContext* context,
>                            ServerReader<Message>* reader,
>                            ReplyMessage* message) override {
>     Message gMessage;
>     unsigned int message_count = 0;
>
>     while (reader->Read(&gMessage)) {
>       message_count++;
>       std::cout << "received" << std::endl;
>       std::cout << gMessage.protocol_version() << std::endl;
>     }
>     std::cout << message_count << std::endl;
>     message->set_close_message("close");
>     std::cout << "sent close message" << std::endl;
>     return Status::OK;
>   }
> };
>
> void RunServer() {
>   std::string server_address("0.0.0.0:50051");
>   ListenerImpl service;
>
>   ServerBuilder builder;
>   builder.AddListeningPort(server_address,
>                            grpc::InsecureServerCredentials());
>   builder.RegisterService(&service);
>   std::unique_ptr<Server> server(builder.BuildAndStart());
>   std::cout << "Server listening on " << server_address << std::endl;
>   server->Wait();
> }
>
> int main(int argc, char** argv) {
>   RunServer();
>   return 0;
> }
>
> -----------------------
> Client side output
> -----------------------
>
> -------------- SendMessages --------------
> Waiting to write 
> Waiting to write 
> Waiting to write 
> Waiting to write 
> Waiting to write 
> Waiting to write 
> Waiting to write 
> Waiting to write 
> broken stream. Will create new session ---> server stopped and started 
> again
> Retry connection... after 1 sec
> Retry connection... after 1 sec
> Retry connection... after 1 sec
> Retry connection... after 1 sec
> Retry connection... after 1 sec
> Retry connection... after 1 sec
> Retry connection... after 1 sec
> Retry connection... after 1 sec
> created new session successfully
> Waiting to write 
> E0307 02:09:43.854424957   22379 sync_posix.cc:85]           assertion 
> failed: pthread_mutex_lock(mu) == 0 ------------------> After creating new 
> writer instance, when I try to write again using writer, then this error is 
> thrown
> Aborted
>
> -------------------------
> I also tried reusing the same ClientContext but received the error below, 
> so I am creating a new ClientContext object every time I create a writer.
>
> """
> E0307 02:14:13.928433306   23563 client_context.cc:87]       assertion 
> failed: call_ == nullptr
> Aborted
> """
>
> I also tried creating a new channel and stub, and also a new stub alone, 
> once the channel reports that it is in the ready state. I still see the 
> same issue (E0307 02:09:43.854424957   22379 sync_posix.cc:85]           
> assertion failed: pthread_mutex_lock(mu) == 0).
>
> I am not sure what I am missing.
>
