HI,

I am new to GRPC and using GRPC C++ for my project. I am using streaming 
RPC where client will stream the messages it has and server listens to the 
messages. When streaming is in progress from client side (lets say after 
couple of messages sent from client end), I restart server (Ctrl+c and 
start server again) then client is unable to establish new session with 
server.

If I am not wrong, writer is created per session so I am trying to create 
new session by creating new writer instance from Stub. I see following 
error at client

E0307 02:09:43.854424957   22379 sync_posix.cc:85]           assertion 
failed: pthread_mutex_lock(mu) == 0     [Client output attached below]

GRPC version using : v1.18

-----------------
Client code:
-------------------

Message getMessage(int i) {
  Message mesg;
  mesg.set_protocol_version(i);
  return mesg;
}

class Client {
 public:
  Client(){
  }

  void sendMessages(std::shared_ptr<Channel> channel,
                    std::unique_ptr<Listener::Stub> &stub_,
                    std::unique_ptr<ClientWriter<Message> > &writer) {
    Message message;
    const int kPoints = 10;

    for (int i = 0; i < kPoints; i++) {
      Message message = getMessage(i);
      std::cout << "Waiting to write " << std::endl;
      if (!writer->Write(message)) {
        std::cout << "broken stream. Will create new session" << "\n";
        while (channel->GetState(true) != GRPC_CHANNEL_READY) {
          std::cout << "Retry connection... after 1 sec" << std::endl;
          std::this_thread::sleep_for(std::chrono::milliseconds(1000));
        }
        ClientContext context_retry;
        ReplyMessage reply_message_retry;
        writer = std::move(stub_->receive_resources(&context_retry, 
&reply_message_retry));
        std::cout << "created new session successfully" << std::endl;
      }
      std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    }
  }
};

int main(int argc, char** argv) {
  std::shared_ptr<Channel> channel;
  channel = grpc::CreateChannel("localhost:50051",
                      grpc::InsecureChannelCredentials());
  std::unique_ptr<Listener::Stub> stub_(Listener::NewStub(channel));
  ReplyMessage reply_message;
  ClientContext context;
  std::unique_ptr<ClientWriter<Message> > 
writer(stub_->receive_resources(&context, &reply_message));
  Client client;

  std::cout << "-------------- SendMessages --------------" << std::endl;
  client.sendMessages(channel, stub_, writer);
  writer->WritesDone();
  Status status = writer->Finish();
  if (status.ok()) {
    std::cout << "Close message " << reply_message.close_message()
              << std::endl;
  } else {
    std::cout << "RecordRoute rpc failed." << std::endl;
  }

  return 0;
}

---------------------
RPC proto file:
-------------------

package listener;

// Interface exported by the server.
service Listener {
  rpc receive_resources(stream Message) returns (ReplyMessage) {}
}

// Message and ReplyMessage are protos not defined here (Let me know if 
they are applicable, I can add).

------------------
Server code
-----------------

class ListenerImpl final: public Listener::Service {
  Status receive_resources(ServerContext* context, ServerReader<Message>* 
reader,
                      ReplyMessage* message) override {
    Message gMessage;
    unsigned int message_count = 0;

    while (reader->Read(&gMessage)) {
      message_count++;
      std::cout << "received" << std::endl;
      std::cout << gMessage.protocol_version() << std::endl;
    }
    std::cout << message_count << std::endl;
    message->set_close_message("close");
    std::cout << "sent close message" << std::endl;
    return Status::OK;
  }
};

void RunServer() {
  std::string server_address("0.0.0.0:50051");
  ListenerImpl service;

  ServerBuilder builder;
  builder.AddListeningPort(server_address, 
grpc::InsecureServerCredentials());
  builder.RegisterService(&service);
  std::unique_ptr<Server> server(builder.BuildAndStart());
  std::cout << "Server listening on " << server_address << std::endl;
  server->Wait();
}

int main(int argc, char** argv) {
  RunServer();
  return 0;
}

-----------------------
Client side output
-----------------------

-------------- SendMessages --------------
Waiting to write 
Waiting to write 
Waiting to write 
Waiting to write 
Waiting to write 
Waiting to write 
Waiting to write 
Waiting to write 
broken stream. Will create new session ---> server stopped and started again
Retry connection... after 1 sec
Retry connection... after 1 sec
Retry connection... after 1 sec
Retry connection... after 1 sec
Retry connection... after 1 sec
Retry connection... after 1 sec
Retry connection... after 1 sec
Retry connection... after 1 sec
created new session successfully
Waiting to write 
E0307 02:09:43.854424957   22379 sync_posix.cc:85]           assertion 
failed: pthread_mutex_lock(mu) == 0 ------------------> After creating new 
writer instance, when I try to write again using writer, then this error is 
thrown
Aborted

-------------------------
I also tried reusing same ClientConext but received below error so creating 
new ClientContext object for every creation of writer

"""
E0307 02:14:13.928433306   23563 client_context.cc:87]       assertion 
failed: call_ == nullptr
Aborted
"""

I also tried creating new channel and stub and also stub alone when channel 
says it is in ready state, still I see same issue (E0307 
02:09:43.854424957   22379 sync_posix.cc:85]           assertion failed: 
pthread_mutex_lock(mu) == 0)

Not sure where I am missing.

-- 
You received this message because you are subscribed to the Google Groups 
"grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/grpc-io.
To view this discussion on the web visit 
https://groups.google.com/d/msgid/grpc-io/19738d79-f39f-4681-8e10-6d0957e06fcd%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Reply via email to