[ https://issues.apache.org/jira/browse/HADOOP-18291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17736479#comment-17736479 ]

ASF GitHub Bot commented on HADOOP-18291:
-----------------------------------------

mehakmeet commented on code in PR #5754:
URL: https://github.com/apache/hadoop/pull/5754#discussion_r1239686973


##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java:
##########
@@ -166,16 +201,39 @@ private boolean takeLock(LockType lockType, long timeout, TimeUnit unit) {
       }
       return false;
     }
+
+    private Entry getPrevious() {
+      return previous;
+    }
+
+    private void setPrevious(Entry previous) {
+      this.previous = previous;
+    }
+
+    private Entry getNext() {
+      return next;
+    }
+
+    private void setNext(Entry next) {
+      this.next = next;
+    }
   }
 
   /**
    * Constructs an instance of a {@code SingleFilePerBlockCache}.
    *
    * @param prefetchingStatistics statistics for this stream.
+   * @param conf the configuration object.
    */
-  public SingleFilePerBlockCache(PrefetchingStatistics prefetchingStatistics) {
+  public SingleFilePerBlockCache(PrefetchingStatistics prefetchingStatistics, Configuration conf) {
     this.prefetchingStatistics = requireNonNull(prefetchingStatistics);
     this.closed = new AtomicBoolean(false);
+    this.maxBlocksCount =
+        conf.getInt(FS_PREFETCH_MAX_BLOCKS_COUNT, DEFAULT_FS_PREFETCH_MAX_BLOCKS_COUNT);
+    Preconditions.checkArgument(this.maxBlocksCount > 0,
+        "prefetch blocks total capacity should be more than 0");

Review Comment:
   Include the property name in the error message, so we know which property to set to a valid value.
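   Something along these lines would work (just a sketch reusing the existing constant; exact wording is up to you):
```java
Preconditions.checkArgument(this.maxBlocksCount > 0,
    "prefetch blocks total capacity should be more than 0; configure it using "
        + FS_PREFETCH_MAX_BLOCKS_COUNT);
```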



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java:
##########
@@ -247,9 +305,46 @@ private Entry getEntry(int blockNumber) {
       throw new IllegalStateException(String.format("block %d not found in cache", blockNumber));
     }
     numGets++;
+    addToHeadOfLinkedList(entry);
     return entry;
   }
 
+  /**
+   * Add the given entry to the head of the linked list.
+   *
+   * @param entry Block entry to add.
+   */
+  private void addToHeadOfLinkedList(Entry entry) {
+    blocksLock.writeLock().lock();
+    try {
+      if (head == null) {
+        head = entry;
+        tail = entry;
+      }
+      if (entry != head) {
+        Entry prev = entry.getPrevious();
+        Entry nxt = entry.getNext();
+        if (prev != null) {
+          prev.setNext(nxt);
+        }
+        if (nxt != null) {
+          nxt.setPrevious(prev);
+        }
+        entry.setPrevious(null);
+        entry.setNext(head);
+        head.setPrevious(entry);
+        head = entry;
+      }
+      if (tail != null) {
+        while (tail.getNext() != null) {
+          tail = tail.getNext();
+        }
+      }

Review Comment:
   Can you explain a bit about this part? I'm not able to get why this is needed.



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java:
##########
@@ -89,6 +110,16 @@ public class SingleFilePerBlockCache implements BlockCache {
   private static final Set<PosixFilePermission> TEMP_FILE_ATTRS =
       ImmutableSet.of(PosixFilePermission.OWNER_READ, PosixFilePermission.OWNER_WRITE);
 
+  /**
+   * Prefetch max blocks count config.
+   */
+  public static final String FS_PREFETCH_MAX_BLOCKS_COUNT = "fs.prefetch.max.blocks.count";

Review Comment:
   Is there a Constants class where we can move this to?
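   If there isn't one yet, a dedicated constants holder could be an option. A minimal sketch (the class name `PrefetchConstants`, its package, and the default value shown are illustrative only, not a reference to an existing class):
```java
package org.apache.hadoop.fs.impl.prefetch;

/**
 * Hypothetical holder for prefetch-related constants; the name, package and
 * the default value below are placeholders for illustration.
 */
public final class PrefetchConstants {

  /** Prefetch max blocks count config key. */
  public static final String FS_PREFETCH_MAX_BLOCKS_COUNT =
      "fs.prefetch.max.blocks.count";

  /** Default prefetch max blocks count (placeholder value). */
  public static final int DEFAULT_FS_PREFETCH_MAX_BLOCKS_COUNT = 4;

  private PrefetchConstants() {
  }
}
```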



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java:
##########
@@ -299,9 +395,62 @@ public void put(int blockNumber, ByteBuffer buffer, Configuration conf,
     // Update stream_read_blocks_in_cache stats only after blocks map is updated with new file
     // entry to avoid any discrepancy related to the value of stream_read_blocks_in_cache.
     // If stream_read_blocks_in_cache is updated before updating the blocks map here, closing of
-    // the input stream can lead to the removal of the cache file even before blocks is added with
-    // the new cache file, leading to incorrect value of stream_read_blocks_in_cache.
+    // the input stream can lead to the removal of the cache file even before blocks is added
+    // with the new cache file, leading to incorrect value of stream_read_blocks_in_cache.
     prefetchingStatistics.blockAddedToFileCache();
+    addToLinkedListAndEvictIfRequired(entry);
+  }
+
+  /**
+   * Add the given entry to the head of the linked list and if the LRU cache size
+   * exceeds the max limit, evict tail of the LRU linked list.
+   *
+   * @param entry Block entry to add.
+   */
+  private void addToLinkedListAndEvictIfRequired(Entry entry) {
+    addToHeadOfLinkedList(entry);
+    blocksLock.writeLock().lock();
+    try {
+      if (blocks.size() > maxBlocksCount && !closed.get()) {
+        Entry elementToPurge = tail;
+        tail = tail.getPrevious();
+        if (tail == null) {
+          tail = head;
+        }
+        tail.setNext(null);
+        elementToPurge.setPrevious(null);
+        deleteBlockFileAndEvictCache(elementToPurge);
+      }
+    } finally {
+      blocksLock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Delete cache file as part of the block cache LRU eviction.
+   *
+   * @param elementToPurge Block entry to evict.
+   */
+  private void deleteBlockFileAndEvictCache(Entry elementToPurge) {
+    boolean lockAcquired =
+        elementToPurge.takeLock(Entry.LockType.WRITE, PREFETCH_WRITE_LOCK_TIMEOUT,
+            PREFETCH_WRITE_LOCK_TIMEOUT_UNIT);
+    if (!lockAcquired) {
+      LOG.error("Cache file {} deletion would not be attempted as write lock 
could not"

Review Comment:
   So, there can be a scenario where the current cache exceeds its normal capacity? Is 5 seconds enough time, or are we okay with this?



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java:
##########
@@ -61,7 +62,27 @@ public class SingleFilePerBlockCache implements BlockCache {
   /**
    * Blocks stored in this cache.
    */
-  private final Map<Integer, Entry> blocks = new ConcurrentHashMap<>();
+  private final Map<Integer, Entry> blocks;
+
+  /**
+   * Total max blocks count, to be considered as baseline for LRU cache.
+   */
+  private final int maxBlocksCount;
+
+  /**
+   * The lock to be shared by LRU based linked list updates.
+   */
+  private final ReentrantReadWriteLock blocksLock;
+
+  /**
+   * Head of the linked list.
+   */
+  private Entry head;
+
+  /**
+   * Tail of the lined list.

Review Comment:
   typo: "linked"



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java:
##########
@@ -247,9 +305,46 @@ private Entry getEntry(int blockNumber) {
       throw new IllegalStateException(String.format("block %d not found in cache", blockNumber));
     }
     numGets++;
+    addToHeadOfLinkedList(entry);
     return entry;
   }
 
+  /**
+   * Add the given entry to the head of the linked list.
+   *
+   * @param entry Block entry to add.
+   */
+  private void addToHeadOfLinkedList(Entry entry) {
+    blocksLock.writeLock().lock();
+    try {

Review Comment:
   Can we add a bit of logging here for debug purposes? The entry's block number, and maybe head and tail at this point, would be good to see as well.
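   Something like this, maybe (only a sketch; it assumes the entry's block number and the head/tail fields are accessible at this point, and that `Entry#toString()` prints something useful):
```java
// Illustrative only: exact message and accessors are up to the author.
if (LOG.isDebugEnabled()) {
  LOG.debug("Adding block {} to the head of the linked list; current head: {}, tail: {}",
      entry.blockNumber, head, tail);
}
```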



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java:
##########
@@ -299,9 +395,62 @@ public void put(int blockNumber, ByteBuffer buffer, Configuration conf,
     // Update stream_read_blocks_in_cache stats only after blocks map is updated with new file
     // entry to avoid any discrepancy related to the value of stream_read_blocks_in_cache.
     // If stream_read_blocks_in_cache is updated before updating the blocks map here, closing of
-    // the input stream can lead to the removal of the cache file even before blocks is added with
-    // the new cache file, leading to incorrect value of stream_read_blocks_in_cache.
+    // the input stream can lead to the removal of the cache file even before blocks is added
+    // with the new cache file, leading to incorrect value of stream_read_blocks_in_cache.
     prefetchingStatistics.blockAddedToFileCache();
+    addToLinkedListAndEvictIfRequired(entry);
+  }
+
+  /**
+   * Add the given entry to the head of the linked list and if the LRU cache size
+   * exceeds the max limit, evict tail of the LRU linked list.
+   *
+   * @param entry Block entry to add.
+   */
+  private void addToLinkedListAndEvictIfRequired(Entry entry) {
+    addToHeadOfLinkedList(entry);
+    blocksLock.writeLock().lock();

Review Comment:
   There can be a scenario where we have just added the entry to the head and, before we try to evict the tail, another thread acquires the write lock inside `addToHeadOfLinkedList(entry)` and adds its entry at the head. Not sure if that would cause any issues though...
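   In case it matters, one way to close that window would be to hold the write lock across both steps (just a sketch over the existing fields and methods; it relies on the `ReentrantReadWriteLock` write lock being reentrant, so the nested `lock()` inside `addToHeadOfLinkedList(entry)` stays safe):
```java
private void addToLinkedListAndEvictIfRequired(Entry entry) {
  blocksLock.writeLock().lock();
  try {
    // The write lock is reentrant, so the nested acquisition inside
    // addToHeadOfLinkedList() is fine; holding it here removes the gap
    // between adding at the head and checking for eviction.
    addToHeadOfLinkedList(entry);
    if (blocks.size() > maxBlocksCount && !closed.get()) {
      Entry elementToPurge = tail;
      tail = tail.getPrevious();
      if (tail == null) {
        tail = head;
      }
      tail.setNext(null);
      elementToPurge.setPrevious(null);
      deleteBlockFileAndEvictCache(elementToPurge);
    }
  } finally {
    blocksLock.writeLock().unlock();
  }
}
```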





> S3A prefetch - Implement LRU cache for SingleFilePerBlockCache
> --------------------------------------------------------------
>
>                 Key: HADOOP-18291
>                 URL: https://issues.apache.org/jira/browse/HADOOP-18291
>             Project: Hadoop Common
>          Issue Type: Sub-task
>    Affects Versions: 3.4.0
>            Reporter: Ahmar Suhail
>            Assignee: Viraj Jasani
>            Priority: Major
>              Labels: pull-request-available
>
> Currently there is no limit on the size of the disk cache. This means we could 
> have a large number of files on disk, especially for access patterns that 
> are very random and do not always read the block fully. 
>  
> eg:
> in.seek(5);
> in.read(); 
> in.seek(blockSize + 10); // block 0 gets saved to disk as it's not fully read
> in.read();
> in.seek(2 * blockSize + 10); // block 1 gets saved to disk
> .. and so on
>  
> The in-memory cache is bounded, and by default has a limit of 72MB (9 
> blocks). When a block is fully read and a seek is issued, it's released 
> [here|https://github.com/apache/hadoop/blob/feature-HADOOP-18028-s3a-prefetch/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingInputStream.java#L109].
> We can also delete the on-disk file for the block here if it exists. 
>  
> Also maybe add an upper limit on disk space, and delete the file which stores 
> data of the block furthest from the current block (similar to the in memory 
> cache) when this limit is reached. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
