Apologies for these silly mistakes; I actually pasted the same code with just a little modification. It is working now, and I will keep your points in mind.
Thanks

On Thu, 29 Apr 2021, 13:44 Piotr Morgwai Kotarbinski, <[email protected]> wrote:

> it's very hard to figure out what you are trying to achieve and how you
> expect this code to behave: you should narrow the problem and post the
> minimal example that causes it. Moreover, the code is in some intermediate
> inconsistent state and wouldn't even compile (proto has functions
> *GetRequest* and *SendDataToServer*, but then in your code you call
> something named *sendRequest*; *GrpcClient* has something that looks
> like constructor but is named *GrpcClient2*; etc), which makes it even
> harder to read.
> If you ask random ppl on the internet for help, you should make it easy
> for them to help you ;-)
>
> Cheers!
>
> On Wednesday, April 28, 2021 at 1:16:23 PM UTC+7 [email protected] wrote:
>
>> Hi All
>>
>> In my application I have a client which sends the streams of record and
>> server streams the acknowledgment, things are working fine for first
>> request however for the subsequent request I am getting below errors. The
>> subsequent request is made after 2 or 3 seconds(GetRequest in proto file
>> below)
>>
>> On Server side I am getting
>> Cancelling the stream with status Status{code=INTERNAL, description=Too many responses, cause=null}
>>
>> On client Side I am getting
>>
>> CANCELLED: RST_STREAM closed stream. HTTP/2 error code: CANCEL
>>
>>
>> Please find the code:
>> *proto file*
>> syntax = "proto3";
>> option java_multiple_files = true;
>> option java_package = "io.grpc.examples.tps";
>> option java_outer_classname = "TpsProto";
>> option objc_class_prefix = "tps";
>>
>> package tps;
>> // Interface exported by the server.
>> service TPS {
>>   rpc GetRequest(Request) returns (Acknowledgement) {} // takes data from client and stream data to different grpc server
>>   rpc SendDataToServer(stream Request) returns (stream Acknowledgement) {}
>> }
>>
>> message Request {
>>   string policyId = 1;
>>   string txnId = 2;
>>   string clientId = 3;
>> }
>>
>> message Acknowledgement {
>>   string txnId = 1;
>>   string clientId = 2;
>> }
>>
>>
>> Client Side code:
>>
>> public final class GrpcClient {
>>     private final Semaphore limiter = new Semaphore(1000);
>>     private final List<ManagedChannel> channels;
>>     private final List<TPSGrpc.TPSStub> futureStubList;
>>     StreamObserver<Request> str;
>>
>>     public GrpcClient2(String host, int port) {
>>         channels = new ArrayList<>();
>>         futureStubList = new ArrayList<>();
>>         ManagedChannel channel = null;
>>         channel = NettyChannelBuilder.forAddress(host, port)
>>                 .usePlaintext().keepAliveWithoutCalls(true).keepAliveTime(20, TimeUnit.DAYS)
>>                 .build();
>>
>>         channels.add(channel);
>>         futureStubList.add(TPSGrpc.newStub(channel));
>>         str = futureStubList.get(0).sendRequest(new StreamObserver<Acknowledgement>() {
>>
>>             @Override
>>             public void onNext(Acknowledgement value) {
>>                 // TODO Auto-generated method stub
>>                 System.out.println(value.getTxnId());
>>             }
>>
>>             @Override
>>             public void onError(Throwable t) {
>>                 // TODO Auto-generated method stub
>>                 System.out.println(t.getMessage());
>>             }
>>
>>             @Override
>>             public void onCompleted() {
>>                 // TODO Auto-generated method stub
>>                 System.out.println("comp");
>>             }
>>         });
>>     }
>>
>>     public void shutdown() throws InterruptedException {
>>     }
>>
>>     public void verifyAsync(Request request) throws InterruptedException {
>>         limiter.acquire();
>>         str.onNext(request);
>>     }
>> }
>>
>>
>> Server Side
>>
>>
>> public class Service extends TPSGrpc.TPSImplBase {
>>
>>     static Map<String, GrpcClient> urlVsGrpcClient = new HashMap<>();
>>
>>     @Override
>>     public void getRequest(Request request, StreamObserver<Acknowledgement> responseObserver) {
>>         Request clone = Request.newBuilder().setClientId(request.getClientId())
>>                 .setTxnId(request.getTxnId()).build();
>>
>>         responseObserver.onNext(Acknowledgement.newBuilder().setTxnId("ab").build());
>>         responseObserver.onCompleted();
>>         CompletableFuture.runAsync(new Runnable() {
>>             @Override
>>             public void run() {
>>                 for (int i = 0; i < GrpcServer.node.size(); i++) {
>>                     callNode(GrpcServer.node.get(i), clone);
>>                 }
>>             }
>>         });
>>     }
>>
>>     @Override
>>     public StreamObserver<Request> sendRequest(StreamObserver<Acknowledgement> responseObserver) {
>>
>>         return new StreamObserver<Request>() {
>>
>>             @Override
>>             public void onNext(Request value) {
>>                 Acknowledgement ack = Acknowledgement.newBuilder().setClientId(value.getClientId())
>>                         .setTxnId(value.getTxnId()).build();
>>                 responseObserver.onNext(ack);
>>                 System.out.println(value.getTxnId());
>>             }
>>
>>             @Override
>>             public void onError(Throwable t) {
>>             }
>>
>>             @Override
>>             public void onCompleted() {
>>                 responseObserver.onCompleted();
>>                 System.out.println("server comp");
>>             }
>>         };
>>     }
>>
>>     private void callNode(String node, Request request) {
>>         GrpcClient client = urlVsGrpcClient.get(node);
>>         if (client == null) {
>>             String[] split = node.split(":");
>>             client = new GrpcClient(split[0], Integer.parseInt(split[1]));
>>             urlVsGrpcClient.put(node, client);
>>         }
>>         try {
>>             client.verifyAsync(request);
>>         } catch (InterruptedException e) {
>>             e.printStackTrace();
>>         }
>>     }
>> }
>>
>>
>> Please check and suggest.
>>
>> Thanks,
>> Shobhit Srivastava

-- 
You received this message because you are subscribed to the Google Groups "grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email to [email protected].
To view this discussion on the web visit https://groups.google.com/d/msgid/grpc-io/CAHM18J1yYg4hXMvt_J3C4vwNGA_p86%3DbErNhYuwLqQ3Miu5J6w%40mail.gmail.com.
