This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 2805189237f HADOOP-18291. S3A prefetch - Implement thread-safe LRU cache for SingleFilePerBlockCache (#5754)
2805189237f is described below

commit 2805189237f13005053e616533b439514d61e4ca
Author: Viraj Jasani <vjas...@apache.org>
AuthorDate: Fri Jul 14 03:21:01 2023 -0600

    HADOOP-18291. S3A prefetch - Implement thread-safe LRU cache for SingleFilePerBlockCache (#5754)

    Contributed by Viraj Jasani
---
 .../fs/impl/prefetch/CachingBlockManager.java | 12 +-
 .../hadoop/fs/impl/prefetch/PrefetchConstants.java | 44 ++++
 .../fs/impl/prefetch/SingleFilePerBlockCache.java | 221 ++++++++++++++++---
 .../hadoop/fs/impl/prefetch/TestBlockCache.java | 6 +-
 .../java/org/apache/hadoop/fs/s3a/Constants.java | 13 ++
 .../fs/s3a/prefetch/S3ACachingBlockManager.java | 11 +-
 .../fs/s3a/ITestS3APrefetchingLruEviction.java | 243 +++++++++++++++++++++
 .../hadoop/fs/s3a/prefetch/S3APrefetchFakes.java | 6 +-
 .../s3a/prefetch/TestS3ACachingBlockManager.java | 20 +-
 9 files changed, 529 insertions(+), 47 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/CachingBlockManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/CachingBlockManager.java index e43b176d0bf..4461c118625 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/CachingBlockManager.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/CachingBlockManager.java @@ -110,6 +110,7 @@ public abstract class CachingBlockManager extends BlockManager { * @param prefetchingStatistics statistics for this stream. * @param conf the configuration. * @param localDirAllocator the local dir allocator instance. + * @param maxBlocksCount max blocks count to be kept in cache at any time. * @throws IllegalArgumentException if bufferPoolSize is zero or negative.
*/ public CachingBlockManager( @@ -118,7 +119,8 @@ public abstract class CachingBlockManager extends BlockManager { int bufferPoolSize, PrefetchingStatistics prefetchingStatistics, Configuration conf, - LocalDirAllocator localDirAllocator) { + LocalDirAllocator localDirAllocator, + int maxBlocksCount) { super(blockData); Validate.checkPositiveInteger(bufferPoolSize, "bufferPoolSize"); @@ -129,16 +131,16 @@ public abstract class CachingBlockManager extends BlockManager { this.numReadErrors = new AtomicInteger(); this.cachingDisabled = new AtomicBoolean(); this.prefetchingStatistics = requireNonNull(prefetchingStatistics); + this.conf = requireNonNull(conf); if (this.getBlockData().getFileSize() > 0) { this.bufferPool = new BufferPool(bufferPoolSize, this.getBlockData().getBlockSize(), this.prefetchingStatistics); - this.cache = this.createCache(); + this.cache = this.createCache(maxBlocksCount); } this.ops = new BlockOperations(); this.ops.setDebug(false); - this.conf = requireNonNull(conf); this.localDirAllocator = localDirAllocator; } @@ -557,8 +559,8 @@ public abstract class CachingBlockManager extends BlockManager { } } - protected BlockCache createCache() { - return new SingleFilePerBlockCache(prefetchingStatistics); + protected BlockCache createCache(int maxBlocksCount) { + return new SingleFilePerBlockCache(prefetchingStatistics, maxBlocksCount); } protected void cachePut(int blockNumber, ByteBuffer buffer) throws IOException { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/PrefetchConstants.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/PrefetchConstants.java new file mode 100644 index 00000000000..785023f523c --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/PrefetchConstants.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.fs.impl.prefetch; + +import java.util.concurrent.TimeUnit; + +/** + * Constants used by prefetch implementations. + */ +public final class PrefetchConstants { + + private PrefetchConstants() { + } + + /** + * Timeout to be used by close, while acquiring prefetch block write lock. + * Value = {@value PREFETCH_WRITE_LOCK_TIMEOUT} + */ + static final int PREFETCH_WRITE_LOCK_TIMEOUT = 5; + + /** + * Lock timeout unit to be used by the thread while acquiring prefetch block write lock. 
+ * Value = {@value PREFETCH_WRITE_LOCK_TIMEOUT_UNIT} + */ + static final TimeUnit PREFETCH_WRITE_LOCK_TIMEOUT_UNIT = TimeUnit.SECONDS; + +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java index e043fbd904b..a84a79eb778 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java @@ -47,6 +47,7 @@ import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.LocalDirAllocator; +import org.apache.hadoop.util.Preconditions; import static java.util.Objects.requireNonNull; import static org.apache.hadoop.fs.impl.prefetch.Validate.checkNotNull; @@ -61,27 +62,42 @@ public class SingleFilePerBlockCache implements BlockCache { /** * Blocks stored in this cache. */ - private final Map<Integer, Entry> blocks = new ConcurrentHashMap<>(); + private final Map<Integer, Entry> blocks; /** - * Number of times a block was read from this cache. - * Used for determining cache utilization factor. + * Total max blocks count, to be considered as baseline for LRU cache eviction. */ - private int numGets = 0; + private final int maxBlocksCount; - private final AtomicBoolean closed; + /** + * The lock to be shared by LRU based linked list updates. + */ + private final ReentrantReadWriteLock blocksLock; - private final PrefetchingStatistics prefetchingStatistics; + /** + * Head of the linked list. + */ + private Entry head; + + /** + * Tail of the linked list. + */ + private Entry tail; /** - * Timeout to be used by close, while acquiring prefetch block write lock. + * Total size of the linked list. */ - private static final int PREFETCH_WRITE_LOCK_TIMEOUT = 5; + private int entryListSize; /** - * Lock timeout unit to be used by the thread while acquiring prefetch block write lock. + * Number of times a block was read from this cache. + * Used for determining cache utilization factor. */ - private static final TimeUnit PREFETCH_WRITE_LOCK_TIMEOUT_UNIT = TimeUnit.SECONDS; + private int numGets = 0; + + private final AtomicBoolean closed; + + private final PrefetchingStatistics prefetchingStatistics; /** * File attributes attached to any intermediate temporary file created during index creation. @@ -103,6 +119,8 @@ public class SingleFilePerBlockCache implements BlockCache { READ, WRITE } + private Entry previous; + private Entry next; Entry(int blockNumber, Path path, int size, long checksum) { this.blockNumber = blockNumber; @@ -110,6 +128,8 @@ public class SingleFilePerBlockCache implements BlockCache { this.size = size; this.checksum = checksum; this.lock = new ReentrantReadWriteLock(); + this.previous = null; + this.next = null; } @Override @@ -166,16 +186,37 @@ public class SingleFilePerBlockCache implements BlockCache { } return false; } + + private Entry getPrevious() { + return previous; + } + + private void setPrevious(Entry previous) { + this.previous = previous; + } + + private Entry getNext() { + return next; + } + + private void setNext(Entry next) { + this.next = next; + } } /** * Constructs an instance of a {@code SingleFilePerBlockCache}. * * @param prefetchingStatistics statistics for this stream. + * @param maxBlocksCount max blocks count to be kept in cache at any time. 
*/ - public SingleFilePerBlockCache(PrefetchingStatistics prefetchingStatistics) { + public SingleFilePerBlockCache(PrefetchingStatistics prefetchingStatistics, int maxBlocksCount) { this.prefetchingStatistics = requireNonNull(prefetchingStatistics); this.closed = new AtomicBoolean(false); + this.maxBlocksCount = maxBlocksCount; + Preconditions.checkArgument(maxBlocksCount > 0, "maxBlocksCount should be more than 0"); + blocks = new ConcurrentHashMap<>(); + blocksLock = new ReentrantReadWriteLock(); } /** @@ -247,9 +288,60 @@ public class SingleFilePerBlockCache implements BlockCache { throw new IllegalStateException(String.format("block %d not found in cache", blockNumber)); } numGets++; + addToLinkedListHead(entry); return entry; } + /** + * Helper method to add the given entry to the head of the linked list. + * + * @param entry Block entry to add. + */ + private void addToLinkedListHead(Entry entry) { + blocksLock.writeLock().lock(); + try { + addToHeadOfLinkedList(entry); + } finally { + blocksLock.writeLock().unlock(); + } + } + + /** + * Add the given entry to the head of the linked list. + * + * @param entry Block entry to add. + */ + private void addToHeadOfLinkedList(Entry entry) { + if (head == null) { + head = entry; + tail = entry; + } + LOG.debug( + "Block num {} to be added to the head. Current head block num: {} and tail block num: {}", + entry.blockNumber, head.blockNumber, tail.blockNumber); + if (entry != head) { + Entry prev = entry.getPrevious(); + Entry nxt = entry.getNext(); + // no-op if the block is already evicted + if (!blocks.containsKey(entry.blockNumber)) { + return; + } + if (prev != null) { + prev.setNext(nxt); + } + if (nxt != null) { + nxt.setPrevious(prev); + } + entry.setPrevious(null); + entry.setNext(head); + head.setPrevious(entry); + head = entry; + if (prev != null && prev.getNext() == null) { + tail = prev; + } + } + } + /** * Puts the given block in this cache. * @@ -278,6 +370,7 @@ public class SingleFilePerBlockCache implements BlockCache { } finally { entry.releaseLock(Entry.LockType.READ); } + addToLinkedListHead(entry); return; } @@ -299,9 +392,65 @@ public class SingleFilePerBlockCache implements BlockCache { // Update stream_read_blocks_in_cache stats only after blocks map is updated with new file // entry to avoid any discrepancy related to the value of stream_read_blocks_in_cache. // If stream_read_blocks_in_cache is updated before updating the blocks map here, closing of - // the input stream can lead to the removal of the cache file even before blocks is added with - // the new cache file, leading to incorrect value of stream_read_blocks_in_cache. + // the input stream can lead to the removal of the cache file even before blocks is added + // with the new cache file, leading to incorrect value of stream_read_blocks_in_cache. prefetchingStatistics.blockAddedToFileCache(); + addToLinkedListAndEvictIfRequired(entry); + } + + /** + * Add the given entry to the head of the linked list and if the LRU cache size + * exceeds the max limit, evict tail of the LRU linked list. + * + * @param entry Block entry to add. 
+ */ + private void addToLinkedListAndEvictIfRequired(Entry entry) { + blocksLock.writeLock().lock(); + try { + addToHeadOfLinkedList(entry); + entryListSize++; + if (entryListSize > maxBlocksCount && !closed.get()) { + Entry elementToPurge = tail; + tail = tail.getPrevious(); + if (tail == null) { + tail = head; + } + tail.setNext(null); + elementToPurge.setPrevious(null); + deleteBlockFileAndEvictCache(elementToPurge); + } + } finally { + blocksLock.writeLock().unlock(); + } + } + + /** + * Delete cache file as part of the block cache LRU eviction. + * + * @param elementToPurge Block entry to evict. + */ + private void deleteBlockFileAndEvictCache(Entry elementToPurge) { + boolean lockAcquired = elementToPurge.takeLock(Entry.LockType.WRITE, + PrefetchConstants.PREFETCH_WRITE_LOCK_TIMEOUT, + PrefetchConstants.PREFETCH_WRITE_LOCK_TIMEOUT_UNIT); + if (!lockAcquired) { + LOG.error("Cache file {} deletion would not be attempted as write lock could not" + + " be acquired within {} {}", elementToPurge.path, + PrefetchConstants.PREFETCH_WRITE_LOCK_TIMEOUT, + PrefetchConstants.PREFETCH_WRITE_LOCK_TIMEOUT_UNIT); + } else { + try { + if (Files.deleteIfExists(elementToPurge.path)) { + entryListSize--; + prefetchingStatistics.blockRemovedFromFileCache(); + blocks.remove(elementToPurge.blockNumber); + } + } catch (IOException e) { + LOG.warn("Failed to delete cache file {}", elementToPurge.path, e); + } finally { + elementToPurge.releaseLock(Entry.LockType.WRITE); + } + } } private static final Set<? extends OpenOption> CREATE_OPTIONS = @@ -337,30 +486,38 @@ public class SingleFilePerBlockCache implements BlockCache { public void close() throws IOException { if (closed.compareAndSet(false, true)) { LOG.debug(getStats()); - int numFilesDeleted = 0; - - for (Entry entry : blocks.values()) { - boolean lockAcquired = entry.takeLock(Entry.LockType.WRITE, PREFETCH_WRITE_LOCK_TIMEOUT, - PREFETCH_WRITE_LOCK_TIMEOUT_UNIT); - if (!lockAcquired) { - LOG.error("Cache file {} deletion would not be attempted as write lock could not" - + " be acquired within {} {}", entry.path, PREFETCH_WRITE_LOCK_TIMEOUT, - PREFETCH_WRITE_LOCK_TIMEOUT_UNIT); - continue; - } - try { - Files.deleteIfExists(entry.path); + deleteCacheFiles(); + } + } + + /** + * Delete cache files as part of the close call. 
+ */ + private void deleteCacheFiles() { + int numFilesDeleted = 0; + for (Entry entry : blocks.values()) { + boolean lockAcquired = + entry.takeLock(Entry.LockType.WRITE, PrefetchConstants.PREFETCH_WRITE_LOCK_TIMEOUT, + PrefetchConstants.PREFETCH_WRITE_LOCK_TIMEOUT_UNIT); + if (!lockAcquired) { + LOG.error("Cache file {} deletion would not be attempted as write lock could not" + + " be acquired within {} {}", entry.path, + PrefetchConstants.PREFETCH_WRITE_LOCK_TIMEOUT, + PrefetchConstants.PREFETCH_WRITE_LOCK_TIMEOUT_UNIT); + continue; + } + try { + if (Files.deleteIfExists(entry.path)) { prefetchingStatistics.blockRemovedFromFileCache(); numFilesDeleted++; - } catch (IOException e) { - LOG.warn("Failed to delete cache file {}", entry.path, e); - } finally { - entry.releaseLock(Entry.LockType.WRITE); } + } catch (IOException e) { + LOG.warn("Failed to delete cache file {}", entry.path, e); + } finally { + entry.releaseLock(Entry.LockType.WRITE); } - - LOG.debug("Prefetch cache close: Deleted {} cache files", numFilesDeleted); } + LOG.debug("Prefetch cache close: Deleted {} cache files", numFilesDeleted); } @Override diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/prefetch/TestBlockCache.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/prefetch/TestBlockCache.java index 3b60c1c7953..b32ce20a373 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/prefetch/TestBlockCache.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/prefetch/TestBlockCache.java @@ -45,7 +45,7 @@ public class TestBlockCache extends AbstractHadoopTestBase { public void testArgChecks() throws Exception { // Should not throw. BlockCache cache = - new SingleFilePerBlockCache(EmptyPrefetchingStatistics.getInstance()); + new SingleFilePerBlockCache(EmptyPrefetchingStatistics.getInstance(), 2); ByteBuffer buffer = ByteBuffer.allocate(16); @@ -55,7 +55,7 @@ public class TestBlockCache extends AbstractHadoopTestBase { intercept(NullPointerException.class, null, - () -> new SingleFilePerBlockCache(null)); + () -> new SingleFilePerBlockCache(null, 2)); } @@ -63,7 +63,7 @@ public class TestBlockCache extends AbstractHadoopTestBase { @Test public void testPutAndGet() throws Exception { BlockCache cache = - new SingleFilePerBlockCache(EmptyPrefetchingStatistics.getInstance()); + new SingleFilePerBlockCache(EmptyPrefetchingStatistics.getInstance(), 2); ByteBuffer buffer1 = ByteBuffer.allocate(BUFFER_SIZE); for (byte i = 0; i < BUFFER_SIZE; i++) { diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java index 451b9b0ee24..4937e5d68d9 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -1275,4 +1275,17 @@ public final class Constants { */ public static final String STORE_CAPABILITY_DIRECTORY_MARKER_MULTIPART_UPLOAD_ENABLED = "fs.s3a.capability.multipart.uploads.enabled"; + + /** + * Prefetch max blocks count config. + * Value = {@value} + */ + public static final String PREFETCH_MAX_BLOCKS_COUNT = "fs.s3a.prefetch.max.blocks.count"; + + /** + * Default value for max blocks count config. 
+ * Value = {@value} + */ + public static final int DEFAULT_PREFETCH_MAX_BLOCKS_COUNT = 4; + } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingBlockManager.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingBlockManager.java index c166943c00e..a02922053aa 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingBlockManager.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingBlockManager.java @@ -33,6 +33,9 @@ import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool; import org.apache.hadoop.fs.impl.prefetch.Validate; import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; +import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_PREFETCH_MAX_BLOCKS_COUNT; +import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_MAX_BLOCKS_COUNT; + /** * Provides access to S3 file one block at a time. */ @@ -67,7 +70,13 @@ public class S3ACachingBlockManager extends CachingBlockManager { Configuration conf, LocalDirAllocator localDirAllocator) { - super(futurePool, blockData, bufferPoolSize, streamStatistics, conf, localDirAllocator); + super(futurePool, + blockData, + bufferPoolSize, + streamStatistics, + conf, + localDirAllocator, + conf.getInt(PREFETCH_MAX_BLOCKS_COUNT, DEFAULT_PREFETCH_MAX_BLOCKS_COUNT)); Validate.checkNotNull(reader, "reader"); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingLruEviction.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingLruEviction.java new file mode 100644 index 00000000000..bbe01887588 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingLruEviction.java @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.net.URI; +import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest; +import org.apache.hadoop.fs.statistics.IOStatistics; +import org.apache.hadoop.test.LambdaTestUtils; + +import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE; +import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY; +import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY; +import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_MAX_BLOCKS_COUNT; +import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticGaugeValue; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BLOCKS_IN_FILE_CACHE; +import static org.apache.hadoop.io.IOUtils.cleanupWithLogger; + +/** + * Test the prefetching input stream with LRU cache eviction on S3ACachingInputStream. + */ +@RunWith(Parameterized.class) +public class ITestS3APrefetchingLruEviction extends AbstractS3ACostTest { + + private final String maxBlocks; + + @Parameterized.Parameters(name = "max-blocks-{0}") + public static Collection<Object[]> params() { + return Arrays.asList(new Object[][]{ + {"1"}, + {"2"}, + {"3"}, + {"4"} + }); + } + + public ITestS3APrefetchingLruEviction(final String maxBlocks) { + super(true); + this.maxBlocks = maxBlocks; + } + + private static final Logger LOG = + LoggerFactory.getLogger(ITestS3APrefetchingLruEviction.class); + + private static final int S_1K = 1024; + // Path for file which should have length > block size so S3ACachingInputStream is used + private Path largeFile; + private FileSystem largeFileFS; + private int blockSize; + + private static final int TIMEOUT_MILLIS = 5000; + private static final int INTERVAL_MILLIS = 500; + + @Override + public Configuration createConfiguration() { + Configuration conf = super.createConfiguration(); + S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_ENABLED_KEY); + S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_MAX_BLOCKS_COUNT); + conf.setBoolean(PREFETCH_ENABLED_KEY, true); + conf.setInt(PREFETCH_MAX_BLOCKS_COUNT, Integer.parseInt(maxBlocks)); + return conf; + } + + @Override + public void teardown() throws Exception { + super.teardown(); + cleanupWithLogger(LOG, largeFileFS); + largeFileFS = null; + } + + private void openFS() throws Exception { + Configuration conf = getConfiguration(); + String largeFileUri = S3ATestUtils.getCSVTestFile(conf); + + largeFile = new Path(largeFileUri); + blockSize = conf.getInt(PREFETCH_BLOCK_SIZE_KEY, PREFETCH_BLOCK_DEFAULT_SIZE); + largeFileFS = new S3AFileSystem(); + largeFileFS.initialize(new URI(largeFileUri), getConfiguration()); + } + + @Test + public void testSeeksWithLruEviction() throws Throwable { + IOStatistics ioStats; + openFS(); + + ExecutorService executorService = 
Executors.newFixedThreadPool(5, + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("testSeeksWithLruEviction-%d") + .build()); + CountDownLatch countDownLatch = new CountDownLatch(7); + + try (FSDataInputStream in = largeFileFS.open(largeFile)) { + ioStats = in.getIOStatistics(); + // tests to add multiple blocks in the prefetch cache + // and let LRU eviction take place as more cache entries + // are added with multiple block reads. + + // Don't read block 0 completely + executorService.submit(() -> readFullyWithPositionedRead(countDownLatch, + in, + 0, + blockSize - S_1K * 10)); + + // Seek to block 1 and don't read completely + executorService.submit(() -> readFullyWithPositionedRead(countDownLatch, + in, + blockSize, + 2 * S_1K)); + + // Seek to block 2 and don't read completely + executorService.submit(() -> readFullyWithSeek(countDownLatch, + in, + blockSize * 2L, + 2 * S_1K)); + + // Seek to block 3 and don't read completely + executorService.submit(() -> readFullyWithPositionedRead(countDownLatch, + in, + blockSize * 3L, + 2 * S_1K)); + + // Seek to block 4 and don't read completely + executorService.submit(() -> readFullyWithSeek(countDownLatch, + in, + blockSize * 4L, + 2 * S_1K)); + + // Seek to block 5 and don't read completely + executorService.submit(() -> readFullyWithPositionedRead(countDownLatch, + in, + blockSize * 5L, + 2 * S_1K)); + + // backward seek, can't use block 0 as it is evicted + executorService.submit(() -> readFullyWithSeek(countDownLatch, + in, + S_1K * 5, + 2 * S_1K)); + + countDownLatch.await(); + + // expect 3 blocks as rest are to be evicted by LRU + LambdaTestUtils.eventually(TIMEOUT_MILLIS, INTERVAL_MILLIS, () -> { + LOG.info("IO stats: {}", ioStats); + verifyStatisticGaugeValue(ioStats, STREAM_READ_BLOCKS_IN_FILE_CACHE, + Integer.parseInt(maxBlocks)); + }); + // let LRU evictions settle down, if any + Thread.sleep(TIMEOUT_MILLIS); + } finally { + executorService.shutdownNow(); + executorService.awaitTermination(5, TimeUnit.SECONDS); + } + LambdaTestUtils.eventually(TIMEOUT_MILLIS, INTERVAL_MILLIS, () -> { + LOG.info("IO stats: {}", ioStats); + verifyStatisticGaugeValue(ioStats, STREAM_READ_BLOCKS_IN_FILE_CACHE, 0); + }); + } + + /** + * Read the bytes from the given position in the stream to a new buffer using the positioned + * readable. + * + * @param countDownLatch count down latch to mark the operation completed. + * @param in input stream. + * @param position position in the given input stream to seek from. + * @param len the number of bytes to read. + * @return true if the read operation is successful. + */ + private boolean readFullyWithPositionedRead(CountDownLatch countDownLatch, FSDataInputStream in, + long position, int len) { + byte[] buffer = new byte[blockSize]; + // Don't read block 0 completely + try { + in.readFully(position, buffer, 0, len); + countDownLatch.countDown(); + return true; + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + /** + * Read the bytes from the given position in the stream to a new buffer using seek followed by + * input stream read. + * + * @param countDownLatch count down latch to mark the operation completed. + * @param in input stream. + * @param position position in the given input stream to seek from. + * @param len the number of bytes to read. + * @return true if the read operation is successful. 
+ */ + private boolean readFullyWithSeek(CountDownLatch countDownLatch, FSDataInputStream in, + long position, int len) { + byte[] buffer = new byte[blockSize]; + // Don't read block 0 completely + try { + in.seek(position); + in.readFully(buffer, 0, len); + countDownLatch.countDown(); + return true; + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchFakes.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchFakes.java index cf6aa7ba1aa..6cf2ab241e2 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchFakes.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchFakes.java @@ -44,6 +44,7 @@ import org.apache.hadoop.fs.impl.prefetch.BlockData; import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool; import org.apache.hadoop.fs.impl.prefetch.SingleFilePerBlockCache; import org.apache.hadoop.fs.impl.prefetch.Validate; +import org.apache.hadoop.fs.s3a.Constants; import org.apache.hadoop.fs.s3a.Invoker; import org.apache.hadoop.fs.s3a.S3AEncryptionMethods; import org.apache.hadoop.fs.s3a.S3AFileStatus; @@ -314,7 +315,8 @@ public final class S3APrefetchFakes { private final int writeDelay; public FakeS3FilePerBlockCache(int readDelay, int writeDelay) { - super(new EmptyS3AStatisticsContext().newInputStreamStatistics()); + super(new EmptyS3AStatisticsContext().newInputStreamStatistics(), + Constants.DEFAULT_PREFETCH_MAX_BLOCKS_COUNT); this.files = new ConcurrentHashMap<>(); this.readDelay = readDelay; this.writeDelay = writeDelay; @@ -387,7 +389,7 @@ public final class S3APrefetchFakes { } @Override - protected BlockCache createCache() { + protected BlockCache createCache(int maxBlocksCount) { final int readDelayMs = 50; final int writeDelayMs = 200; return new FakeS3FilePerBlockCache(readDelayMs, writeDelayMs); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/TestS3ACachingBlockManager.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/TestS3ACachingBlockManager.java index cbfa643ee53..8ec94d469da 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/TestS3ACachingBlockManager.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/TestS3ACachingBlockManager.java @@ -37,7 +37,9 @@ import org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext; import org.apache.hadoop.test.AbstractHadoopTestBase; import static org.apache.hadoop.fs.s3a.Constants.BUFFER_DIR; +import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_PREFETCH_MAX_BLOCKS_COUNT; import static org.apache.hadoop.fs.s3a.Constants.HADOOP_TMP_DIR; +import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_MAX_BLOCKS_COUNT; import static org.apache.hadoop.test.LambdaTestUtils.intercept; import static org.junit.Assert.assertEquals; @@ -173,6 +175,10 @@ public class TestS3ACachingBlockManager extends AbstractHadoopTestBase { super.cachePut(blockNumber, buffer); } } + + public Configuration getConf() { + return CONF; + } } // @Ignore @@ -285,8 +291,11 @@ public class TestS3ACachingBlockManager extends AbstractHadoopTestBase { blockManager.requestCaching(data); } - waitForCaching(blockManager, blockData.getNumBlocks()); - assertEquals(blockData.getNumBlocks(), blockManager.numCached()); + waitForCaching(blockManager, Math.min(blockData.getNumBlocks(), + 
conf.getInt(PREFETCH_MAX_BLOCKS_COUNT, DEFAULT_PREFETCH_MAX_BLOCKS_COUNT))); + assertEquals(Math.min(blockData.getNumBlocks(), + conf.getInt(PREFETCH_MAX_BLOCKS_COUNT, DEFAULT_PREFETCH_MAX_BLOCKS_COUNT)), + blockManager.numCached()); assertEquals(0, this.totalErrors(blockManager)); } @@ -330,8 +339,11 @@ public class TestS3ACachingBlockManager extends AbstractHadoopTestBase { } blockManager.requestCaching(data); - waitForCaching(blockManager, expectedNumSuccesses); - assertEquals(expectedNumSuccesses, blockManager.numCached()); + waitForCaching(blockManager, Math.min(expectedNumSuccesses, blockManager.getConf() + .getInt(PREFETCH_MAX_BLOCKS_COUNT, DEFAULT_PREFETCH_MAX_BLOCKS_COUNT))); + assertEquals(Math.min(expectedNumSuccesses, blockManager.getConf() + .getInt(PREFETCH_MAX_BLOCKS_COUNT, DEFAULT_PREFETCH_MAX_BLOCKS_COUNT)), + blockManager.numCached()); if (forceCachingFailure) { assertEquals(expectedNumErrors, this.totalErrors(blockManager)); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org