This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 10e1480081c fix: fix a race condition in BigtableService cache (#28122)
10e1480081c is described below
commit 10e1480081cff16c79faf54fff7bdae8d45d0341
Author: Mattie Fu <[email protected]>
AuthorDate: Tue Aug 29 10:34:08 2023 -0400
fix: fix a race condition in BigtableService cache (#28122)
* fix: fix a race condition in BigtableService cache
* address comments
---
.../io/gcp/bigtable/BigtableServiceFactory.java | 43 +++++++++++++---------
1 file changed, 25 insertions(+), 18 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceFactory.java
index a9db4943c65..635856430fb 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceFactory.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceFactory.java
@@ -75,20 +75,19 @@ class BigtableServiceFactory implements Serializable {
@Override
public void close() {
- int refCount =
- refCounts.getOrDefault(getConfigId().id(), new AtomicInteger(0)).decrementAndGet();
- if (refCount < 0) {
- LOG.error(
- "close() Ref count is < 0, configId=" + getConfigId().id() + " refCount=" + refCount);
- }
- LOG.debug("close() for config id " + getConfigId().id() + ", ref count is " + refCount);
- if (refCount == 0) {
- synchronized (lock) {
- if (refCounts.get(getConfigId().id()).get() <= 0) {
- entries.remove(getConfigId().id());
- refCounts.remove(getConfigId().id());
- getService().close();
- }
+ synchronized (lock) {
+ int refCount =
+ refCounts.getOrDefault(getConfigId().id(), new AtomicInteger(0)).decrementAndGet();
+ if (refCount < 0) {
+ LOG.error(
+ "close() Ref count is < 0, configId=" + getConfigId().id() + " refCount=" + refCount);
+ }
+ LOG.debug(
+ "close() is called for config id " + getConfigId().id() + ", ref count is " + refCount);
+ if (refCount == 0) {
+ entries.remove(getConfigId().id());
+ refCounts.remove(getConfigId().id());
+ getService().close();
}
}
}
@@ -105,8 +104,12 @@ class BigtableServiceFactory implements Serializable {
BigtableServiceEntry entry = entries.get(configId.id());
if (entry != null) {
// When entry is not null, refCount.get(configId.id()) should always exist.
- // Do a getOrDefault to avoid unexpected NPEs.
- refCounts.getOrDefault(configId.id(), new AtomicInteger(0)).getAndIncrement();
+ // Doing a putIfAbsent to avoid NPE.
+ AtomicInteger count = refCounts.putIfAbsent(configId.id(), new AtomicInteger(0));
+ if (count == null) {
+ LOG.error("entry is not null but refCount of config Id " + configId.id() + " is null.");
+ }
+ refCounts.get(configId.id()).getAndIncrement();
LOG.debug("getServiceForReading() returning an existing service entry");
return entry;
}
@@ -150,8 +153,12 @@ class BigtableServiceFactory implements Serializable {
LOG.debug("getServiceForWriting(), config id: " + configId.id());
if (entry != null) {
// When entry is not null, refCount.get(configId.id()) should always exist.
- // Do a getOrDefault to avoid unexpected NPEs.
- refCounts.getOrDefault(configId.id(), new AtomicInteger(0)).getAndIncrement();
+ // Doing a putIfAbsent to avoid NPE.
+ AtomicInteger count = refCounts.putIfAbsent(configId.id(), new AtomicInteger(0));
+ if (count == null) {
+ LOG.error("entry is not null but refCount of config Id " + configId.id() + " is null.");
+ }
+ refCounts.get(configId.id()).getAndIncrement();
LOG.debug("getServiceForWriting() returning an existing service entry");
return entry;
}