This was because of difference in protobuf version. client was running 
3.0.0 while server was running 3.1.0.
Resolving the protobuf version resolved the issue.

On Thursday, March 8, 2018 at 9:24:01 PM UTC+5:30, mukul singh wrote:
>
> Hi All,
>
> I am trying to develop a cpp client for a grpc server implemented using 
> Java.
> With this change, I am trying to develop a cpp client for Apache Ratis 
> project.
>
> The client is trying to implement read/write functions for a 
> bi-directional append api, as in the Proto file below.
>
> Following are the series of events and results
>
> 1)          std::unique_ptr<ClientAsyncReaderWriter<RaftClientRequestProto, 
> RaftClientReplyProto>>
>
>                         cli_stream(stub->Asyncappend(&ctx, &cq, (void*)1
> ));
>
>
> client created a new ClientAsyncReaderWriter and this invokes the stream 
> observer constructor on java.
>
>
>   AppendRequestStreamObserver(StreamObserver<RaftClientReplyProto> ro) {
>     LOG.info("new AppendRequestStreamObserver {}", name);
>     this.responseObserver = ro;
>
>
>
> 2) the next call is for write, to write a RaftClientRequestProto to the 
> stream.
>
> -- this call has no effect on the server. However the 'ok' flag from the 
> completion queue return true.
>
>
> 3) the subsequent WritesDone done succeeds as well, and the log entries on 
> server also confirm that the stream observer is closed.
>
>
> org.apache.ratis.grpc.client.RaftClientProtocolService: completed request 
>
>
>
> Questions:
>
> 1) I would like to invoke the onNext() api on the Java Server.
>
> How can this be achieved from a cpp client.
>
>
> ======== Code Below ============
>
> Proto:
>
> syntax = "proto3";
> option java_package = "org.apache.ratis.shaded.proto.grpc";
> option java_outer_classname = "GRpcProtos";
> option java_generate_equals_and_hash = true;
> package ratis.grpc;
>
> import "Raft.proto";
>
> service RaftClientProtocolService {
>   // A client-to-server RPC to set new raft configuration
>   rpc setConfiguration(ratis.common.SetConfigurationRequestProto)
>       returns(ratis.common.RaftClientReplyProto) {}
>
>   // A client-to-server stream RPC to append data
>   rpc append(stream ratis.common.RaftClientRequestProto)
>       returns (stream ratis.common.RaftClientReplyProto) {}
> }
>
>
> *Server code:*
>
> private class AppendRequestStreamObserver implements
>     StreamObserver<RaftClientRequestProto> {
>   private final String name = getId() + "-" +  streamCount.getAndIncrement();
>   private final StreamObserver<RaftClientReplyProto> responseObserver;
>   private final SlidingWindow.Server<PendingAppend, RaftClientReply> 
> slidingWindow
>       = new SlidingWindow.Server<>(name, COMPLETED);
>   private final AtomicBoolean isClosed;
>
>   AppendRequestStreamObserver(StreamObserver<RaftClientReplyProto> ro) {
>     LOG.info("new AppendRequestStreamObserver {}", name);
>     this.responseObserver = ro;
>     this.isClosed = new AtomicBoolean(false);
>   }
>
>   void processClientRequestAsync(PendingAppend pending) {
>     try {
>       protocol.submitClientRequestAsync(pending.getRequest()
>       ).thenAcceptAsync(reply -> slidingWindow.receiveReply(
>           pending.getSeqNum(), reply, this::sendReply, 
> this::processClientRequestAsync)
>       ).exceptionally(exception -> {
>         // TODO: the exception may be from either raft or state machine.
>         // Currently we skip all the following responses when getting an
>         // exception from the state machine.
>         responseError(exception, () -> "processClientRequestAsync for " + 
> pending.getRequest());
>         return null;
>       });
>     } catch (IOException e) {
>       throw new CompletionException("Failed processClientRequestAsync for " + 
> pending.getRequest(), e);
>     }
>   }
>
>   @Override
>   public void onNext(RaftClientRequestProto request) {
>     try {
>       final RaftClientRequest r = 
> ClientProtoUtils.toRaftClientRequest(request);
>       LOG.info("recieved request " + r.getCallId());
>       final PendingAppend p = new PendingAppend(r);
>       slidingWindow.receivedRequest(p, this::processClientRequestAsync);
>     } catch (Throwable e) {
>       responseError(e, () -> "onNext for " + 
> ClientProtoUtils.toString(request));
>     }
>   }
>
>   private void sendReply(PendingAppend ready) {
>       Preconditions.assertTrue(ready.hasReply());
>       if (ready == COMPLETED) {
>         close();
>       } else {
>         LOG.info("{}: sendReply seq={}, {}", name, ready.getSeqNum(), 
> ready.getReply());
>         responseObserver.onNext(
>             ClientProtoUtils.toRaftClientReplyProto(ready.getReply()));
>       }
>   }
>
>   @Override
>   public void onError(Throwable t) {
>     // for now we just log a msg
>     LOG.info(name + ": onError", t);
>     slidingWindow.close();
>   }
>
>   @Override
>   public void onCompleted() {
>     LOG.info("completed request ");
>     if (slidingWindow.endOfRequests()) {
>       close();
>     }
>   }
>
>
> *Server logs:*
>
> 2018-03-08 15:44:06,688 INFO 
> org.apache.ratis.grpc.client.RaftClientProtocolService: new 
> AppendRequestStreamObserver 127.0.0.1_9858-3
>
> 2018-03-08 15:44:06,688 INFO 
> org.apache.ratis.grpc.client.RaftClientProtocolService: completed request 
>
>
>
> *client code:*
>
>         std::shared_ptr< Channel > channel = grpc::CreateChannel(
> "localhost:9858",
>
>                         grpc::InsecureChannelCredentials());
>
>         std::unique_ptr<RaftClientProtocolService::Stub> stub = 
> RaftClientProtocolService::NewStub(channel);
>
>         ContainerCommandRequestProto* read_requet = read_container(
> "container1");
>
>         std::cout << ":cmd type is" << read_requet->cmdtype() << 
> std::endl;
>
>
>         RaftClientRequestProto* req = create_request(read_requet, sizeof
> (ContainerCommandRequestProto));
>
>         grpc::ClientContext ctx;
>
>         grpc::CompletionQueue cq;
>
>         std::unique_ptr<ClientAsyncReaderWriter<RaftClientRequestProto, 
> RaftClientReplyProto>>
>
>                         cli_stream(stub->Asyncappend(&ctx, &cq, (void*)1
> ));
>
>         void* got_tag;
>
>         bool ok = false;
>
>         cq.Next(&got_tag, &ok);
>
>         std::cout << "tag" << got_tag << "   ok"<< ok<<std::endl;
>
>
>         cli_stream->Write(*req, (void*)2);
>
>         cq.Next(&got_tag, &ok);
>
>         std::cout << "tag" << got_tag << "   ok"<< ok<<std::endl;
>
>         if (ok && got_tag == (void*)2) {
>
>          std::cout << "sent correct request" <<std::endl;
>
>         // check reply and status
>
>         }
>
>         cli_stream->WritesDone((void*)3);
>
>         cq.Next(&got_tag, &ok);
>
>         std::cout << "tag" << got_tag << "   ok"<< ok<<std::endl;
>
>         if (ok && got_tag == (void*)3) {
>
>          std::cout << "sent correct request" <<std::endl;
>
>         // check reply and status
>
>         }
>
>
> *client logs:*
>
>
> cmd type is2
>
> tag0x1   ok1
>
> tag0x2   ok1
>
> sent correct request
>
> tag0x3   ok1
>
> sent correct request
>
> sending request for reply
>

-- 
You received this message because you are subscribed to the Google Groups 
"grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/grpc-io.
To view this discussion on the web visit 
https://groups.google.com/d/msgid/grpc-io/645e97ea-c73e-4579-a103-cced0fa02f61%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Reply via email to