[ https://issues.apache.org/jira/browse/HADOOP-18291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17736479#comment-17736479 ]
ASF GitHub Bot commented on HADOOP-18291:
-----------------------------------------
mehakmeet commented on code in PR #5754:
URL: https://github.com/apache/hadoop/pull/5754#discussion_r1239686973
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java:
##########
@@ -166,16 +201,39 @@ private boolean takeLock(LockType lockType, long timeout, TimeUnit unit) {
}
return false;
}
+
+ private Entry getPrevious() {
+ return previous;
+ }
+
+ private void setPrevious(Entry previous) {
+ this.previous = previous;
+ }
+
+ private Entry getNext() {
+ return next;
+ }
+
+ private void setNext(Entry next) {
+ this.next = next;
+ }
}
/**
* Constructs an instance of a {@code SingleFilePerBlockCache}.
*
* @param prefetchingStatistics statistics for this stream.
+ * @param conf the configuration object.
*/
- public SingleFilePerBlockCache(PrefetchingStatistics prefetchingStatistics) {
+ public SingleFilePerBlockCache(PrefetchingStatistics prefetchingStatistics, Configuration conf) {
this.prefetchingStatistics = requireNonNull(prefetchingStatistics);
this.closed = new AtomicBoolean(false);
+ this.maxBlocksCount =
+ conf.getInt(FS_PREFETCH_MAX_BLOCKS_COUNT, DEFAULT_FS_PREFETCH_MAX_BLOCKS_COUNT);
+ Preconditions.checkArgument(this.maxBlocksCount > 0,
+ "prefetch blocks total capacity should be more than 0");
Review Comment:
Include the property name in the error message, so that users know which
property to set to a valid value.
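For example, the check could look roughly like the following. This is only a sketch in plain Java: `MaxBlocksCountCheck` and `validate` are made-up names for illustration, and the real code would presumably keep using `Preconditions.checkArgument`. The property name is the one defined in this PR.

```java
// Sketch only: a plain-Java stand-in for the Preconditions call in the PR.
final class MaxBlocksCountCheck {
  // Property name as defined in this PR.
  static final String FS_PREFETCH_MAX_BLOCKS_COUNT = "fs.prefetch.max.blocks.count";

  // Hypothetical helper, not part of the PR: returns the value if it is valid,
  // otherwise fails with a message that names the property to fix.
  static int validate(int maxBlocksCount) {
    if (maxBlocksCount <= 0) {
      throw new IllegalArgumentException(String.format(
          "%s should be more than 0, but was %d",
          FS_PREFETCH_MAX_BLOCKS_COUNT, maxBlocksCount));
    }
    return maxBlocksCount;
  }

  public static void main(String[] args) {
    System.out.println(validate(4));
    try {
      validate(0);
    } catch (IllegalArgumentException e) {
      // The message starts with the property name.
      System.out.println(e.getMessage());
    }
  }
}
```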
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java:
##########
@@ -247,9 +305,46 @@ private Entry getEntry(int blockNumber) {
throw new IllegalStateException(String.format("block %d not found in cache", blockNumber));
}
numGets++;
+ addToHeadOfLinkedList(entry);
return entry;
}
+ /**
+ * Add the given entry to the head of the linked list.
+ *
+ * @param entry Block entry to add.
+ */
+ private void addToHeadOfLinkedList(Entry entry) {
+ blocksLock.writeLock().lock();
+ try {
+ if (head == null) {
+ head = entry;
+ tail = entry;
+ }
+ if (entry != head) {
+ Entry prev = entry.getPrevious();
+ Entry nxt = entry.getNext();
+ if (prev != null) {
+ prev.setNext(nxt);
+ }
+ if (nxt != null) {
+ nxt.setPrevious(prev);
+ }
+ entry.setPrevious(null);
+ entry.setNext(head);
+ head.setPrevious(entry);
+ head = entry;
+ }
+ if (tail != null) {
+ while (tail.getNext() != null) {
+ tail = tail.getNext();
+ }
+ }
Review Comment:
Can you explain this part a bit? I am not able to see why it is needed.
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java:
##########
@@ -89,6 +110,16 @@ public class SingleFilePerBlockCache implements BlockCache {
private static final Set<PosixFilePermission> TEMP_FILE_ATTRS =
ImmutableSet.of(PosixFilePermission.OWNER_READ,
PosixFilePermission.OWNER_WRITE);
+ /**
+ * Prefetch max blocks count config.
+ */
+ public static final String FS_PREFETCH_MAX_BLOCKS_COUNT = "fs.prefetch.max.blocks.count";
Review Comment:
Is there a Constants class we can move this to?
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java:
##########
@@ -299,9 +395,62 @@ public void put(int blockNumber, ByteBuffer buffer, Configuration conf,
// Update stream_read_blocks_in_cache stats only after blocks map is updated with new file
// entry to avoid any discrepancy related to the value of stream_read_blocks_in_cache.
// If stream_read_blocks_in_cache is updated before updating the blocks map here, closing of
- // the input stream can lead to the removal of the cache file even before blocks is added with
- // the new cache file, leading to incorrect value of stream_read_blocks_in_cache.
+ // the input stream can lead to the removal of the cache file even before blocks is added
+ // with the new cache file, leading to incorrect value of stream_read_blocks_in_cache.
prefetchingStatistics.blockAddedToFileCache();
+ addToLinkedListAndEvictIfRequired(entry);
+ }
+
+ /**
+ * Add the given entry to the head of the linked list and if the LRU cache size
+ * exceeds the max limit, evict tail of the LRU linked list.
+ *
+ * @param entry Block entry to add.
+ */
+ private void addToLinkedListAndEvictIfRequired(Entry entry) {
+ addToHeadOfLinkedList(entry);
+ blocksLock.writeLock().lock();
+ try {
+ if (blocks.size() > maxBlocksCount && !closed.get()) {
+ Entry elementToPurge = tail;
+ tail = tail.getPrevious();
+ if (tail == null) {
+ tail = head;
+ }
+ tail.setNext(null);
+ elementToPurge.setPrevious(null);
+ deleteBlockFileAndEvictCache(elementToPurge);
+ }
+ } finally {
+ blocksLock.writeLock().unlock();
+ }
+ }
+
+ /**
+ * Delete cache file as part of the block cache LRU eviction.
+ *
+ * @param elementToPurge Block entry to evict.
+ */
+ private void deleteBlockFileAndEvictCache(Entry elementToPurge) {
+ boolean lockAcquired =
+ elementToPurge.takeLock(Entry.LockType.WRITE, PREFETCH_WRITE_LOCK_TIMEOUT,
+ PREFETCH_WRITE_LOCK_TIMEOUT_UNIT);
+ if (!lockAcquired) {
+ LOG.error("Cache file {} deletion would not be attempted as write lock could not"
Review Comment:
So there can be a scenario where the cache exceeds its configured capacity?
Is 5 seconds enough time, or are we okay with this?
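To make the concern concrete, here is a minimal sketch of eviction behind a timed write lock. It is not the PR's code: `TimedEvictionSketch`, `evictOne` and the block counter are hypothetical, and a single `ReentrantReadWriteLock` stands in for the per-entry lock. If the write lock is not acquired within the timeout, the eviction is skipped and the cache stays above its limit until a later attempt succeeds.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Sketch only: hypothetical names, one lock standing in for the entry lock.
final class TimedEvictionSketch {
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private final int maxBlocks;
  private int cachedBlocks;

  TimedEvictionSketch(int maxBlocks, int cachedBlocks) {
    this.maxBlocks = maxBlocks;
    this.cachedBlocks = cachedBlocks;
  }

  ReentrantReadWriteLock lock() {
    return lock;
  }

  int cachedBlocks() {
    return cachedBlocks;
  }

  // Tries to evict one block. Returns true only if a block was evicted.
  boolean evictOne(long timeoutMillis) {
    boolean acquired;
    try {
      acquired = lock.writeLock().tryLock(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
    if (!acquired) {
      // A reader still holds the lock: eviction is skipped, so the cache
      // remains over its limit for now.
      return false;
    }
    try {
      if (cachedBlocks > maxBlocks) {
        cachedBlocks--;
        return true;
      }
      return false;
    } finally {
      lock.writeLock().unlock();
    }
  }

  public static void main(String[] args) {
    TimedEvictionSketch cache = new TimedEvictionSketch(2, 3);
    cache.lock().readLock().lock();          // simulate a block being read
    System.out.println(cache.evictOne(50));  // times out, nothing evicted
    cache.lock().readLock().unlock();
    System.out.println(cache.evictOne(50));  // lock is free, one block evicted
  }
}
```

If being over capacity for a while is acceptable, it may be enough to document that the limit is a soft one.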
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java:
##########
@@ -61,7 +62,27 @@ public class SingleFilePerBlockCache implements BlockCache {
/**
* Blocks stored in this cache.
*/
- private final Map<Integer, Entry> blocks = new ConcurrentHashMap<>();
+ private final Map<Integer, Entry> blocks;
+
+ /**
+ * Total max blocks count, to be considered as baseline for LRU cache.
+ */
+ private final int maxBlocksCount;
+
+ /**
+ * The lock to be shared by LRU based linked list updates.
+ */
+ private final ReentrantReadWriteLock blocksLock;
+
+ /**
+ * Head of the linked list.
+ */
+ private Entry head;
+
+ /**
+ * Tail of the lined list.
Review Comment:
typo: "linked"
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java:
##########
@@ -247,9 +305,46 @@ private Entry getEntry(int blockNumber) {
throw new IllegalStateException(String.format("block %d not found in cache", blockNumber));
}
numGets++;
+ addToHeadOfLinkedList(entry);
return entry;
}
+ /**
+ * Add the given entry to the head of the linked list.
+ *
+ * @param entry Block entry to add.
+ */
+ private void addToHeadOfLinkedList(Entry entry) {
+ blocksLock.writeLock().lock();
+ try {
Review Comment:
Can we add a bit of logging here for debug purposes? For example, the entry's
block number; the head and tail at this point would be good to see as well.
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java:
##########
@@ -299,9 +395,62 @@ public void put(int blockNumber, ByteBuffer buffer, Configuration conf,
// Update stream_read_blocks_in_cache stats only after blocks map is updated with new file
// entry to avoid any discrepancy related to the value of stream_read_blocks_in_cache.
// If stream_read_blocks_in_cache is updated before updating the blocks map here, closing of
- // the input stream can lead to the removal of the cache file even before blocks is added with
- // the new cache file, leading to incorrect value of stream_read_blocks_in_cache.
+ // the input stream can lead to the removal of the cache file even before blocks is added
+ // with the new cache file, leading to incorrect value of stream_read_blocks_in_cache.
prefetchingStatistics.blockAddedToFileCache();
+ addToLinkedListAndEvictIfRequired(entry);
+ }
+
+ /**
+ * Add the given entry to the head of the linked list and if the LRU cache size
+ * exceeds the max limit, evict tail of the LRU linked list.
+ *
+ * @param entry Block entry to add.
+ */
+ private void addToLinkedListAndEvictIfRequired(Entry entry) {
+ addToHeadOfLinkedList(entry);
+ blocksLock.writeLock().lock();
Review Comment:
There can be a scenario where we have just added the entry to the head and,
before we try to evict the tail, another caller acquires the write lock inside
`addToHeadOfLinkedList(entry)` and adds its own entry at the head. Not sure
whether that would cause any issues, though.
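One possible way to rule this out, sketched under assumptions: do the add-to-head and the tail eviction under a single write-lock acquisition, so that no other caller can slip in between the two steps. `LruSketch`, `Node` and `put` are hypothetical names, not the PR's code, and the cache file deletion is left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Sketch only: hypothetical LRU list where add and evict share one lock hold.
final class LruSketch {
  static final class Node {
    final int blockNumber;
    Node prev;
    Node next;

    Node(int blockNumber) {
      this.blockNumber = blockNumber;
    }
  }

  private final Map<Integer, Node> blocks = new HashMap<>();
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private final int maxBlocks;
  private Node head;
  private Node tail;

  LruSketch(int maxBlocks) {
    if (maxBlocks <= 0) {
      throw new IllegalArgumentException("maxBlocks should be more than 0");
    }
    this.maxBlocks = maxBlocks;
  }

  // Adds or touches a block. Returns the evicted block number, or -1 if none.
  int put(int blockNumber) {
    lock.writeLock().lock();
    try {
      Node node = blocks.get(blockNumber);
      if (node == null) {
        node = new Node(blockNumber);
        blocks.put(blockNumber, node);
      } else {
        unlink(node);
      }
      linkAtHead(node);
      if (blocks.size() <= maxBlocks) {
        return -1;
      }
      // Still inside the same lock hold: the tail cannot change under us.
      Node victim = tail;
      unlink(victim);
      blocks.remove(victim.blockNumber);
      return victim.blockNumber;
    } finally {
      lock.writeLock().unlock();
    }
  }

  int size() {
    lock.readLock().lock();
    try {
      return blocks.size();
    } finally {
      lock.readLock().unlock();
    }
  }

  // Detaches a node that is currently in the list, fixing head and tail.
  private void unlink(Node n) {
    if (n.prev != null) {
      n.prev.next = n.next;
    } else {
      head = n.next;
    }
    if (n.next != null) {
      n.next.prev = n.prev;
    } else {
      tail = n.prev;
    }
    n.prev = null;
    n.next = null;
  }

  // Inserts a detached node at the head of the list.
  private void linkAtHead(Node n) {
    n.next = head;
    if (head != null) {
      head.prev = n;
    } else {
      tail = n;
    }
    head = n;
  }
}
```

With a capacity of 2, calling `put(0)`, `put(1)`, `put(0)` and then `put(2)` evicts block 1, since block 0 was touched more recently.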
> S3A prefetch - Implement LRU cache for SingleFilePerBlockCache
> --------------------------------------------------------------
>
> Key: HADOOP-18291
> URL: https://issues.apache.org/jira/browse/HADOOP-18291
> Project: Hadoop Common
> Issue Type: Sub-task
> Affects Versions: 3.4.0
> Reporter: Ahmar Suhail
> Assignee: Viraj Jasani
> Priority: Major
> Labels: pull-request-available
>
> Currently there is no limit on the size of the disk cache. This means we could
> have a large number of files on disk, especially for access patterns that
> are very random and do not always read the block fully.
>
> eg:
> in.seek(5);
> in.read();
> in.seek(blockSize + 10) // block 0 gets saved to disk as it's not fully read
> in.read();
> in.seek(2 * blockSize + 10) // block 1 gets saved to disk
> .. and so on
>
> The in memory cache is bounded, and by default has a limit of 72MB (9
> blocks). When a block is fully read, and a seek is issued it's released
> [here|https://github.com/apache/hadoop/blob/feature-HADOOP-18028-s3a-prefetch/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingInputStream.java#L109].
> We can also delete the on disk file for the block here if it exists.
>
> Also maybe add an upper limit on disk space, and delete the file which stores
> data of the block furthest from the current block (similar to the in memory
> cache) when this limit is reached.