This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 10e1480081c fix: fix a race condition in BigtableService cache (#28122)
10e1480081c is described below

commit 10e1480081cff16c79faf54fff7bdae8d45d0341
Author: Mattie Fu <[email protected]>
AuthorDate: Tue Aug 29 10:34:08 2023 -0400

    fix: fix a race condition in BigtableService cache (#28122)
    
    * fix: fix a race condition in BigtableService cache
    
    * address comments
---
 .../io/gcp/bigtable/BigtableServiceFactory.java    | 43 +++++++++++++---------
 1 file changed, 25 insertions(+), 18 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceFactory.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceFactory.java
index a9db4943c65..635856430fb 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceFactory.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceFactory.java
@@ -75,20 +75,19 @@ class BigtableServiceFactory implements Serializable {
 
     @Override
     public void close() {
-      int refCount =
-          refCounts.getOrDefault(getConfigId().id(), new 
AtomicInteger(0)).decrementAndGet();
-      if (refCount < 0) {
-        LOG.error(
-            "close() Ref count is < 0, configId=" + getConfigId().id() + " 
refCount=" + refCount);
-      }
-      LOG.debug("close() for config id " + getConfigId().id() + ", ref count 
is " + refCount);
-      if (refCount == 0) {
-        synchronized (lock) {
-          if (refCounts.get(getConfigId().id()).get() <= 0) {
-            entries.remove(getConfigId().id());
-            refCounts.remove(getConfigId().id());
-            getService().close();
-          }
+      synchronized (lock) {
+        int refCount =
+            refCounts.getOrDefault(getConfigId().id(), new 
AtomicInteger(0)).decrementAndGet();
+        if (refCount < 0) {
+          LOG.error(
+              "close() Ref count is < 0, configId=" + getConfigId().id() + " 
refCount=" + refCount);
+        }
+        LOG.debug(
+            "close() is called for config id " + getConfigId().id() + ", ref 
count is " + refCount);
+        if (refCount == 0) {
+          entries.remove(getConfigId().id());
+          refCounts.remove(getConfigId().id());
+          getService().close();
         }
       }
     }
@@ -105,8 +104,12 @@ class BigtableServiceFactory implements Serializable {
       BigtableServiceEntry entry = entries.get(configId.id());
       if (entry != null) {
         // When entry is not null, refCount.get(configId.id()) should always 
exist.
-        // Do a getOrDefault to avoid unexpected NPEs.
-        refCounts.getOrDefault(configId.id(), new 
AtomicInteger(0)).getAndIncrement();
+        // Doing a putIfAbsent to avoid NPE.
+        AtomicInteger count = refCounts.putIfAbsent(configId.id(), new 
AtomicInteger(0));
+        if (count == null) {
+          LOG.error("entry is not null but refCount of config Id " + 
configId.id() + " is null.");
+        }
+        refCounts.get(configId.id()).getAndIncrement();
         LOG.debug("getServiceForReading() returning an existing service 
entry");
         return entry;
       }
@@ -150,8 +153,12 @@ class BigtableServiceFactory implements Serializable {
       LOG.debug("getServiceForWriting(), config id: " + configId.id());
       if (entry != null) {
         // When entry is not null, refCount.get(configId.id()) should always 
exist.
-        // Do a getOrDefault to avoid unexpected NPEs.
-        refCounts.getOrDefault(configId.id(), new 
AtomicInteger(0)).getAndIncrement();
+        // Doing a putIfAbsent to avoid NPE.
+        AtomicInteger count = refCounts.putIfAbsent(configId.id(), new 
AtomicInteger(0));
+        if (count == null) {
+          LOG.error("entry is not null but refCount of config Id " + 
configId.id() + " is null.");
+        }
+        refCounts.get(configId.id()).getAndIncrement();
         LOG.debug("getServiceForWriting() returning an existing service 
entry");
         return entry;
       }

Reply via email to