jojochuang commented on code in PR #7745:
URL: https://github.com/apache/ozone/pull/7745#discussion_r2070987696
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java:
##########
@@ -118,12 +152,136 @@ public void invalidateAll() {
@Override
public void close() {
+ closed.set(true);
invalidateAll();
if (this.scheduler != null) {
this.scheduler.close();
}
}
+ /**
+ * Decreases the lock count. When the count reaches zero all new threads
would be able to get a handle of snapshot.
+ */
+ private Runnable decrementLockCount() {
+ lockCnt -= 1;
+ if (lockCnt <= 0) {
+ LOG.warn("Invalid negative lock count : {}. Setting it to 0", lockCnt);
+ lockCnt = 0;
+ }
+
+ if (lockCnt == 0) {
+ snapshotRefThreadIds.clear();
+ }
+ return () -> {
+ readLock.lock();
+ try {
+ if (lockCnt == 0) {
+ lockReleased.signalAll();
+ }
+ } finally {
+ readLock.unlock();
+ }
+ };
+ }
+
+ /**
+ * Releases a lock on the cache.
+ */
+ public void releaseLock() {
+ writeLock.lock();
+ Runnable callback = decrementLockCount();
+ try {
+ decrementLockCount();
+ } finally {
+ writeLock.unlock();
+ }
+ callback.run();
+ }
+
+ /**
+ * Acquires lock on the cache within max amount time.
+ * @param timeout Max time to wait to acquire lock.
+ * @return true if lock is acquired otherwise false.
+ * @throws InterruptedException
+ */
+ public boolean tryAcquire(long timeout) throws InterruptedException {
+ long endTime = System.currentTimeMillis() + timeout;
+ if (timeout <= 0) {
+ endTime = Long.MAX_VALUE;
+ timeout = Long.MAX_VALUE;
+ }
+ if (writeLock.tryLock(timeout, TimeUnit.MILLISECONDS)) {
+ Runnable rollbackCallback = null;
+ try {
+ lockCnt += 1;
+ if (lockCnt == 1) {
+ snapshotRefThreadIds.clear();
+ dbMap.values().stream()
+ .flatMap(referenceCounted ->
+
referenceCounted.getThreadCntMap().entrySet().stream().map(entry ->
Pair.of(entry, referenceCounted)))
+ .forEach(entry -> updateThreadCnt(entry.getKey().getKey(),
entry.getValue().get().getSnapshotID(),
+ entry.getKey().getValue()));
Review Comment:
Can you move this into a helper method with a self-explanatory name, such as
initializeThreadSnapshotReferences()?
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java:
##########
@@ -118,12 +152,136 @@ public void invalidateAll() {
@Override
public void close() {
+ closed.set(true);
invalidateAll();
if (this.scheduler != null) {
this.scheduler.close();
}
}
+ /**
+ * Decreases the lock count. When the count reaches zero all new threads
would be able to get a handle of snapshot.
+ */
+ private Runnable decrementLockCount() {
+ lockCnt -= 1;
+ if (lockCnt <= 0) {
+ LOG.warn("Invalid negative lock count : {}. Setting it to 0", lockCnt);
+ lockCnt = 0;
+ }
+
+ if (lockCnt == 0) {
+ snapshotRefThreadIds.clear();
+ }
+ return () -> {
+ readLock.lock();
+ try {
+ if (lockCnt == 0) {
+ lockReleased.signalAll();
+ }
+ } finally {
+ readLock.unlock();
+ }
+ };
+ }
+
+ /**
+ * Releases a lock on the cache.
+ */
+ public void releaseLock() {
+ writeLock.lock();
+ Runnable callback = decrementLockCount();
+ try {
+ decrementLockCount();
+ } finally {
+ writeLock.unlock();
+ }
+ callback.run();
+ }
+
+ /**
+ * Acquires lock on the cache within max amount time.
+ * @param timeout Max time to wait to acquire lock.
+ * @return true if lock is acquired otherwise false.
+ * @throws InterruptedException
+ */
+ public boolean tryAcquire(long timeout) throws InterruptedException {
+ long endTime = System.currentTimeMillis() + timeout;
+ if (timeout <= 0) {
+ endTime = Long.MAX_VALUE;
+ timeout = Long.MAX_VALUE;
+ }
+ if (writeLock.tryLock(timeout, TimeUnit.MILLISECONDS)) {
+ Runnable rollbackCallback = null;
+ try {
+ lockCnt += 1;
+ if (lockCnt == 1) {
+ snapshotRefThreadIds.clear();
+ dbMap.values().stream()
+ .flatMap(referenceCounted ->
+
referenceCounted.getThreadCntMap().entrySet().stream().map(entry ->
Pair.of(entry, referenceCounted)))
+ .forEach(entry -> updateThreadCnt(entry.getKey().getKey(),
entry.getValue().get().getSnapshotID(),
+ entry.getKey().getValue()));
+ }
+ while (!snapshotRefThreadIds.isEmpty()) {
+ long currentTime = System.currentTimeMillis();
+ if (currentTime >= endTime) {
+ // If and release acquired lock
+ rollbackCallback = decrementLockCount();
+ break;
+ }
+ dbClosed.await(Math.min(endTime - currentTime, lockTimeout),
TimeUnit.MILLISECONDS);
+ }
+ } finally {
+ writeLock.unlock();
+ }
+ if (rollbackCallback != null) {
+ rollbackCallback.run();
+ return false;
+ }
+ invalidateAll();
+ return true;
+ }
+ return false;
+ }
+
+ private void updateThreadCnt(long threadId, UUID key, long cnt) {
+ snapshotRefThreadIds.compute(threadId, (tid, countMap) -> {
+ if (countMap == null) {
+ if (cnt <= 0) {
+ return null;
+ }
+ countMap = new ConcurrentHashMap<>();
+ }
+ countMap.compute(key, (snapId, count) -> {
+ if (count == null) {
+ count = 0L;
+ }
+ count += cnt;
+ return count > 0 ? count : null;
+ });
+ return countMap.isEmpty() ? null : countMap;
+ });
+ }
+
+ /**
+ * Waits for lock to be released. This function doesn't wait for the lock if
the thread already has a few snapshots
+ * open. It only waits if the thread is reading it's first snapshot.
+ * @param threadId
+ * @throws InterruptedException
+ */
+ private void waitForLock(long threadId) throws IOException {
Review Comment:
If the Javadoc is correct, the method needs a more descriptive name.
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/ReferenceCounted.java:
##########
@@ -160,4 +160,8 @@ public void close() {
// so it is eligible to be used with try-with-resources.
decrementRefCount();
}
+
+ public Map<Long, Long> getThreadCntMap() {
+ return ImmutableMap.copyOf(threadMap);
Review Comment:
Will this be expensive? ImmutableMap.copyOf() copies the entire thread map on
every call.
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java:
##########
@@ -57,14 +62,32 @@ public class SnapshotCache implements
ReferenceCountedCallback, AutoCloseable {
"SnapshotCacheCleanupService";
private final OMMetrics omMetrics;
+ private final ReadWriteLock lock;
+ private final Lock readLock;
+ private final Lock writeLock;
+ private int lockCnt;
+ private final Condition lockReleased;
+ private final Condition dbClosed;
+ private final long lockTimeout;
+ private final Map<Long, Map<UUID, Long>> snapshotRefThreadIds;
+ private final AtomicBoolean closed;
public SnapshotCache(CacheLoader<UUID, OmSnapshot> cacheLoader, int
cacheSizeLimit, OMMetrics omMetrics,
- long cleanupInterval) {
+ long cleanupInterval, long lockTimeout) {
Review Comment:
lockTimeout is in milliseconds; please make the unit explicit (e.g., in the
Javadoc or the parameter name).
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java:
##########
@@ -118,12 +152,136 @@ public void invalidateAll() {
@Override
public void close() {
+ closed.set(true);
invalidateAll();
if (this.scheduler != null) {
this.scheduler.close();
}
}
+ /**
+ * Decreases the lock count. When the count reaches zero all new threads
would be able to get a handle of snapshot.
+ */
+ private Runnable decrementLockCount() {
+ lockCnt -= 1;
+ if (lockCnt <= 0) {
+ LOG.warn("Invalid negative lock count : {}. Setting it to 0", lockCnt);
+ lockCnt = 0;
+ }
+
+ if (lockCnt == 0) {
+ snapshotRefThreadIds.clear();
+ }
+ return () -> {
+ readLock.lock();
+ try {
+ if (lockCnt == 0) {
+ lockReleased.signalAll();
+ }
+ } finally {
+ readLock.unlock();
+ }
+ };
+ }
+
+ /**
+ * Releases a lock on the cache.
+ */
+ public void releaseLock() {
+ writeLock.lock();
+ Runnable callback = decrementLockCount();
+ try {
+ decrementLockCount();
+ } finally {
+ writeLock.unlock();
+ }
+ callback.run();
+ }
+
+ /**
+ * Acquires lock on the cache within max amount time.
+ * @param timeout Max time to wait to acquire lock.
+ * @return true if lock is acquired otherwise false.
+ * @throws InterruptedException
+ */
+ public boolean tryAcquire(long timeout) throws InterruptedException {
+ long endTime = System.currentTimeMillis() + timeout;
+ if (timeout <= 0) {
+ endTime = Long.MAX_VALUE;
+ timeout = Long.MAX_VALUE;
+ }
+ if (writeLock.tryLock(timeout, TimeUnit.MILLISECONDS)) {
+ Runnable rollbackCallback = null;
+ try {
+ lockCnt += 1;
+ if (lockCnt == 1) {
+ snapshotRefThreadIds.clear();
+ dbMap.values().stream()
+ .flatMap(referenceCounted ->
+
referenceCounted.getThreadCntMap().entrySet().stream().map(entry ->
Pair.of(entry, referenceCounted)))
+ .forEach(entry -> updateThreadCnt(entry.getKey().getKey(),
entry.getValue().get().getSnapshotID(),
+ entry.getKey().getValue()));
+ }
+ while (!snapshotRefThreadIds.isEmpty()) {
+ long currentTime = System.currentTimeMillis();
+ if (currentTime >= endTime) {
+ // If and release acquired lock
+ rollbackCallback = decrementLockCount();
+ break;
+ }
+ dbClosed.await(Math.min(endTime - currentTime, lockTimeout),
TimeUnit.MILLISECONDS);
+ }
+ } finally {
+ writeLock.unlock();
+ }
+ if (rollbackCallback != null) {
+ rollbackCallback.run();
+ return false;
+ }
+ invalidateAll();
+ return true;
+ }
+ return false;
+ }
+
+ private void updateThreadCnt(long threadId, UUID key, long cnt) {
+ snapshotRefThreadIds.compute(threadId, (tid, countMap) -> {
+ if (countMap == null) {
+ if (cnt <= 0) {
+ return null;
+ }
+ countMap = new ConcurrentHashMap<>();
+ }
+ countMap.compute(key, (snapId, count) -> {
+ if (count == null) {
+ count = 0L;
+ }
+ count += cnt;
+ return count > 0 ? count : null;
+ });
+ return countMap.isEmpty() ? null : countMap;
+ });
+ }
+
+ /**
+ * Waits for lock to be released. This function doesn't wait for the lock if
the thread already has a few snapshots
+ * open. It only waits if the thread is reading it's first snapshot.
+ * @param threadId
+ * @throws InterruptedException
+ */
+ private void waitForLock(long threadId) throws IOException {
+ if (snapshotRefThreadIds.computeIfPresent(threadId, (k, v) -> v) != null) {
Review Comment:
If computeIfPresent() returns null, that implies the thread is reading its
first snapshot. Would it then skip waiting?
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java:
##########
@@ -96,10 +119,21 @@ public void invalidate(UUID key) {
if (v == null) {
LOG.warn("SnapshotId: '{}' does not exist in snapshot cache.", k);
} else {
+ readLock.lock();
try {
+ if (lockCnt > 0) {
+ for (Long tid : snapshotRefThreadIds.keySet()) {
Review Comment:
Clean up references to snapshots when lockCnt is greater than 0, to ensure that
threads no longer hold references to specific snapshots (key) when their
associated locks are being released.
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java:
##########
@@ -118,12 +152,136 @@ public void invalidateAll() {
@Override
public void close() {
+ closed.set(true);
invalidateAll();
if (this.scheduler != null) {
this.scheduler.close();
}
}
+ /**
+ * Decreases the lock count. When the count reaches zero all new threads
would be able to get a handle of snapshot.
+ */
+ private Runnable decrementLockCount() {
Review Comment:
Please assert that the write lock is held here, to avoid a race condition.
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java:
##########
@@ -57,14 +62,32 @@ public class SnapshotCache implements
ReferenceCountedCallback, AutoCloseable {
"SnapshotCacheCleanupService";
private final OMMetrics omMetrics;
+ private final ReadWriteLock lock;
+ private final Lock readLock;
+ private final Lock writeLock;
+ private int lockCnt;
+ private final Condition lockReleased;
+ private final Condition dbClosed;
Review Comment:
Why is it named dbClosed?
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java:
##########
@@ -118,12 +152,136 @@ public void invalidateAll() {
@Override
public void close() {
+ closed.set(true);
invalidateAll();
if (this.scheduler != null) {
this.scheduler.close();
}
}
+ /**
+ * Decreases the lock count. When the count reaches zero all new threads
would be able to get a handle of snapshot.
+ */
+ private Runnable decrementLockCount() {
+ lockCnt -= 1;
+ if (lockCnt <= 0) {
+ LOG.warn("Invalid negative lock count : {}. Setting it to 0", lockCnt);
+ lockCnt = 0;
+ }
+
+ if (lockCnt == 0) {
+ snapshotRefThreadIds.clear();
+ }
+ return () -> {
+ readLock.lock();
+ try {
+ if (lockCnt == 0) {
+ lockReleased.signalAll();
+ }
+ } finally {
+ readLock.unlock();
+ }
+ };
+ }
+
+ /**
+ * Releases a lock on the cache.
+ */
+ public void releaseLock() {
+ writeLock.lock();
+ Runnable callback = decrementLockCount();
+ try {
+ decrementLockCount();
+ } finally {
+ writeLock.unlock();
+ }
+ callback.run();
+ }
+
+ /**
+ * Acquires lock on the cache within max amount time.
+ * @param timeout Max time to wait to acquire lock.
+ * @return true if lock is acquired otherwise false.
+ * @throws InterruptedException
+ */
+ public boolean tryAcquire(long timeout) throws InterruptedException {
+ long endTime = System.currentTimeMillis() + timeout;
+ if (timeout <= 0) {
+ endTime = Long.MAX_VALUE;
+ timeout = Long.MAX_VALUE;
+ }
+ if (writeLock.tryLock(timeout, TimeUnit.MILLISECONDS)) {
+ Runnable rollbackCallback = null;
+ try {
+ lockCnt += 1;
+ if (lockCnt == 1) {
+ snapshotRefThreadIds.clear();
+ dbMap.values().stream()
+ .flatMap(referenceCounted ->
+
referenceCounted.getThreadCntMap().entrySet().stream().map(entry ->
Pair.of(entry, referenceCounted)))
+ .forEach(entry -> updateThreadCnt(entry.getKey().getKey(),
entry.getValue().get().getSnapshotID(),
+ entry.getKey().getValue()));
+ }
+ while (!snapshotRefThreadIds.isEmpty()) {
+ long currentTime = System.currentTimeMillis();
+ if (currentTime >= endTime) {
+ // If and release acquired lock
+ rollbackCallback = decrementLockCount();
+ break;
+ }
+ dbClosed.await(Math.min(endTime - currentTime, lockTimeout),
TimeUnit.MILLISECONDS);
+ }
+ } finally {
+ writeLock.unlock();
+ }
+ if (rollbackCallback != null) {
+ rollbackCallback.run();
+ return false;
+ }
+ invalidateAll();
+ return true;
+ }
+ return false;
+ }
+
+ private void updateThreadCnt(long threadId, UUID key, long cnt) {
Review Comment:
For thread (threadId), increment the reference count of snapshot (key) by
(cnt).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]