This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 51a7f7b024e HADOOP-18756. S3A prefetch - CachingBlockManager to use AtomicBoolean for closed flag (#5718)
51a7f7b024e is described below

commit 51a7f7b024e563a973f2cdc3ddd11dc9eb957f02
Author: Viraj Jasani <vjas...@apache.org>
AuthorDate: Wed Jun 14 04:51:54 2023 -0700

    HADOOP-18756. S3A prefetch - CachingBlockManager to use AtomicBoolean for closed flag (#5718)
    
    Contributed by Viraj Jasani
---
 .../fs/impl/prefetch/SingleFilePerBlockCache.java  | 56 ++++++++++------------
 1 file changed, 26 insertions(+), 30 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java
index 7a817955452..e043fbd904b 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java
@@ -38,6 +38,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
@@ -68,7 +69,7 @@ public class SingleFilePerBlockCache implements BlockCache {
    */
   private int numGets = 0;
 
-  private boolean closed;
+  private final AtomicBoolean closed;
 
   private final PrefetchingStatistics prefetchingStatistics;
 
@@ -174,6 +175,7 @@ public class SingleFilePerBlockCache implements BlockCache {
    */
   public SingleFilePerBlockCache(PrefetchingStatistics prefetchingStatistics) {
     this.prefetchingStatistics = requireNonNull(prefetchingStatistics);
+    this.closed = new AtomicBoolean(false);
   }
 
   /**
@@ -207,7 +209,7 @@ public class SingleFilePerBlockCache implements BlockCache {
    */
   @Override
   public void get(int blockNumber, ByteBuffer buffer) throws IOException {
-    if (closed) {
+    if (closed.get()) {
       return;
     }
 
@@ -262,7 +264,7 @@ public class SingleFilePerBlockCache implements BlockCache {
   @Override
   public void put(int blockNumber, ByteBuffer buffer, Configuration conf,
       LocalDirAllocator localDirAllocator) throws IOException {
-    if (closed) {
+    if (closed.get()) {
       return;
     }
 
@@ -333,37 +335,31 @@ public class SingleFilePerBlockCache implements BlockCache {
 
   @Override
   public void close() throws IOException {
-    if (closed) {
-      return;
-    }
-
-    closed = true;
+    if (closed.compareAndSet(false, true)) {
+      LOG.debug(getStats());
+      int numFilesDeleted = 0;
 
-    LOG.info(getStats());
-    int numFilesDeleted = 0;
-
-    for (Entry entry : blocks.values()) {
-      boolean lockAcquired = entry.takeLock(Entry.LockType.WRITE, 
PREFETCH_WRITE_LOCK_TIMEOUT,
-          PREFETCH_WRITE_LOCK_TIMEOUT_UNIT);
-      if (!lockAcquired) {
-        LOG.error("Cache file {} deletion would not be attempted as write lock 
could not"
-                + " be acquired within {} {}", entry.path, 
PREFETCH_WRITE_LOCK_TIMEOUT,
+      for (Entry entry : blocks.values()) {
+        boolean lockAcquired = entry.takeLock(Entry.LockType.WRITE, 
PREFETCH_WRITE_LOCK_TIMEOUT,
             PREFETCH_WRITE_LOCK_TIMEOUT_UNIT);
-        continue;
-      }
-      try {
-        Files.deleteIfExists(entry.path);
-        prefetchingStatistics.blockRemovedFromFileCache();
-        numFilesDeleted++;
-      } catch (IOException e) {
-        LOG.debug("Failed to delete cache file {}", entry.path, e);
-      } finally {
-        entry.releaseLock(Entry.LockType.WRITE);
+        if (!lockAcquired) {
+          LOG.error("Cache file {} deletion would not be attempted as write 
lock could not"
+                  + " be acquired within {} {}", entry.path, 
PREFETCH_WRITE_LOCK_TIMEOUT,
+              PREFETCH_WRITE_LOCK_TIMEOUT_UNIT);
+          continue;
+        }
+        try {
+          Files.deleteIfExists(entry.path);
+          prefetchingStatistics.blockRemovedFromFileCache();
+          numFilesDeleted++;
+        } catch (IOException e) {
+          LOG.warn("Failed to delete cache file {}", entry.path, e);
+        } finally {
+          entry.releaseLock(Entry.LockType.WRITE);
+        }
       }
-    }
 
-    if (numFilesDeleted > 0) {
-      LOG.info("Deleted {} cache files", numFilesDeleted);
+      LOG.debug("Prefetch cache close: Deleted {} cache files", 
numFilesDeleted);
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to