Sure.
Here is the code without multithreading:
    public void greetServerStream(GreetServerStreamRequest request,
            StreamObserver<GreetServerStreamResponse> responseObserver) {
        String message = request.getGreeting().getMessage();
        try {
            for (int i = 0; i < 10; i++) {
                String result = "Hello " + message + ", response number: " + i;
                GreetServerStreamResponse response = GreetServerStreamResponse.newBuilder()
                        .setResult(result)
                        .build();
                responseObserver.onNext(response);
                Thread.sleep(1000L);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            responseObserver.onCompleted();
        }
    }

From the code above, I implemented a multithreaded version as shown below:
    public void greetServerStream(GreetServerStreamRequest request,
            StreamObserver<GreetServerStreamResponse> responseObserver) {
        String message = request.getGreeting().getMessage();
        // Both workers share the same responseObserver.
        MultiRunnable runnable1 = new MultiRunnable("thread1", message + "1", responseObserver);
        runnable1.start();
        MultiRunnable runnable2 = new MultiRunnable("thread2", message + "2", responseObserver);
        runnable2.start();
    }

    public class MultiRunnable implements Runnable {
        private Thread thread;
        private final String threadName;
        private final String message;
        private final ServerCallStreamObserver<GreetServerStreamResponse> serverCallStreamObserver;

        public MultiRunnable(String threadName, String message,
                StreamObserver<GreetServerStreamResponse> responseObserver) {
            this.threadName = threadName;
            this.message = message;
            this.serverCallStreamObserver =
                    (ServerCallStreamObserver<GreetServerStreamResponse>) responseObserver;
        }

        @Override
        public void run() {
            try {
                for (int i = 0; i < 10; i++) {
                    String result = "Hello " + message + ", response number: " + i;
                    GreetServerStreamResponse response = GreetServerStreamResponse.newBuilder()
                            .setResult(result)
                            .build();
                    synchronized (serverCallStreamObserver) {
                        serverCallStreamObserver.onNext(response);
                    }
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                System.out.println("Thread " + threadName + " interrupted.");
            } finally {
                serverCallStreamObserver.onCompleted();
            }
        }

        public void start() {
            System.out.println("Starting " + threadName);
            if (thread == null) {
                thread = new Thread(this, threadName);
                thread.start();
            }
        }
    }

Each thread runs its own for loop and calls onNext() to stream responses
back to the client. I initially did not add the synchronized block above
and got the following error:
"Stream 3 sent too many headers EOS"
After adding the block, both threads could call onNext() on the shared
observer without hitting that error.
Is this the right way to synchronize? From a best-practice perspective,
what is the recommended way to do it? And is calling onNext() from multiple
threads common, or recommended at all?
Thanks,
Bill
On Wednesday, 19 May 2021 at 01:47:54 UTC-4 [email protected] wrote:
> Pls include a code snippet of what you want to do. Show how you intend to
> share "one ResponseObserver".
>
> On Tue, May 18, 2021 at 6:56 PM Bill Li <[email protected]> wrote:
>
>> Got it, thanks!
>>
>> I am currently implementing a server-side streaming application. Can one
>> ResponseObserver be shared by multiple threads sending response stream back
>> to the client through onNext() method? Just want to confirm if there is a
>> race condition in calling onNext() at the same time.
>>
>> On Tuesday, 18 May 2021 at 19:28:43 UTC-4 [email protected] wrote:
>>
>>> With NettyServerBuilder you can use maxConcurrentCallsPerConnection(int
>>> maxCalls)
>>> <https://github.com/grpc/grpc-java/blob/master/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java#L397>
>>>
>>>
>>> This is the same as setting MAX_CONCURRENT_STREAMS per connection.
>>>
>>> On Tue, May 18, 2021 at 3:36 PM Bill Li <[email protected]> wrote:
>>>
>>>> Hi,
>>>>
>>>> Does anyone know or have an example for configuring the parameter
>>>> MAX_CONCURRENT_STREAMS for gRPC server written in Java?
>>>>
>>>> Thanks,
>>>> Bill
>>>>
>>>> --
>>>> You received this message because you are subscribed to the Google
>>>> Groups "grpc.io" group.
>>>> To unsubscribe from this group and stop receiving emails from it, send
>>>> an email to [email protected].
>>>> To view this discussion on the web visit
>>>> https://groups.google.com/d/msgid/grpc-io/cbb2fd35-a01a-4128-879d-08cbc91049b0n%40googlegroups.com
>>>>
>>>> <https://groups.google.com/d/msgid/grpc-io/cbb2fd35-a01a-4128-879d-08cbc91049b0n%40googlegroups.com?utm_medium=email&utm_source=footer>
>>>> .
>>>>