What's the purpose of System.in.read() before stopping the server? The HelloWorld started example uses something like the following:
public void start() throws IOException { server = ServerBuilder.forPort(port) .addService(new GreeterImpl()) .build() .start(); logger.info("Server started, listening on " + port); Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { // Use stderr here since the logger may have been reset by its JVM shutdown hook. System.err.println("*** shutting down gRPC server since JVM is shutting down"); HelloWorldServer.this.stop(); System.err.println("*** server shut down"); } }); } public void stop() { if (server != null) { server.shutdown(); } } /** * Await termination on the main thread since the grpc library uses daemon threads. */ private void blockUntilShutdown() throws InterruptedException { if (server != null) { server.awaitTermination(); } } /** * Main launches the server from the command line. */ public static void main(String[] args) throws IOException, InterruptedException { final HelloWorldServer server = new HelloWorldServer(); server.start(); server.blockUntilShutdown(); } On Tuesday, September 20, 2016 at 8:01:01 AM UTC-6, Avinash Dongre wrote: > > I am getting following exception on Server > > Server started, listening on 50051 > Sep 20, 2016 7:24:07 PM io.grpc.netty.NettyServerHandler onStreamError > WARNING: Stream Error > io.netty.handler.codec.http2.Http2Exception$StreamException: Stream closed > before write could take place > at > io.netty.handler.codec.http2.Http2Exception.streamError(Http2Exception.java:147) > at > io.netty.handler.codec.http2.DefaultHttp2RemoteFlowController$FlowState.cancel(DefaultHttp2RemoteFlowController.java:487) > at > io.netty.handler.codec.http2.DefaultHttp2RemoteFlowController$FlowState.cancel(DefaultHttp2RemoteFlowController.java:468) > at > io.netty.handler.codec.http2.DefaultHttp2RemoteFlowController$1.onStreamClosed(DefaultHttp2RemoteFlowController.java:103) > at > io.netty.handler.codec.http2.DefaultHttp2Connection.notifyClosed(DefaultHttp2Connection.java:343) > at > 
io.netty.handler.codec.http2.DefaultHttp2Connection$ActiveStreams.removeFromActiveStreams(DefaultHttp2Connection.java:1151) > at > io.netty.handler.codec.http2.DefaultHttp2Connection$ActiveStreams.deactivate(DefaultHttp2Connection.java:1099) > at > io.netty.handler.codec.http2.DefaultHttp2Connection$DefaultStream.close(DefaultHttp2Connection.java:521) > at > io.netty.handler.codec.http2.DefaultHttp2Connection$DefaultStream.close(DefaultHttp2Connection.java:527) > at > io.netty.handler.codec.http2.Http2ConnectionHandler.closeStream(Http2ConnectionHandler.java:522) > at > io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onRstStreamRead(DefaultHttp2ConnectionDecoder.java:396) > at > io.netty.handler.codec.http2.Http2InboundFrameLogger$1.onRstStreamRead(Http2InboundFrameLogger.java:80) > at > io.netty.handler.codec.http2.DefaultHttp2FrameReader.readRstStreamFrame(DefaultHttp2FrameReader.java:490) > at > io.netty.handler.codec.http2.DefaultHttp2FrameReader.processPayloadState(DefaultHttp2FrameReader.java:253) > at > io.netty.handler.codec.http2.DefaultHttp2FrameReader.readFrame(DefaultHttp2FrameReader.java:155) > at > io.netty.handler.codec.http2.Http2InboundFrameLogger.readFrame(Http2InboundFrameLogger.java:41) > at > io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder.decodeFrame(DefaultHttp2ConnectionDecoder.java:113) > at > io.netty.handler.codec.http2.Http2ConnectionHandler$FrameDecoder.decode(Http2ConnectionHandler.java:333) > at > io.netty.handler.codec.http2.Http2ConnectionHandler.decode(Http2ConnectionHandler.java:393) > at > io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:411) > at > io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:248) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) > at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) > at > io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) > at > io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926) > at > io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:123) > at > io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:571) > at > io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:512) > at > io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:426) > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:398) > at > io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:877) > at > io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144) > at java.lang.Thread.run(Thread.java:745) > > Following is my Server Code > > > package io.test; > > import com.google.protobuf.ByteString; > import io.test.generated.GreeterGrpc; > import io.test.generated.GreeterOuterClass; > import io.grpc.Server; > import io.grpc.ServerBuilder; > import io.grpc.internal.ServerImpl; > import io.grpc.netty.NettyServerBuilder; > import io.grpc.stub.ServerCallStreamObserver; > import io.grpc.stub.StreamObserver; > > import java.util.ArrayList; > import java.util.List; > > /** > * Hello world! 
> */ > public class GrpcServer { > > /* The port on which the server should run */ > private int port = 50051; > private Server server; > > private void start() throws Exception { > server = ServerBuilder.forPort(port) > .addService(new GreeterImpl()) > .build().start(); > System.out.println("Server started, listening on " + port); > } > > private void stop() { > if (server != null) { > server.shutdown(); > } > } > > public static void main(String[] args) throws Exception { > final GrpcServer server = new GrpcServer(); > server.start(); > System.in.read(); > server.stop(); > } > > private class GreeterImpl extends GreeterGrpc.GreeterImplBase { > > @Override > public void sayHello(GreeterOuterClass.HelloRequest req, > StreamObserver<GreeterOuterClass.HelloReply> > responseObserver) { > > GreeterOuterClass.HelloReply.Builder builder = > GreeterOuterClass.HelloReply.newBuilder(); > final ServerCallStreamObserver<GreeterOuterClass.HelloReply> scso = > > (io.grpc.stub.ServerCallStreamObserver<GreeterOuterClass.HelloReply>) > responseObserver; > > final long BATCH_SIZE = 100; > > Runnable drain = new Runnable() { > long remaining = 20_000_000L; > > @Override > public void run() { > if (remaining == 0L) return; > for (; remaining > 0L && scso.isReady(); remaining--) { > builder.addMessage(ByteString.copyFrom(new byte[32 * 1024])); > scso.onNext(builder.build()); > } > if (remaining == 0) { > scso.onCompleted(); > } > } > }; > scso.setOnReadyHandler(drain); > drain.run(); > } > } > } > > > and Following is my Client code. > > > package io.test; > > import com.google.common.util.concurrent.UncaughtExceptionHandlers; > import io.test.generated.GreeterGrpc; > import io.test.generated.GreeterOuterClass; > import io.grpc.ManagedChannel; > import io.grpc.ManagedChannelBuilder; > import io.grpc.stub.StreamObserver; > > import java.util.ArrayList; > import java.util.List; > import java.util.concurrent.*; > > /** > * Created by adongre on 9/20/16. 
> */ > public class GrpcClient { > > public static void main(String[] args) throws InterruptedException { > ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", > 50051) > .usePlaintext(true) > .executor(new ForkJoinPool(Runtime.getRuntime().availableProcessors(), > pool -> { > ForkJoinWorkerThread thread = > > ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool); > thread.setDaemon(true); > > return thread; > }, UncaughtExceptionHandlers.systemExit(), true /* async */)) > .build(); > GreeterGrpc.GreeterStub asyncStub = GreeterGrpc.newStub(channel); > > final CountDownLatch finishLatch = new CountDownLatch(1); > final List<Integer> rowsReceived = new ArrayList<>(); > final long timeStart = System.nanoTime(); > StreamObserver<GreeterOuterClass.HelloReply> observer = new > StreamObserver<GreeterOuterClass.HelloReply>() { > @Override > public void onNext(GreeterOuterClass.HelloReply helloReply) { > rowsReceived.add(helloReply.getMessageList().size()); > } > > @Override > public void onError(Throwable throwable) { > > } > > @Override > public void onCompleted() { > int rowsReceivedNum = rowsReceived.stream().mapToInt(i -> > i.intValue()).sum(); > System.out.println("GrpcClient.onCompleted.Num Of Rows -> " + > rowsReceivedNum + " In " > + (System.nanoTime() - timeStart)); > finishLatch.countDown(); > } > }; > asyncStub.sayHello(GreeterOuterClass.HelloRequest.newBuilder().build(), > observer); > try { > finishLatch.await(30, TimeUnit.MINUTES); > } catch (InterruptedException e) { > e.printStackTrace(); > } > } > } > > > -- You received this message because you are subscribed to the Google Groups "grpc.io" group. To unsubscribe from this group and stop receiving emails from it, send an email to grpc-io+unsubscr...@googlegroups.com. To post to this group, send email to grpc-io@googlegroups.com. Visit this group at https://groups.google.com/group/grpc-io. 
To view this discussion on the web visit https://groups.google.com/d/msgid/grpc-io/5808eacd-7234-4810-8eaf-9647ab418e3c%40googlegroups.com. For more options, visit https://groups.google.com/d/optout.