[
https://issues.apache.org/jira/browse/HADOOP-18291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17736674#comment-17736674
]
ASF GitHub Bot commented on HADOOP-18291:
-----------------------------------------
virajjasani commented on code in PR #5754:
URL: https://github.com/apache/hadoop/pull/5754#discussion_r1240621453
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java:
##########
@@ -247,9 +305,46 @@ private Entry getEntry(int blockNumber) {
throw new IllegalStateException(String.format("block %d not found in cache", blockNumber));
}
numGets++;
+ addToHeadOfLinkedList(entry);
return entry;
}
+ /**
+ * Add the given entry to the head of the linked list.
+ *
+ * @param entry Block entry to add.
+ */
+ private void addToHeadOfLinkedList(Entry entry) {
+ blocksLock.writeLock().lock();
+ try {
Review Comment:
Done, added on L305. Yes, I agree that it can be helpful; in fact, I had
already added it for testing purposes.
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java:
##########
@@ -247,9 +305,46 @@ private Entry getEntry(int blockNumber) {
throw new IllegalStateException(String.format("block %d not found in cache", blockNumber));
}
numGets++;
+ addToHeadOfLinkedList(entry);
return entry;
}
+ /**
+ * Add the given entry to the head of the linked list.
+ *
+ * @param entry Block entry to add.
+ */
+ private void addToHeadOfLinkedList(Entry entry) {
+ blocksLock.writeLock().lock();
+ try {
+ if (head == null) {
+ head = entry;
+ tail = entry;
+ }
+ if (entry != head) {
+ Entry prev = entry.getPrevious();
+ Entry nxt = entry.getNext();
+ if (prev != null) {
+ prev.setNext(nxt);
+ }
+ if (nxt != null) {
+ nxt.setPrevious(prev);
+ }
+ entry.setPrevious(null);
+ entry.setNext(head);
+ head.setPrevious(entry);
+ head = entry;
+ }
+ if (tail != null) {
+ while (tail.getNext() != null) {
+ tail = tail.getNext();
+ }
+ }
Review Comment:
Sure. Let's say:
head -> 1, tail -> 2
new block: 3
so we need to make the list: 3 -> 1 -> 2
i.e. new head -> 3, tail -> 2
new block: 4
updated list: 4 -> 3 -> 1 -> 2
Now let's say the input stream accesses block 2, so block 2 needs to be moved
to the head.
The new list should be: 2 -> 4 -> 3 -> 1
We change head to 2 and set the next pointer of block 1 to null.
However, if we don't update tail (L322-L326), tail keeps pointing at block 2
and never moves to block 1.
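The walk-through above can be reproduced with a minimal, self-contained sketch. This is not the PR's code: `LruListSketch`, `Node`, `addToHead` and `order` are hypothetical names, locking is omitted, and instead of walking forward from the old tail (as the PR does on L322-L326) it updates tail directly when the node being moved was the tail.

```java
import java.util.ArrayList;
import java.util.List;

class LruListSketch {
  /** Hypothetical stand-in for the cache Entry; only the list pointers are modeled. */
  static final class Node {
    final int blockNumber;
    Node previous;
    Node next;

    Node(int blockNumber) {
      this.blockNumber = blockNumber;
    }
  }

  Node head;
  Node tail;

  /** Insert a new node at the head, or move an existing node there. */
  void addToHead(Node node) {
    if (head == null) {
      head = node;
      tail = node;
      return;
    }
    if (node == head) {
      return;
    }
    Node prev = node.previous;
    Node nxt = node.next;
    // Unlink the node from its current position (no-op for a brand-new node).
    if (prev != null) {
      prev.next = nxt;
    }
    if (nxt != null) {
      nxt.previous = prev;
    }
    // If the moved node was the tail, its predecessor becomes the new tail.
    if (node == tail && prev != null) {
      tail = prev;
    }
    node.previous = null;
    node.next = head;
    head.previous = node;
    head = node;
  }

  /** Block numbers from head to tail. */
  List<Integer> order() {
    List<Integer> result = new ArrayList<>();
    for (Node n = head; n != null; n = n.next) {
      result.add(n.blockNumber);
    }
    return result;
  }

  public static void main(String[] args) {
    LruListSketch list = new LruListSketch();
    Node two = new Node(2);
    list.addToHead(two);          // 2
    list.addToHead(new Node(1));  // 1 -> 2
    list.addToHead(new Node(3));  // 3 -> 1 -> 2
    list.addToHead(new Node(4));  // 4 -> 3 -> 1 -> 2
    list.addToHead(two);          // block 2 accessed again
    System.out.println(list.order());          // prints [2, 4, 3, 1]
    System.out.println(list.tail.blockNumber); // prints 1
  }
}
```

Without the tail adjustment, tail would still reference block 2 after the last call, and a later eviction would remove the most recently used block instead of block 1.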
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java:
##########
@@ -299,9 +395,62 @@ public void put(int blockNumber, ByteBuffer buffer, Configuration conf,
// Update stream_read_blocks_in_cache stats only after blocks map is updated with new file
// entry to avoid any discrepancy related to the value of stream_read_blocks_in_cache.
// If stream_read_blocks_in_cache is updated before updating the blocks map here, closing of
- // the input stream can lead to the removal of the cache file even before blocks is added with
- // the new cache file, leading to incorrect value of stream_read_blocks_in_cache.
+ // the input stream can lead to the removal of the cache file even before blocks is added
+ // with the new cache file, leading to incorrect value of stream_read_blocks_in_cache.
prefetchingStatistics.blockAddedToFileCache();
+ addToLinkedListAndEvictIfRequired(entry);
+ }
+
+ /**
+ * Add the given entry to the head of the linked list and if the LRU cache size
+ * exceeds the max limit, evict tail of the LRU linked list.
+ *
+ * @param entry Block entry to add.
+ */
+ private void addToLinkedListAndEvictIfRequired(Entry entry) {
+ addToHeadOfLinkedList(entry);
+ blocksLock.writeLock().lock();
Review Comment:
Yeah, that would still be okay, given that eviction would take place anyway.
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java:
##########
@@ -299,9 +395,62 @@ public void put(int blockNumber, ByteBuffer buffer, Configuration conf,
// Update stream_read_blocks_in_cache stats only after blocks map is updated with new file
// entry to avoid any discrepancy related to the value of stream_read_blocks_in_cache.
// If stream_read_blocks_in_cache is updated before updating the blocks map here, closing of
- // the input stream can lead to the removal of the cache file even before blocks is added with
- // the new cache file, leading to incorrect value of stream_read_blocks_in_cache.
+ // the input stream can lead to the removal of the cache file even before blocks is added
+ // with the new cache file, leading to incorrect value of stream_read_blocks_in_cache.
prefetchingStatistics.blockAddedToFileCache();
+ addToLinkedListAndEvictIfRequired(entry);
+ }
+
+ /**
+ * Add the given entry to the head of the linked list and if the LRU cache size
+ * exceeds the max limit, evict tail of the LRU linked list.
+ *
+ * @param entry Block entry to add.
+ */
+ private void addToLinkedListAndEvictIfRequired(Entry entry) {
+ addToHeadOfLinkedList(entry);
+ blocksLock.writeLock().lock();
+ try {
+ if (blocks.size() > maxBlocksCount && !closed.get()) {
+ Entry elementToPurge = tail;
+ tail = tail.getPrevious();
+ if (tail == null) {
+ tail = head;
+ }
+ tail.setNext(null);
+ elementToPurge.setPrevious(null);
+ deleteBlockFileAndEvictCache(elementToPurge);
+ }
+ } finally {
+ blocksLock.writeLock().unlock();
+ }
+ }
+
+ /**
+ * Delete cache file as part of the block cache LRU eviction.
+ *
+ * @param elementToPurge Block entry to evict.
+ */
+ private void deleteBlockFileAndEvictCache(Entry elementToPurge) {
+ boolean lockAcquired =
+ elementToPurge.takeLock(Entry.LockType.WRITE, PREFETCH_WRITE_LOCK_TIMEOUT,
+ PREFETCH_WRITE_LOCK_TIMEOUT_UNIT);
+ if (!lockAcquired) {
+ LOG.error("Cache file {} deletion would not be attempted as write lock could not"
Review Comment:
Since we already use 5s elsewhere (PREFETCH_WRITE_LOCK_TIMEOUT), I used it
here as well, but I'm happy to change it in the future if we encounter a
problem with it. Does that sound good?
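For readers unfamiliar with the pattern under discussion, here is a minimal sketch of "try to take the write lock with a timeout, and skip the deletion if it cannot be acquired". The body of `takeLock` is not shown in this thread, so this uses the JDK's `ReentrantReadWriteLock.writeLock().tryLock(long, TimeUnit)` as a stand-in; `TimedLockSketch` and `tryEvict` are hypothetical names, and the timeout is passed in rather than fixed at 5s.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class TimedLockSketch {
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private final long timeout;
  private final TimeUnit unit;

  TimedLockSketch(long timeout, TimeUnit unit) {
    this.timeout = timeout;
    this.unit = unit;
  }

  /**
   * Run the delete action under the write lock.
   *
   * @return true if the lock was acquired and the action ran, false otherwise.
   */
  boolean tryEvict(Runnable deleteAction) {
    boolean acquired;
    try {
      // Wait up to the configured timeout instead of blocking indefinitely.
      acquired = lock.writeLock().tryLock(timeout, unit);
    } catch (InterruptedException e) {
      // Restore the interrupt flag and treat this as "lock not acquired".
      Thread.currentThread().interrupt();
      return false;
    }
    if (!acquired) {
      // A caller would typically log here and leave the cache file in place.
      return false;
    }
    try {
      deleteAction.run();
      return true;
    } finally {
      lock.writeLock().unlock();
    }
  }

  public static void main(String[] args) {
    TimedLockSketch sketch = new TimedLockSketch(5, TimeUnit.SECONDS);
    boolean evicted = sketch.tryEvict(() -> System.out.println("deleting cache file"));
    System.out.println("evicted=" + evicted);
  }
}
```

A shorter timeout makes eviction give up sooner under contention; a longer one delays the caller of put(). Which is preferable depends on how long readers typically hold the entry lock.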
> S3A prefetch - Implement LRU cache for SingleFilePerBlockCache
> --------------------------------------------------------------
>
> Key: HADOOP-18291
> URL: https://issues.apache.org/jira/browse/HADOOP-18291
> Project: Hadoop Common
> Issue Type: Sub-task
> Affects Versions: 3.4.0
> Reporter: Ahmar Suhail
> Assignee: Viraj Jasani
> Priority: Major
> Labels: pull-request-available
>
> Currently there is no limit on the size of the disk cache. This means we could
> have a large number of files on disk, especially for access patterns that
> are very random and do not always read the block fully.
>
> e.g.:
> in.seek(5);
> in.read();
> in.seek(blockSize + 10); // block 0 gets saved to disk as it's not fully read
> in.read();
> in.seek(2 * blockSize + 10); // block 1 gets saved to disk
> .. and so on
>
> The in-memory cache is bounded, and by default has a limit of 72MB (9
> blocks). When a block is fully read and a seek is issued, it's released
> [here|https://github.com/apache/hadoop/blob/feature-HADOOP-18028-s3a-prefetch/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingInputStream.java#L109].
> We can also delete the on disk file for the block here if it exists.
>
> Also maybe add an upper limit on disk space, and delete the file which stores
> data of the block furthest from the current block (similar to the in memory
> cache) when this limit is reached.
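The "furthest from the current block" policy suggested in the issue description could be sketched as below. This only illustrates the selection step; `FurthestBlockSketch` and `furthestFrom` are hypothetical names, ties are resolved in favour of the first block encountered, and the PR discussed above implements LRU eviction rather than this policy.

```java
import java.util.Collection;
import java.util.List;
import java.util.OptionalInt;

class FurthestBlockSketch {
  /**
   * Pick the cached block whose number is furthest from the current block.
   *
   * @return the block to evict, or empty if nothing is cached.
   */
  static OptionalInt furthestFrom(Collection<Integer> cachedBlocks, int currentBlock) {
    OptionalInt candidate = OptionalInt.empty();
    int bestDistance = -1;
    for (int block : cachedBlocks) {
      int distance = Math.abs(block - currentBlock);
      if (distance > bestDistance) {
        bestDistance = distance;
        candidate = OptionalInt.of(block);
      }
    }
    return candidate;
  }

  public static void main(String[] args) {
    // Blocks 0, 1, 7 and 9 are on disk and the stream is currently reading block 8.
    System.out.println(furthestFrom(List.of(0, 1, 7, 9), 8).getAsInt()); // prints 0
  }
}
```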
--
This message was sent by Atlassian Jira
(v8.20.10#820010)