This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch feature-HADOOP-18028-s3a-prefetch-branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit ca52512f957bf7f7cbe31ffd48ee11415fc2a5d9
Author: Viraj Jasani <vjas...@apache.org>
AuthorDate: Fri Sep 9 03:32:20 2022 -0700

    HADOOP-18186. s3a prefetching to use SemaphoredDelegatingExecutor for submitting work (#4796)

    Contributed by Viraj Jasani
---
 .../java/org/apache/hadoop/fs/s3a/S3AFileSystem.java | 17 +++++++++++++----
 .../hadoop/fs/s3a/ITestS3APrefetchingInputStream.java | 9 +++++++++
 2 files changed, 22 insertions(+), 4 deletions(-)

diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index dd9f3368c59..29cd158641f 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -786,9 +786,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
         DEFAULT_KEEPALIVE_TIME, 0);
     int numPrefetchThreads = this.prefetchEnabled ? this.prefetchBlockCount : 0;
 
+    int activeTasksForBoundedThreadPool = maxThreads;
+    int waitingTasksForBoundedThreadPool = maxThreads + totalTasks + numPrefetchThreads;
     boundedThreadPool = BlockingThreadPoolExecutorService.newInstance(
-        maxThreads,
-        maxThreads + totalTasks + numPrefetchThreads,
+        activeTasksForBoundedThreadPool,
+        waitingTasksForBoundedThreadPool,
         keepAliveTime, TimeUnit.SECONDS,
         name + "-bounded");
     unboundedThreadPool = new ThreadPoolExecutor(
@@ -800,8 +802,15 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
     unboundedThreadPool.allowCoreThreadTimeOut(true);
     executorCapacity = intOption(conf,
         EXECUTOR_CAPACITY, DEFAULT_EXECUTOR_CAPACITY, 1);
-    if (this.prefetchEnabled) {
-      this.futurePool = new ExecutorServiceFuturePool(boundedThreadPool);
+    if (prefetchEnabled) {
+      final S3AInputStreamStatistics s3AInputStreamStatistics =
+          statisticsContext.newInputStreamStatistics();
+      futurePool = new ExecutorServiceFuturePool(
+          new SemaphoredDelegatingExecutor(
+              boundedThreadPool,
+              activeTasksForBoundedThreadPool + waitingTasksForBoundedThreadPool,
+              true,
+              s3AInputStreamStatistics));
     }
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java
index 36d049cedf1..24f74b3a021 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java
@@ -38,8 +38,10 @@ import org.apache.hadoop.fs.statistics.StreamStatisticNames;
 import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE;
 import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
 import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticMaximum;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticGaugeValue;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_MAX;
 import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
 
 /**
@@ -130,6 +132,8 @@ public class ITestS3APrefetchingInputStream extends AbstractS3ACostTest {
     }
     // Verify that once stream is closed, all memory is freed
     verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE, 0);
+    assertThatStatisticMaximum(ioStats,
+        StoreStatisticNames.ACTION_EXECUTOR_ACQUIRED + SUFFIX_MAX).isGreaterThan(0);
   }
 
   @Test
@@ -159,6 +163,8 @@ public class ITestS3APrefetchingInputStream extends AbstractS3ACostTest {
     }
     verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_BLOCKS_IN_FILE_CACHE, 0);
     verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE, 0);
+    assertThatStatisticMaximum(ioStats,
+        StoreStatisticNames.ACTION_EXECUTOR_ACQUIRED + SUFFIX_MAX).isGreaterThan(0);
   }
 
   @Test
@@ -183,6 +189,9 @@ public class ITestS3APrefetchingInputStream extends AbstractS3ACostTest {
       verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS, 0);
       // The buffer pool is not used
       verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE, 0);
+      // no prefetch ops, so no action_executor_acquired
+      assertThatStatisticMaximum(ioStats,
+          StoreStatisticNames.ACTION_EXECUTOR_ACQUIRED + SUFFIX_MAX).isEqualTo(-1);
     }
   }
 
}

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org