mehakmeet commented on code in PR #5754:
URL: https://github.com/apache/hadoop/pull/5754#discussion_r1239686973
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java:
##########
@@ -166,16 +201,39 @@ private boolean takeLock(LockType lockType, long timeout,
TimeUnit unit) {
}
return false;
}
+
+ private Entry getPrevious() {
+ return previous;
+ }
+
+ private void setPrevious(Entry previous) {
+ this.previous = previous;
+ }
+
+ private Entry getNext() {
+ return next;
+ }
+
+ private void setNext(Entry next) {
+ this.next = next;
+ }
}
/**
* Constructs an instance of a {@code SingleFilePerBlockCache}.
*
* @param prefetchingStatistics statistics for this stream.
+ * @param conf the configuration object.
*/
- public SingleFilePerBlockCache(PrefetchingStatistics prefetchingStatistics) {
+ public SingleFilePerBlockCache(PrefetchingStatistics prefetchingStatistics,
Configuration conf) {
this.prefetchingStatistics = requireNonNull(prefetchingStatistics);
this.closed = new AtomicBoolean(false);
+ this.maxBlocksCount =
+ conf.getInt(FS_PREFETCH_MAX_BLOCKS_COUNT,
DEFAULT_FS_PREFETCH_MAX_BLOCKS_COUNT);
+ Preconditions.checkArgument(this.maxBlocksCount > 0,
+ "prefetch blocks total capacity should be more than 0");
Review Comment:
Include the property name in the error message by which we can set this to a
valid value
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java:
##########
@@ -247,9 +305,46 @@ private Entry getEntry(int blockNumber) {
throw new IllegalStateException(String.format("block %d not found in
cache", blockNumber));
}
numGets++;
+ addToHeadOfLinkedList(entry);
return entry;
}
+ /**
+ * Add the given entry to the head of the linked list.
+ *
+ * @param entry Block entry to add.
+ */
+ private void addToHeadOfLinkedList(Entry entry) {
+ blocksLock.writeLock().lock();
+ try {
+ if (head == null) {
+ head = entry;
+ tail = entry;
+ }
+ if (entry != head) {
+ Entry prev = entry.getPrevious();
+ Entry nxt = entry.getNext();
+ if (prev != null) {
+ prev.setNext(nxt);
+ }
+ if (nxt != null) {
+ nxt.setPrevious(prev);
+ }
+ entry.setPrevious(null);
+ entry.setNext(head);
+ head.setPrevious(entry);
+ head = entry;
+ }
+ if (tail != null) {
+ while (tail.getNext() != null) {
+ tail = tail.getNext();
+ }
+ }
Review Comment:
Can you explain a bit about this part, not able to get why this is needed?
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java:
##########
@@ -89,6 +110,16 @@ public class SingleFilePerBlockCache implements BlockCache {
private static final Set<PosixFilePermission> TEMP_FILE_ATTRS =
ImmutableSet.of(PosixFilePermission.OWNER_READ,
PosixFilePermission.OWNER_WRITE);
+ /**
+ * Prefetch max blocks count config.
+ */
+ public static final String FS_PREFETCH_MAX_BLOCKS_COUNT =
"fs.prefetch.max.blocks.count";
Review Comment:
Is there a Constants class where we can move this to?
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java:
##########
@@ -299,9 +395,62 @@ public void put(int blockNumber, ByteBuffer buffer,
Configuration conf,
// Update stream_read_blocks_in_cache stats only after blocks map is
updated with new file
// entry to avoid any discrepancy related to the value of
stream_read_blocks_in_cache.
// If stream_read_blocks_in_cache is updated before updating the blocks
map here, closing of
- // the input stream can lead to the removal of the cache file even before
blocks is added with
- // the new cache file, leading to incorrect value of
stream_read_blocks_in_cache.
+ // the input stream can lead to the removal of the cache file even before
blocks is added
+ // with the new cache file, leading to incorrect value of
stream_read_blocks_in_cache.
prefetchingStatistics.blockAddedToFileCache();
+ addToLinkedListAndEvictIfRequired(entry);
+ }
+
+ /**
+ * Add the given entry to the head of the linked list and if the LRU cache
size
+ * exceeds the max limit, evict tail of the LRU linked list.
+ *
+ * @param entry Block entry to add.
+ */
+ private void addToLinkedListAndEvictIfRequired(Entry entry) {
+ addToHeadOfLinkedList(entry);
+ blocksLock.writeLock().lock();
+ try {
+ if (blocks.size() > maxBlocksCount && !closed.get()) {
+ Entry elementToPurge = tail;
+ tail = tail.getPrevious();
+ if (tail == null) {
+ tail = head;
+ }
+ tail.setNext(null);
+ elementToPurge.setPrevious(null);
+ deleteBlockFileAndEvictCache(elementToPurge);
+ }
+ } finally {
+ blocksLock.writeLock().unlock();
+ }
+ }
+
+ /**
+ * Delete cache file as part of the block cache LRU eviction.
+ *
+ * @param elementToPurge Block entry to evict.
+ */
+ private void deleteBlockFileAndEvictCache(Entry elementToPurge) {
+ boolean lockAcquired =
+ elementToPurge.takeLock(Entry.LockType.WRITE,
PREFETCH_WRITE_LOCK_TIMEOUT,
+ PREFETCH_WRITE_LOCK_TIMEOUT_UNIT);
+ if (!lockAcquired) {
+ LOG.error("Cache file {} deletion would not be attempted as write lock
could not"
Review Comment:
So, there can be a scenario where the current cache exceeds its normal
capacity? Is 5 seconds enough time? or are we okay with this?
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java:
##########
@@ -61,7 +62,27 @@ public class SingleFilePerBlockCache implements BlockCache {
/**
* Blocks stored in this cache.
*/
- private final Map<Integer, Entry> blocks = new ConcurrentHashMap<>();
+ private final Map<Integer, Entry> blocks;
+
+ /**
+ * Total max blocks count, to be considered as baseline for LRU cache.
+ */
+ private final int maxBlocksCount;
+
+ /**
+ * The lock to be shared by LRU based linked list updates.
+ */
+ private final ReentrantReadWriteLock blocksLock;
+
+ /**
+ * Head of the linked list.
+ */
+ private Entry head;
+
+ /**
+ * Tail of the lined list.
Review Comment:
typo: "linked"
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java:
##########
@@ -247,9 +305,46 @@ private Entry getEntry(int blockNumber) {
throw new IllegalStateException(String.format("block %d not found in
cache", blockNumber));
}
numGets++;
+ addToHeadOfLinkedList(entry);
return entry;
}
+ /**
+ * Add the given entry to the head of the linked list.
+ *
+ * @param entry Block entry to add.
+ */
+ private void addToHeadOfLinkedList(Entry entry) {
+ blocksLock.writeLock().lock();
+ try {
Review Comment:
Can we add bit of logging here for debug purposes? Like entry's block
number, maybe head an tail at this point would be good too see as well.
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java:
##########
@@ -299,9 +395,62 @@ public void put(int blockNumber, ByteBuffer buffer,
Configuration conf,
// Update stream_read_blocks_in_cache stats only after blocks map is
updated with new file
// entry to avoid any discrepancy related to the value of
stream_read_blocks_in_cache.
// If stream_read_blocks_in_cache is updated before updating the blocks
map here, closing of
- // the input stream can lead to the removal of the cache file even before
blocks is added with
- // the new cache file, leading to incorrect value of
stream_read_blocks_in_cache.
+ // the input stream can lead to the removal of the cache file even before
blocks is added
+ // with the new cache file, leading to incorrect value of
stream_read_blocks_in_cache.
prefetchingStatistics.blockAddedToFileCache();
+ addToLinkedListAndEvictIfRequired(entry);
+ }
+
+ /**
+ * Add the given entry to the head of the linked list and if the LRU cache
size
+ * exceeds the max limit, evict tail of the LRU linked list.
+ *
+ * @param entry Block entry to add.
+ */
+ private void addToLinkedListAndEvictIfRequired(Entry entry) {
+ addToHeadOfLinkedList(entry);
+ blocksLock.writeLock().lock();
Review Comment:
There can be a scenario when we just added the entry to the head and before
we try to evict the tail, another entry acquires the write lock inside
`addToHeadOfLinkedList(entry)` and adds the entry at the head. Not sure if that
would cause any issues tho...
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]