Hi All,
I am trying to develop a cpp client for a grpc server implemented using
Java.
With this change, I am trying to develop a cpp client for the Apache Ratis
project.
The client is trying to implement read/write functions for a bi-directional
append api, as in the Proto file below.
The following is the series of events and results:
1) std::unique_ptr<ClientAsyncReaderWriter<RaftClientRequestProto,
RaftClientReplyProto>>
cli_stream(stub->Asyncappend(&ctx, &cq, (void*)1));
The client creates a new ClientAsyncReaderWriter, and this invokes the stream
observer constructor on the Java server:
AppendRequestStreamObserver(StreamObserver<RaftClientReplyProto> ro) {
LOG.info("new AppendRequestStreamObserver {}", name);
this.responseObserver = ro;
2) The next call is Write, which writes a RaftClientRequestProto to the stream.
-- This call has no effect on the server. However, the 'ok' flag returned from
the completion queue is true.
3) The subsequent WritesDone succeeds as well, and the log entries on the
server also confirm that the stream observer is closed:
org.apache.ratis.grpc.client.RaftClientProtocolService: completed request
Questions:
1) I would like to invoke the onNext() API on the Java server.
How can this be achieved from a cpp client?
======== Code Below ============
Proto:
syntax = "proto3";
option java_package = "org.apache.ratis.shaded.proto.grpc";
option java_outer_classname = "GRpcProtos";
option java_generate_equals_and_hash = true;
package ratis.grpc;
import "Raft.proto";
// gRPC service exposing the client-facing Raft RPCs.
service RaftClientProtocolService {
  // Unary client-to-server RPC: set a new raft configuration.
  rpc setConfiguration(ratis.common.SetConfigurationRequestProto)
      returns (ratis.common.RaftClientReplyProto) {}

  // Bidirectional-streaming client-to-server RPC: append data.
  // The client sends a stream of requests and receives a stream of replies.
  rpc append(stream ratis.common.RaftClientRequestProto)
      returns (stream ratis.common.RaftClientReplyProto) {}
}
*Server code:*
private class AppendRequestStreamObserver implements
StreamObserver<RaftClientRequestProto> {
private final String name = getId() + "-" + streamCount.getAndIncrement();
private final StreamObserver<RaftClientReplyProto> responseObserver;
private final SlidingWindow.Server<PendingAppend, RaftClientReply>
slidingWindow
= new SlidingWindow.Server<>(name, COMPLETED);
private final AtomicBoolean isClosed;
/**
 * Creates the observer for one append stream and logs its name.
 *
 * @param ro observer on which replies for this stream are emitted
 */
AppendRequestStreamObserver(StreamObserver<RaftClientReplyProto> ro) {
  LOG.info("new AppendRequestStreamObserver {}", name);
  responseObserver = ro;
  // The stream starts out open.
  isClosed = new AtomicBoolean(false);
}
/**
 * Submits one pending append request to {@code protocol} asynchronously and
 * routes the outcome through the sliding window.
 *
 * When the submission completes normally, the reply is handed to
 * {@code slidingWindow.receiveReply} together with {@code sendReply} and this
 * method as callbacks. When the chained future completes exceptionally,
 * {@code responseError} is invoked instead.
 *
 * @param pending the request to submit; its sequence number is used when
 *                the reply is delivered to the sliding window
 * @throws CompletionException wrapping any IOException raised inside the
 *                             try block
 */
void processClientRequestAsync(PendingAppend pending) {
try {
protocol.submitClientRequestAsync(pending.getRequest()
).thenAcceptAsync(reply -> slidingWindow.receiveReply(
pending.getSeqNum(), reply, this::sendReply,
this::processClientRequestAsync)
).exceptionally(exception -> {
// TODO: the exception may be from either raft or state machine.
// Currently we skip all the following responses when getting an
// exception from the state machine.
responseError(exception, () -> "processClientRequestAsync for " +
pending.getRequest());
// exceptionally() requires a value; the stage's result is unused here.
return null;
});
} catch (IOException e) {
// Rethrow as an unchecked exception, keeping the original as the cause.
throw new CompletionException("Failed processClientRequestAsync for " +
pending.getRequest(), e);
}
}
/**
 * Handles one request message arriving on the client stream.
 *
 * The proto is converted to a RaftClientRequest, wrapped in a PendingAppend
 * and handed to the sliding window with processClientRequestAsync as the
 * processing callback. Any Throwable raised on the way is reported through
 * responseError rather than propagated.
 *
 * @param request the request message received from the client
 */
@Override
public void onNext(RaftClientRequestProto request) {
  try {
    final RaftClientRequest raftRequest =
        ClientProtoUtils.toRaftClientRequest(request);
    LOG.info("recieved request " + raftRequest.getCallId());
    slidingWindow.receivedRequest(
        new PendingAppend(raftRequest), this::processClientRequestAsync);
  } catch (Throwable failure) {
    responseError(failure,
        () -> "onNext for " + ClientProtoUtils.toString(request));
  }
}
/**
 * Delivers a ready entry: closes the stream when the entry is the COMPLETED
 * marker, otherwise logs it and sends its reply on the response observer.
 *
 * @param ready an entry that must already have a reply
 */
private void sendReply(PendingAppend ready) {
  Preconditions.assertTrue(ready.hasReply());

  // The COMPLETED marker carries no reply to forward; it only ends the stream.
  if (ready == COMPLETED) {
    close();
    return;
  }

  LOG.info("{}: sendReply seq={}, {}", name, ready.getSeqNum(),
      ready.getReply());
  final RaftClientReplyProto replyProto =
      ClientProtoUtils.toRaftClientReplyProto(ready.getReply());
  responseObserver.onNext(replyProto);
}
/**
 * Called when the client stream reports an error: the error is logged and
 * the sliding window is closed.
 *
 * @param t the error reported on the stream
 */
@Override
public void onError(Throwable t) {
  // For now the error is only logged; nothing is sent back to the client.
  LOG.info(name + ": onError", t);
  slidingWindow.close();
}
/**
 * Called when the client has finished sending requests. Logs the event,
 * notifies the sliding window, and closes the stream if the window says so.
 */
@Override
public void onCompleted() {
  LOG.info("completed request ");
  final boolean shouldClose = slidingWindow.endOfRequests();
  if (shouldClose) {
    close();
  }
}
*Server logs:*
2018-03-08 15:44:06,688 INFO
org.apache.ratis.grpc.client.RaftClientProtocolService: new
AppendRequestStreamObserver 127.0.0.1_9858-3
2018-03-08 15:44:06,688 INFO
org.apache.ratis.grpc.client.RaftClientProtocolService: completed request
*client code:*
std::shared_ptr< Channel > channel = grpc::CreateChannel(
"localhost:9858",
grpc::InsecureChannelCredentials());
std::unique_ptr<RaftClientProtocolService::Stub> stub =
RaftClientProtocolService::NewStub(channel);
ContainerCommandRequestProto* read_requet = read_container(
"container1");
std::cout << ":cmd type is" << read_requet->cmdtype() << std::endl;
RaftClientRequestProto* req = create_request(read_requet, sizeof
(ContainerCommandRequestProto));
grpc::ClientContext ctx;
grpc::CompletionQueue cq;
std::unique_ptr<ClientAsyncReaderWriter<RaftClientRequestProto,
RaftClientReplyProto>>
cli_stream(stub->Asyncappend(&ctx, &cq, (void*)1));
void* got_tag;
bool ok = false;
cq.Next(&got_tag, &ok);
std::cout << "tag" << got_tag << " ok"<< ok<<std::endl;
cli_stream->Write(*req, (void*)2);
cq.Next(&got_tag, &ok);
std::cout << "tag" << got_tag << " ok"<< ok<<std::endl;
if (ok && got_tag == (void*)2) {
std::cout << "sent correct request" <<std::endl;
// check reply and status
}
cli_stream->WritesDone((void*)3);
cq.Next(&got_tag, &ok);
std::cout << "tag" << got_tag << " ok"<< ok<<std::endl;
if (ok && got_tag == (void*)3) {
std::cout << "sent correct request" <<std::endl;
// check reply and status
}
*client logs:*
cmd type is2
tag0x1 ok1
tag0x2 ok1
sent correct request
tag0x3 ok1
sent correct request
sending request for reply
--
You received this message because you are subscribed to the Google Groups
"grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/grpc-io.
To view this discussion on the web visit
https://groups.google.com/d/msgid/grpc-io/3944c7b7-03f2-429a-8b5c-f4e1a033761b%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.