damccorm commented on a change in pull request #17240:
URL: https://github.com/apache/beam/pull/17240#discussion_r840539459



##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BeamFnStateGrpcClientCache.java
##########
@@ -59,74 +58,108 @@ public BeamFnStateGrpcClientCache(
     // This showed a 1-2% improvement in the ProcessBundleBenchmark#testState* 
benchmarks.
     this.channelFactory = channelFactory.withDirectExecutor();
     this.outboundObserverFactory = outboundObserverFactory;
-    this.cache = new ConcurrentHashMap<>();
+    this.cache = new HashMap<>();
   }
 
   /**
    * Creates or returns an existing {@link BeamFnStateClient} depending on 
whether the passed in
    * {@link ApiServiceDescriptor} currently has a {@link BeamFnStateClient} 
bound to the same
    * channel.
    */
-  public BeamFnStateClient forApiServiceDescriptor(ApiServiceDescriptor 
apiServiceDescriptor)
-      throws IOException {
-    return cache.computeIfAbsent(apiServiceDescriptor, 
this::createBeamFnStateClient);
-  }
-
-  private BeamFnStateClient createBeamFnStateClient(ApiServiceDescriptor 
apiServiceDescriptor) {
-    return new GrpcStateClient(apiServiceDescriptor);
+  public synchronized BeamFnStateClient forApiServiceDescriptor(
+      ApiServiceDescriptor apiServiceDescriptor) throws IOException {
+    // We specifically are synchronized so that we only create one 
GrpcStateClient at a time
+    // preventing a race where multiple GrpcStateClient objects might be 
constructed at the same
+    // for the same ApiServiceDescriptor.
+    BeamFnStateClient rval;
+    synchronized (cache) {
+      rval = cache.get(apiServiceDescriptor);
+    }
+    if (rval == null) {
+      // We can't be synchronized on cache while constructing the 
GrpcStateClient since if the
+      // connection fails, onError may be invoked from the gRPC thread which 
will invoke
+      // closeAndCleanUp that clears the cache.
+      rval = new GrpcStateClient(apiServiceDescriptor);
+      synchronized (cache) {
+        cache.put(apiServiceDescriptor, rval);

Review comment:
       I'm not super familiar with the context here, but does this open us up 
to another race condition? Specifically, could you run into a case where:
   
   1. The cache has no client
   2. Thread 1 checks if the cache has a client (it doesn't)
   3. Thread 2 checks if the cache has a client (it doesn't)
   4. Thread 1 creates a new client and puts it in the cache.
   5. Thread 2 creates a new client and puts it in the cache (overwriting what 
thread 1 put there)
   6. Threads 1 and 2 have different clients
   
   I think if this is a problem we could get around it by doing an extra check 
in the synchronized block to see if the cache contains a client (and if it 
does, we can just dispose of the one we just created).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to