[ https://issues.apache.org/jira/browse/HADOOP-18291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17736674#comment-17736674 ]

ASF GitHub Bot commented on HADOOP-18291:
-----------------------------------------

virajjasani commented on code in PR #5754:
URL: https://github.com/apache/hadoop/pull/5754#discussion_r1240621453


##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java:
##########
@@ -247,9 +305,46 @@ private Entry getEntry(int blockNumber) {
      throw new IllegalStateException(String.format("block %d not found in cache", blockNumber));
     }
     numGets++;
+    addToHeadOfLinkedList(entry);
     return entry;
   }
 
+  /**
+   * Add the given entry to the head of the linked list.
+   *
+   * @param entry Block entry to add.
+   */
+  private void addToHeadOfLinkedList(Entry entry) {
+    blocksLock.writeLock().lock();
+    try {

Review Comment:
   done, added on L305, and yes, I agree it can be helpful; in fact, I added it for testing purposes anyway.



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java:
##########
@@ -247,9 +305,46 @@ private Entry getEntry(int blockNumber) {
      throw new IllegalStateException(String.format("block %d not found in cache", blockNumber));
     }
     numGets++;
+    addToHeadOfLinkedList(entry);
     return entry;
   }
 
+  /**
+   * Add the given entry to the head of the linked list.
+   *
+   * @param entry Block entry to add.
+   */
+  private void addToHeadOfLinkedList(Entry entry) {
+    blocksLock.writeLock().lock();
+    try {
+      if (head == null) {
+        head = entry;
+        tail = entry;
+      }
+      if (entry != head) {
+        Entry prev = entry.getPrevious();
+        Entry nxt = entry.getNext();
+        if (prev != null) {
+          prev.setNext(nxt);
+        }
+        if (nxt != null) {
+          nxt.setPrevious(prev);
+        }
+        entry.setPrevious(null);
+        entry.setNext(head);
+        head.setPrevious(entry);
+        head = entry;
+      }
+      if (tail != null) {
+        while (tail.getNext() != null) {
+          tail = tail.getNext();
+        }
+      }

Review Comment:
   sure, let's say:
   
   head -> 1, tail -> 2
   
   new block: 3
   so, we need to make: 3 -> 1 -> 2
   i.e. new head -> 3, tail -> 2
   
   new block: 4
   updated list: 4 -> 3 -> 1 -> 2
   
   now let's say the input stream accesses block 2, hence block 2 needs to be moved to the head.
   the new list should be: 2 -> 4 -> 3 -> 1
   
   we change head to 2 and we also update the next pointer of block 1.
   however, if we don't update tail (L322-L326), we will not be able to move tail to block 1.
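   
   to make the pointer updates concrete, here is a minimal, self-contained sketch of the same move-to-head logic (hypothetical Node class, not the actual Entry type):
   
   class Node {
     final int block;
     Node prev, next;
     Node(int block) { this.block = block; }
   }
   
   class LruListDemo {
     Node head, tail;
   
     // Move an existing (or new) node to the head, mirroring addToHeadOfLinkedList().
     void moveToHead(Node n) {
       if (head == null) {
         head = tail = n;
       }
       if (n != head) {
         // Unlink n from its current position.
         if (n.prev != null) { n.prev.next = n.next; }
         if (n.next != null) { n.next.prev = n.prev; }
         n.prev = null;
         n.next = head;
         head.prev = n;
         head = n;
       }
       // The re-walk from the example above: if the old tail (block 2) was
       // just moved to the head, 'tail' is stale; walking forward from it
       // lands on the true last node (block 1).
       while (tail != null && tail.next != null) {
         tail = tail.next;
       }
     }
   }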
   



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java:
##########
@@ -299,9 +395,62 @@ public void put(int blockNumber, ByteBuffer buffer, Configuration conf,
     // Update stream_read_blocks_in_cache stats only after blocks map is updated with new file
     // entry to avoid any discrepancy related to the value of stream_read_blocks_in_cache.
     // If stream_read_blocks_in_cache is updated before updating the blocks map here, closing of
-    // the input stream can lead to the removal of the cache file even before blocks is added with
-    // the new cache file, leading to incorrect value of stream_read_blocks_in_cache.
+    // the input stream can lead to the removal of the cache file even before blocks is added
+    // with the new cache file, leading to incorrect value of stream_read_blocks_in_cache.
     prefetchingStatistics.blockAddedToFileCache();
+    addToLinkedListAndEvictIfRequired(entry);
+  }
+
+  /**
+   * Add the given entry to the head of the linked list and if the LRU cache size
+   * exceeds the max limit, evict tail of the LRU linked list.
+   *
+   * @param entry Block entry to add.
+   */
+  private void addToLinkedListAndEvictIfRequired(Entry entry) {
+    addToHeadOfLinkedList(entry);
+    blocksLock.writeLock().lock();

Review Comment:
   yeah, that would still be okay given that eviction would take place anyway.
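   
   to illustrate (a minimal sketch with hypothetical names, not the actual cache code): the size check and eviction run under the write lock on every add, so even if another thread slips in between the two lock sections, whichever thread runs the check next trims the list back under the limit.
   
   import java.util.ArrayDeque;
   import java.util.Deque;
   import java.util.concurrent.locks.ReentrantReadWriteLock;
   
   class EvictIfRequiredDemo {
     private final Deque<Integer> lruOrder = new ArrayDeque<>(); // head = MRU
     private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
     private final int maxBlocksCount;
   
     EvictIfRequiredDemo(int maxBlocksCount) {
       this.maxBlocksCount = maxBlocksCount;
     }
   
     void addAndEvictIfRequired(int blockNumber) {
       lock.writeLock().lock();
       try {
         lruOrder.addFirst(blockNumber);
         // Re-checked under the lock on every add, so a stale earlier
         // observation of the size does not matter.
         while (lruOrder.size() > maxBlocksCount) {
           lruOrder.removeLast(); // evict the LRU tail
         }
       } finally {
         lock.writeLock().unlock();
       }
     }
   }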



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java:
##########
@@ -299,9 +395,62 @@ public void put(int blockNumber, ByteBuffer buffer, Configuration conf,
     // Update stream_read_blocks_in_cache stats only after blocks map is updated with new file
     // entry to avoid any discrepancy related to the value of stream_read_blocks_in_cache.
     // If stream_read_blocks_in_cache is updated before updating the blocks map here, closing of
-    // the input stream can lead to the removal of the cache file even before blocks is added with
-    // the new cache file, leading to incorrect value of stream_read_blocks_in_cache.
+    // the input stream can lead to the removal of the cache file even before blocks is added
+    // with the new cache file, leading to incorrect value of stream_read_blocks_in_cache.
     prefetchingStatistics.blockAddedToFileCache();
+    addToLinkedListAndEvictIfRequired(entry);
+  }
+
+  /**
+   * Add the given entry to the head of the linked list and if the LRU cache size
+   * exceeds the max limit, evict tail of the LRU linked list.
+   *
+   * @param entry Block entry to add.
+   */
+  private void addToLinkedListAndEvictIfRequired(Entry entry) {
+    addToHeadOfLinkedList(entry);
+    blocksLock.writeLock().lock();
+    try {
+      if (blocks.size() > maxBlocksCount && !closed.get()) {
+        Entry elementToPurge = tail;
+        tail = tail.getPrevious();
+        if (tail == null) {
+          tail = head;
+        }
+        tail.setNext(null);
+        elementToPurge.setPrevious(null);
+        deleteBlockFileAndEvictCache(elementToPurge);
+      }
+    } finally {
+      blocksLock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Delete cache file as part of the block cache LRU eviction.
+   *
+   * @param elementToPurge Block entry to evict.
+   */
+  private void deleteBlockFileAndEvictCache(Entry elementToPurge) {
+    boolean lockAcquired =
+        elementToPurge.takeLock(Entry.LockType.WRITE, PREFETCH_WRITE_LOCK_TIMEOUT,
+            PREFETCH_WRITE_LOCK_TIMEOUT_UNIT);
+    if (!lockAcquired) {
+      LOG.error("Cache file {} deletion would not be attempted as write lock could not"

Review Comment:
   since we are already using 5s elsewhere (PREFETCH_WRITE_LOCK_TIMEOUT), I used it here as well, but I'm happy to change it in the future if we encounter any problems with it. Does that sound good?
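   
   for reference, the timed-acquisition pattern looks roughly like this (a minimal sketch with a plain ReentrantReadWriteLock; the actual Entry.takeLock() wrapper may differ):
   
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.locks.ReentrantReadWriteLock;
   
   class TimedLockDemo {
     private static final long WRITE_LOCK_TIMEOUT = 5;
     private static final TimeUnit WRITE_LOCK_TIMEOUT_UNIT = TimeUnit.SECONDS;
   
     private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
   
     // Returns false if the write lock cannot be acquired within the timeout,
     // in which case the caller logs and skips the deletion, as in the PR.
     boolean tryDelete(Runnable deleteAction) throws InterruptedException {
       if (!lock.writeLock().tryLock(WRITE_LOCK_TIMEOUT, WRITE_LOCK_TIMEOUT_UNIT)) {
         return false;
       }
       try {
         deleteAction.run();
         return true;
       } finally {
         lock.writeLock().unlock();
       }
     }
   }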





> S3A prefetch - Implement LRU cache for SingleFilePerBlockCache
> --------------------------------------------------------------
>
>                 Key: HADOOP-18291
>                 URL: https://issues.apache.org/jira/browse/HADOOP-18291
>             Project: Hadoop Common
>          Issue Type: Sub-task
>    Affects Versions: 3.4.0
>            Reporter: Ahmar Suhail
>            Assignee: Viraj Jasani
>            Priority: Major
>              Labels: pull-request-available
>
> Currently there is no limit on the size of the disk cache. This means we could
> have a large number of files on disk, especially for access patterns that are
> very random and do not always read the block fully.
>  
> e.g.:
> in.seek(5);
> in.read(); 
> in.seek(blockSize + 10) // block 0 gets saved to disk as it's not fully read
> in.read();
> in.seek(2 * blockSize + 10) // block 1 gets saved to disk
> .. and so on
>  
> The in-memory cache is bounded, and by default has a limit of 72 MB (9
> blocks). When a block is fully read and a seek is issued, it's released
> [here|https://github.com/apache/hadoop/blob/feature-HADOOP-18028-s3a-prefetch/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingInputStream.java#L109].
> We can also delete the on-disk file for the block here if it exists.
>  
> Also, maybe add an upper limit on disk space, and delete the file which stores
> data of the block furthest from the current block (similar to the in-memory
> cache) when this limit is reached.
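> 
> As a rough illustration of that bounded-eviction idea, a minimal sketch using
> java.util.LinkedHashMap in access order (illustrative only; the actual PR
> builds its own doubly linked list over the blocks map):
> 
> import java.util.LinkedHashMap;
> import java.util.Map;
> 
> // Count-bounded LRU sketch: values stand in for paths of on-disk block files.
> class BoundedBlockCache extends LinkedHashMap<Integer, String> {
>   private final int maxBlocks;
> 
>   BoundedBlockCache(int maxBlocks) {
>     super(16, 0.75f, true);  // accessOrder=true gives LRU iteration order
>     this.maxBlocks = maxBlocks;
>   }
> 
>   @Override
>   protected boolean removeEldestEntry(Map.Entry<Integer, String> eldest) {
>     // When the bound is exceeded, the least-recently-used block's file
>     // would be deleted here before the entry is dropped.
>     return size() > maxBlocks;
>   }
> }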



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
