This was caused by a difference in protobuf versions: the client was running
3.0.0 while the server was running 3.1.0.
Aligning the protobuf versions on both sides resolved the issue.
On Thursday, March 8, 2018 at 9:24:01 PM UTC+5:30, mukul singh wrote:
>
> Hi All,
>
> I am trying to develop a cpp client for a grpc server implemented using
> Java.
> With this change, I am trying to develop a cpp client for the Apache Ratis
> project.
>
> The client is trying to implement read/write functions for a
> bi-directional append api, as in the Proto file below.
>
> Following are the series of events and results
>
> 1) std::unique_ptr<ClientAsyncReaderWriter<RaftClientRequestProto,
> RaftClientReplyProto>>
>
> cli_stream(stub->Asyncappend(&ctx, &cq, (void*)1
> ));
>
>
> The client creates a new ClientAsyncReaderWriter, and this invokes the stream
> observer constructor on the Java server.
>
>
> AppendRequestStreamObserver(StreamObserver<RaftClientReplyProto> ro) {
> LOG.info("new AppendRequestStreamObserver {}", name);
> this.responseObserver = ro;
>
>
>
> 2) the next call is for write, to write a RaftClientRequestProto to the
> stream.
>
> -- this call has no effect on the server. However, the 'ok' flag from the
> completion queue returns true.
>
>
> 3) the subsequent WritesDone succeeds as well, and the log entries on the
> server also confirm that the stream observer is closed.
>
>
> org.apache.ratis.grpc.client.RaftClientProtocolService: completed request
>
>
>
> Questions:
>
> 1) I would like to invoke the onNext() api on the Java Server.
>
> How can this be achieved from a cpp client?
>
>
> ======== Code Below ============
>
> Proto:
>
> syntax = "proto3";
> option java_package = "org.apache.ratis.shaded.proto.grpc";
> option java_outer_classname = "GRpcProtos";
> option java_generate_equals_and_hash = true;
> package ratis.grpc;
>
> import "Raft.proto";
>
> service RaftClientProtocolService {
> // A client-to-server RPC to set new raft configuration
> rpc setConfiguration(ratis.common.SetConfigurationRequestProto)
> returns(ratis.common.RaftClientReplyProto) {}
>
> // A client-to-server stream RPC to append data
> rpc append(stream ratis.common.RaftClientRequestProto)
> returns (stream ratis.common.RaftClientReplyProto) {}
> }
>
>
> *Server code:*
>
> private class AppendRequestStreamObserver implements
> StreamObserver<RaftClientRequestProto> {
> private final String name = getId() + "-" + streamCount.getAndIncrement();
> private final StreamObserver<RaftClientReplyProto> responseObserver;
> private final SlidingWindow.Server<PendingAppend, RaftClientReply>
> slidingWindow
> = new SlidingWindow.Server<>(name, COMPLETED);
> private final AtomicBoolean isClosed;
>
> AppendRequestStreamObserver(StreamObserver<RaftClientReplyProto> ro) {
> LOG.info("new AppendRequestStreamObserver {}", name);
> this.responseObserver = ro;
> this.isClosed = new AtomicBoolean(false);
> }
>
> void processClientRequestAsync(PendingAppend pending) {
> try {
> protocol.submitClientRequestAsync(pending.getRequest()
> ).thenAcceptAsync(reply -> slidingWindow.receiveReply(
> pending.getSeqNum(), reply, this::sendReply,
> this::processClientRequestAsync)
> ).exceptionally(exception -> {
> // TODO: the exception may be from either raft or state machine.
> // Currently we skip all the following responses when getting an
> // exception from the state machine.
> responseError(exception, () -> "processClientRequestAsync for " +
> pending.getRequest());
> return null;
> });
> } catch (IOException e) {
> throw new CompletionException("Failed processClientRequestAsync for " +
> pending.getRequest(), e);
> }
> }
>
> @Override
> public void onNext(RaftClientRequestProto request) {
> try {
> final RaftClientRequest r =
> ClientProtoUtils.toRaftClientRequest(request);
> LOG.info("recieved request " + r.getCallId());
> final PendingAppend p = new PendingAppend(r);
> slidingWindow.receivedRequest(p, this::processClientRequestAsync);
> } catch (Throwable e) {
> responseError(e, () -> "onNext for " +
> ClientProtoUtils.toString(request));
> }
> }
>
> private void sendReply(PendingAppend ready) {
> Preconditions.assertTrue(ready.hasReply());
> if (ready == COMPLETED) {
> close();
> } else {
> LOG.info("{}: sendReply seq={}, {}", name, ready.getSeqNum(),
> ready.getReply());
> responseObserver.onNext(
> ClientProtoUtils.toRaftClientReplyProto(ready.getReply()));
> }
> }
>
> @Override
> public void onError(Throwable t) {
> // for now we just log a msg
> LOG.info(name + ": onError", t);
> slidingWindow.close();
> }
>
> @Override
> public void onCompleted() {
> LOG.info("completed request ");
> if (slidingWindow.endOfRequests()) {
> close();
> }
> }
>
>
> *Server logs:*
>
> 2018-03-08 15:44:06,688 INFO
> org.apache.ratis.grpc.client.RaftClientProtocolService: new
> AppendRequestStreamObserver 127.0.0.1_9858-3
>
> 2018-03-08 15:44:06,688 INFO
> org.apache.ratis.grpc.client.RaftClientProtocolService: completed request
>
>
>
> *client code:*
>
> std::shared_ptr< Channel > channel = grpc::CreateChannel(
> "localhost:9858",
>
> grpc::InsecureChannelCredentials());
>
> std::unique_ptr<RaftClientProtocolService::Stub> stub =
> RaftClientProtocolService::NewStub(channel);
>
> ContainerCommandRequestProto* read_requet = read_container(
> "container1");
>
> std::cout << ":cmd type is" << read_requet->cmdtype() <<
> std::endl;
>
>
> RaftClientRequestProto* req = create_request(read_requet, sizeof
> (ContainerCommandRequestProto));
>
> grpc::ClientContext ctx;
>
> grpc::CompletionQueue cq;
>
> std::unique_ptr<ClientAsyncReaderWriter<RaftClientRequestProto,
> RaftClientReplyProto>>
>
> cli_stream(stub->Asyncappend(&ctx, &cq, (void*)1
> ));
>
> void* got_tag;
>
> bool ok = false;
>
> cq.Next(&got_tag, &ok);
>
> std::cout << "tag" << got_tag << " ok"<< ok<<std::endl;
>
>
> cli_stream->Write(*req, (void*)2);
>
> cq.Next(&got_tag, &ok);
>
> std::cout << "tag" << got_tag << " ok"<< ok<<std::endl;
>
> if (ok && got_tag == (void*)2) {
>
> std::cout << "sent correct request" <<std::endl;
>
> // check reply and status
>
> }
>
> cli_stream->WritesDone((void*)3);
>
> cq.Next(&got_tag, &ok);
>
> std::cout << "tag" << got_tag << " ok"<< ok<<std::endl;
>
> if (ok && got_tag == (void*)3) {
>
> std::cout << "sent correct request" <<std::endl;
>
> // check reply and status
>
> }
>
>
> *client logs:*
>
>
> cmd type is2
>
> tag0x1 ok1
>
> tag0x2 ok1
>
> sent correct request
>
> tag0x3 ok1
>
> sent correct request
>
> sending request for reply
>
--
You received this message because you are subscribed to the Google Groups
"grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/grpc-io.
To view this discussion on the web visit
https://groups.google.com/d/msgid/grpc-io/645e97ea-c73e-4579-a103-cced0fa02f61%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.