Hi All,

I am trying to develop a cpp client for a grpc server implemented using 
Java.
With this change, I am trying to develop a cpp client for Apache Ratis 
project.

The client is trying to implement read/write functions for a bi-directional 
append api, as in the Proto file below.

Following are the series of events and results

1)          std::unique_ptr<ClientAsyncReaderWriter<RaftClientRequestProto, 
RaftClientReplyProto>>

                        cli_stream(stub->Asyncappend(&ctx, &cq, (void*)1));


client created a new ClientAsyncReaderWriter and this invokes the stream 
observer constructor on java.


  AppendRequestStreamObserver(StreamObserver<RaftClientReplyProto> ro) {
    LOG.info("new AppendRequestStreamObserver {}", name);
    this.responseObserver = ro;



2) the next call is for write, to write a RaftClientRequestProto to the stream.

-- this call has no effect on the server. However the 'ok' flag from the 
completion queue return true.


3) the subsequent WritesDone done succeeds as well, and the log entries on 
server also confirm that the stream observer is closed.


org.apache.ratis.grpc.client.RaftClientProtocolService: completed request 



Questions:

1) I would like to invoke the onNext() api on the Java Server.

How can this be achieved from a cpp client.


======== Code Below ============

Proto:

syntax = "proto3";
option java_package = "org.apache.ratis.shaded.proto.grpc";
option java_outer_classname = "GRpcProtos";
option java_generate_equals_and_hash = true;
package ratis.grpc;

import "Raft.proto";

service RaftClientProtocolService {
  // A client-to-server RPC to set new raft configuration
  rpc setConfiguration(ratis.common.SetConfigurationRequestProto)
      returns(ratis.common.RaftClientReplyProto) {}

  // A client-to-server stream RPC to append data
  rpc append(stream ratis.common.RaftClientRequestProto)
      returns (stream ratis.common.RaftClientReplyProto) {}
}


*Server code:*

private class AppendRequestStreamObserver implements
    StreamObserver<RaftClientRequestProto> {
  private final String name = getId() + "-" +  streamCount.getAndIncrement();
  private final StreamObserver<RaftClientReplyProto> responseObserver;
  private final SlidingWindow.Server<PendingAppend, RaftClientReply> 
slidingWindow
      = new SlidingWindow.Server<>(name, COMPLETED);
  private final AtomicBoolean isClosed;

  AppendRequestStreamObserver(StreamObserver<RaftClientReplyProto> ro) {
    LOG.info("new AppendRequestStreamObserver {}", name);
    this.responseObserver = ro;
    this.isClosed = new AtomicBoolean(false);
  }

  void processClientRequestAsync(PendingAppend pending) {
    try {
      protocol.submitClientRequestAsync(pending.getRequest()
      ).thenAcceptAsync(reply -> slidingWindow.receiveReply(
          pending.getSeqNum(), reply, this::sendReply, 
this::processClientRequestAsync)
      ).exceptionally(exception -> {
        // TODO: the exception may be from either raft or state machine.
        // Currently we skip all the following responses when getting an
        // exception from the state machine.
        responseError(exception, () -> "processClientRequestAsync for " + 
pending.getRequest());
        return null;
      });
    } catch (IOException e) {
      throw new CompletionException("Failed processClientRequestAsync for " + 
pending.getRequest(), e);
    }
  }

  @Override
  public void onNext(RaftClientRequestProto request) {
    try {
      final RaftClientRequest r = ClientProtoUtils.toRaftClientRequest(request);
      LOG.info("recieved request " + r.getCallId());
      final PendingAppend p = new PendingAppend(r);
      slidingWindow.receivedRequest(p, this::processClientRequestAsync);
    } catch (Throwable e) {
      responseError(e, () -> "onNext for " + 
ClientProtoUtils.toString(request));
    }
  }

  private void sendReply(PendingAppend ready) {
      Preconditions.assertTrue(ready.hasReply());
      if (ready == COMPLETED) {
        close();
      } else {
        LOG.info("{}: sendReply seq={}, {}", name, ready.getSeqNum(), 
ready.getReply());
        responseObserver.onNext(
            ClientProtoUtils.toRaftClientReplyProto(ready.getReply()));
      }
  }

  @Override
  public void onError(Throwable t) {
    // for now we just log a msg
    LOG.info(name + ": onError", t);
    slidingWindow.close();
  }

  @Override
  public void onCompleted() {
    LOG.info("completed request ");
    if (slidingWindow.endOfRequests()) {
      close();
    }
  }


*Server logs:*

2018-03-08 15:44:06,688 INFO 
org.apache.ratis.grpc.client.RaftClientProtocolService: new 
AppendRequestStreamObserver 127.0.0.1_9858-3

2018-03-08 15:44:06,688 INFO 
org.apache.ratis.grpc.client.RaftClientProtocolService: completed request 



*client code:*

        std::shared_ptr< Channel > channel = grpc::CreateChannel(
"localhost:9858",

                        grpc::InsecureChannelCredentials());

        std::unique_ptr<RaftClientProtocolService::Stub> stub = 
RaftClientProtocolService::NewStub(channel);

        ContainerCommandRequestProto* read_requet = read_container(
"container1");

        std::cout << ":cmd type is" << read_requet->cmdtype() << std::endl;


        RaftClientRequestProto* req = create_request(read_requet, sizeof
(ContainerCommandRequestProto));

        grpc::ClientContext ctx;

        grpc::CompletionQueue cq;

        std::unique_ptr<ClientAsyncReaderWriter<RaftClientRequestProto, 
RaftClientReplyProto>>

                        cli_stream(stub->Asyncappend(&ctx, &cq, (void*)1));

        void* got_tag;

        bool ok = false;

        cq.Next(&got_tag, &ok);

        std::cout << "tag" << got_tag << "   ok"<< ok<<std::endl;


        cli_stream->Write(*req, (void*)2);

        cq.Next(&got_tag, &ok);

        std::cout << "tag" << got_tag << "   ok"<< ok<<std::endl;

        if (ok && got_tag == (void*)2) {

         std::cout << "sent correct request" <<std::endl;

        // check reply and status

        }

        cli_stream->WritesDone((void*)3);

        cq.Next(&got_tag, &ok);

        std::cout << "tag" << got_tag << "   ok"<< ok<<std::endl;

        if (ok && got_tag == (void*)3) {

         std::cout << "sent correct request" <<std::endl;

        // check reply and status

        }


*client logs:*


cmd type is2

tag0x1   ok1

tag0x2   ok1

sent correct request

tag0x3   ok1

sent correct request

sending request for reply

-- 
You received this message because you are subscribed to the Google Groups 
"grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/grpc-io.
To view this discussion on the web visit 
https://groups.google.com/d/msgid/grpc-io/3944c7b7-03f2-429a-8b5c-f4e1a033761b%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Reply via email to