scwhittle commented on a change in pull request #16745:
URL: https://github.com/apache/beam/pull/16745#discussion_r800402411



##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BeamFnStateGrpcClientCache.java
##########
@@ -87,9 +88,12 @@ private GrpcStateClient(ApiServiceDescriptor 
apiServiceDescriptor) {
       this.apiServiceDescriptor = apiServiceDescriptor;
       this.outstandingRequests = new ConcurrentHashMap<>();
       this.channel = channelFactory.apply(apiServiceDescriptor);
+      // We use the directExecutor because we just complete futures when 
handling responses.
+      // This showed a 1-2% improvement in the 
ProcessBundleBenchmark#testState* benchmarks.
       this.outboundObserver =
           outboundObserverFactory.outboundObserverFor(
-              BeamFnStateGrpc.newStub(channel)::state, new InboundObserver());
+              
BeamFnStateGrpc.newStub(channel).withExecutor(MoreExecutors.directExecutor())::state,
+              new InboundObserver());

Review comment:
       Can you add a comment in InboundObserver.next that it should not block 
as it is run on a direct executor?

##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BeamFnStateGrpcClientCache.java
##########
@@ -87,9 +88,12 @@ private GrpcStateClient(ApiServiceDescriptor 
apiServiceDescriptor) {
       this.apiServiceDescriptor = apiServiceDescriptor;
       this.outstandingRequests = new ConcurrentHashMap<>();
       this.channel = channelFactory.apply(apiServiceDescriptor);
+      // We use the directExecutor because we just complete futures when 
handling responses.
+      // This showed a 1-2% improvement in the 
ProcessBundleBenchmark#testState* benchmarks.
       this.outboundObserver =
           outboundObserverFactory.outboundObserverFor(
-              BeamFnStateGrpc.newStub(channel)::state, new InboundObserver());
+              
BeamFnStateGrpc.newStub(channel).withExecutor(MoreExecutors.directExecutor())::state,

Review comment:
       There is also the option to create a grpc channel that executes directly 
instead of an executor which is says can give transport ability to optimize 
further:
   
https://grpc.github.io/grpc-java/javadoc/io/grpc/ManagedChannelBuilder.html#directExecutor--
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to