[ https://issues.apache.org/jira/browse/HADOOP-18184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17750807#comment-17750807 ]
ASF GitHub Bot commented on HADOOP-18184: ----------------------------------------- ahmarsuhail commented on code in PR #5832: URL: https://github.com/apache/hadoop/pull/5832#discussion_r1283192650 ########## hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/CachingBlockManager.java: ########## @@ -392,16 +413,41 @@ private void readBlock(BufferData data, boolean isPrefetch, BufferData.State... ops.end(op); } - if (isPrefetch) { - prefetchingStatistics.prefetchOperationCompleted(); - if (tracker != null) { - tracker.close(); - } + // update the statistics + prefetchingStatistics.fetchOperationCompleted(isPrefetch, bytesFetched); + if (tracker != null) { Review Comment: potentially could remove this null check and the one on 404 for the tracker. it used to be null for non prefetching ops before..but won't be null anymore ########## hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java: ########## @@ -577,11 +614,32 @@ public static Configuration prepareTestConfiguration(final Configuration conf) { boolean prefetchEnabled = getTestPropertyBool(conf, PREFETCH_ENABLED_KEY, PREFETCH_ENABLED_DEFAULT); - conf.setBoolean(PREFETCH_ENABLED_KEY, prefetchEnabled); + enablePrefetch(conf, prefetchEnabled); return conf; } + /** + * Unset base/bucket prefetch options and set to the supplied option instead. + * @param conf configuration + * @param prefetch prefetch option + * @return the modified configuration. + */ + public static Configuration enablePrefetch(final Configuration conf, boolean prefetch) { Review Comment: nit: renaming to `setPrefetchState` or something would improve readability. 
At a glance, `enablePrefetch` makes it seem like we're always enabling it. ########## hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/ITestS3APrefetchingCacheFiles.java: ########## @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hadoop.fs.s3a.prefetch; + +import java.io.File; +import java.io.IOException; + +import org.assertj.core.api.Assertions; +import org.assertj.core.api.Assumptions; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.fs.s3a.S3ATestUtils; +import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest; +import org.apache.hadoop.fs.statistics.IOStatisticsContext; + +import static org.apache.hadoop.fs.s3a.Constants.BUFFER_DIR; +import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE; +import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY; +import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.enablePrefetch; +import static org.apache.hadoop.fs.statistics.IOStatisticsContext.getCurrentIOStatisticsContext; +import static org.apache.hadoop.io.IOUtils.cleanupWithLogger; +import static org.apache.hadoop.test.Sizes.S_1K; + +/** + * Test the cache file behaviour with prefetching input stream. + */ +public class ITestS3APrefetchingCacheFiles extends AbstractS3ACostTest { + + private static final Logger LOG = + LoggerFactory.getLogger(ITestS3APrefetchingCacheFiles.class); + private static final int BLOCK_SIZE = S_1K * 10; + + private Path testFile; + private S3AFileSystem testFileSystem; + private int prefetchBlockSize; + private Configuration conf; + + /** + * Thread level IOStatistics; reset in setup(). 
+ */ + private IOStatisticsContext ioStatisticsContext; + + private File tmpFileDir; + + + @Before + public void setUp() throws Exception { + super.setup(); + conf = createConfiguration(); + tmpFileDir = File.createTempFile("ITestS3APrefetchingCacheFiles", ""); + tmpFileDir.delete(); + tmpFileDir.mkdirs(); + + conf.set(BUFFER_DIR, tmpFileDir.getAbsolutePath()); + String testFileUri = S3ATestUtils.getCSVTestFile(conf); + + testFile = new Path(testFileUri); + testFileSystem = (S3AFileSystem) FileSystem.get(testFile.toUri(), conf); + + prefetchBlockSize = conf.getInt(PREFETCH_BLOCK_SIZE_KEY, PREFETCH_BLOCK_DEFAULT_SIZE); + final FileStatus testFileStatus = testFileSystem.getFileStatus(testFile); + Assumptions.assumeThat(testFileStatus.getLen()) + .describedAs("Test file %s is smaller than required size %d", + testFileStatus, prefetchBlockSize * 4) + .isGreaterThan(prefetchBlockSize); + + ioStatisticsContext = getCurrentIOStatisticsContext(); + ioStatisticsContext.reset(); + } + + @Override + public Configuration createConfiguration() { + Configuration conf = super.createConfiguration(); + enablePrefetch(conf, true); + disableFilesystemCaching(conf); + S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_BLOCK_SIZE_KEY); + conf.setInt(PREFETCH_BLOCK_SIZE_KEY, BLOCK_SIZE); + return conf; + } + + @Override + public synchronized void teardown() throws Exception { + super.teardown(); + tmpFileDir.delete(); + File[] tmpFiles = tmpFileDir.listFiles(); + if (tmpFiles != null) { + for (File filePath : tmpFiles) { + String path = filePath.getPath(); + filePath.delete(); + } + } + cleanupWithLogger(LOG, testFileSystem); + } + + /** + * Test to verify the existence of the cache file. + * Tries to perform inputStream read and seek ops to make the prefetching take place and + * asserts whether file with .bin suffix is present. It also verifies certain file stats. 
+ */ + @Test + public void testCacheFileExistence() throws Throwable { + describe("Verify that FS cache files exist on local FS"); + skipIfClientSideEncryption(); + + try (FSDataInputStream in = testFileSystem.open(testFile)) { + byte[] buffer = new byte[prefetchBlockSize]; + + in.read(buffer, 0, prefetchBlockSize - 10240); + assertCacheFileExists(); Review Comment: shouldn't we add a seek and then a read here? Though I tried that locally and the test still fails ########## hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/ITestInMemoryInputStream.java: ########## @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a.prefetch; + +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.apache.hadoop.fs.s3a.S3ATestUtils; +import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest; +import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; +import org.apache.hadoop.fs.statistics.IOStatistics; +import org.apache.hadoop.fs.statistics.IOStatisticsContext; + +import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.enablePrefetch; +import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticMaximum; +import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue; +import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticGaugeValue; +import static org.apache.hadoop.fs.statistics.IOStatisticsContext.getCurrentIOStatisticsContext; +import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString; +import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_EXECUTOR_ACQUIRED; +import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_HTTP_GET_REQUEST; +import static org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_MAX; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BLOCK_CACHE_ENABLED; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BLOCK_PREFETCH_ENABLED; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BYTES; +import static 
org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_FULLY_OPERATIONS; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_OPENED; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS; +import static org.apache.hadoop.test.Sizes.S_16K; +import static org.apache.hadoop.test.Sizes.S_1K; +import static org.apache.hadoop.test.Sizes.S_4K; + +/** + * Test the prefetching input stream, validates that the + * S3AInMemoryInputStream is working as expected. + */ +public class ITestInMemoryInputStream extends AbstractS3ACostTest { + + private static final Logger LOG = + LoggerFactory.getLogger(ITestInMemoryInputStream.class); + + // Size should be < block size so S3AInMemoryInputStream is used + private static final int SMALL_FILE_SIZE = S_16K; + + /** + * Thread level IOStatistics; reset in setup(). + */ + private IOStatisticsContext ioStatisticsContext; + + @Override + public Configuration createConfiguration() { + Configuration conf = super.createConfiguration(); + S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_ENABLED_KEY); + enablePrefetch(conf, true); + return conf; + } + + @Override + public void setup() throws Exception { + super.setup(); + ioStatisticsContext = getCurrentIOStatisticsContext(); + ioStatisticsContext.reset(); + } + + private void printStreamStatistics(final FSDataInputStream in) { + LOG.info("Stream statistics\n{}", + ioStatisticsToPrettyString(in.getIOStatistics())); + } + + @Test + public void testRandomReadSmallFile() throws Throwable { + describe("random read on a small file, uses S3AInMemoryInputStream"); + + byte[] data = ContractTestUtils.dataset(SMALL_FILE_SIZE, 'a', 26); + Path smallFile = path("randomReadSmallFile"); + ContractTestUtils.writeDataset(getFileSystem(), smallFile, data, data.length, 16, true); + + int expectedReadBytes = 0; + try (FSDataInputStream in = getFileSystem().open(smallFile)) { + IOStatistics ioStats = in.getIOStatistics(); 
+ + byte[] buffer = new byte[SMALL_FILE_SIZE]; + + in.read(buffer, 0, S_4K); + expectedReadBytes += S_4K; + verifyStatisticCounterValue(ioStats, STREAM_READ_BYTES, + expectedReadBytes); + + in.seek(S_1K * 12); + in.read(buffer, 0, S_4K); + expectedReadBytes += S_4K; + + verifyStatisticCounterValue(ioStats, STREAM_READ_BYTES, + expectedReadBytes); + printStreamStatistics(in); + + verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, 1); + verifyStatisticCounterValue(ioStats, STREAM_READ_OPENED, 1); + verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCH_OPERATIONS, 0); + // the whole file is loaded + verifyStatisticGaugeValue(ioStats, STREAM_READ_ACTIVE_MEMORY_IN_USE, SMALL_FILE_SIZE); + // there is no prefetching + verifyStatisticGaugeValue(ioStats, STREAM_READ_BLOCK_PREFETCH_ENABLED, 0); + // there is no caching + verifyStatisticGaugeValue(ioStats, STREAM_READ_BLOCK_CACHE_ENABLED, 0); + // no prefetch ops, so no action_executor_acquired + assertThatStatisticMaximum(ioStats, + ACTION_EXECUTOR_ACQUIRED + SUFFIX_MAX).isEqualTo(-1); + + // now read offset 0 again and again, expect no new costs + in.readFully(0, buffer); + verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, 1); + expectedReadBytes += buffer.length; + + verifyStatisticCounterValue(ioStats, STREAM_READ_BYTES, + expectedReadBytes); + // unbuffer + in.unbuffer(); Review Comment: since this test is getting quite big..it might be better to have a separate test for unbuffer ########## hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/ITestS3APrefetchingLargeFiles.java: ########## @@ -123,19 +142,34 @@ public void testReadLargeFileFully() throws Throwable { verifyStatisticGaugeValue(ioStats, STREAM_READ_BLOCKS_IN_FILE_CACHE, 0); } + printStreamStatistics(in); // Assert that first block is read synchronously, following blocks are prefetched verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCH_OPERATIONS, numBlocks - 1); verifyStatisticCounterValue(ioStats, 
ACTION_HTTP_GET_REQUEST, numBlocks); verifyStatisticCounterValue(ioStats, STREAM_READ_OPENED, numBlocks); + + in.unbuffer(); + // Verify that once stream is closed, all memory is freed + verifyStatisticGaugeValue(ioStats, STREAM_READ_ACTIVE_MEMORY_IN_USE, 0); + // prefetching is on + verifyStatisticGaugeValue(ioStats, STREAM_READ_BLOCK_PREFETCH_ENABLED, 1); + // there is no caching Review Comment: there is caching? same comment for line 132 ########## hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/ITestS3APrefetchingCacheFiles.java: ########## @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a.prefetch; + +import java.io.File; +import java.io.IOException; + +import org.assertj.core.api.Assertions; +import org.assertj.core.api.Assumptions; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.fs.s3a.S3ATestUtils; +import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest; +import org.apache.hadoop.fs.statistics.IOStatisticsContext; + +import static org.apache.hadoop.fs.s3a.Constants.BUFFER_DIR; +import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE; +import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY; +import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.enablePrefetch; +import static org.apache.hadoop.fs.statistics.IOStatisticsContext.getCurrentIOStatisticsContext; +import static org.apache.hadoop.io.IOUtils.cleanupWithLogger; +import static org.apache.hadoop.test.Sizes.S_1K; + +/** + * Test the cache file behaviour with prefetching input stream. + */ +public class ITestS3APrefetchingCacheFiles extends AbstractS3ACostTest { + + private static final Logger LOG = + LoggerFactory.getLogger(ITestS3APrefetchingCacheFiles.class); + private static final int BLOCK_SIZE = S_1K * 10; + + private Path testFile; + private S3AFileSystem testFileSystem; + private int prefetchBlockSize; + private Configuration conf; + + /** + * Thread level IOStatistics; reset in setup(). 
+ */ + private IOStatisticsContext ioStatisticsContext; + + private File tmpFileDir; + + + @Before + public void setUp() throws Exception { + super.setup(); + conf = createConfiguration(); + tmpFileDir = File.createTempFile("ITestS3APrefetchingCacheFiles", ""); + tmpFileDir.delete(); + tmpFileDir.mkdirs(); + + conf.set(BUFFER_DIR, tmpFileDir.getAbsolutePath()); + String testFileUri = S3ATestUtils.getCSVTestFile(conf); + + testFile = new Path(testFileUri); + testFileSystem = (S3AFileSystem) FileSystem.get(testFile.toUri(), conf); + + prefetchBlockSize = conf.getInt(PREFETCH_BLOCK_SIZE_KEY, PREFETCH_BLOCK_DEFAULT_SIZE); + final FileStatus testFileStatus = testFileSystem.getFileStatus(testFile); + Assumptions.assumeThat(testFileStatus.getLen()) + .describedAs("Test file %s is smaller than required size %d", + testFileStatus, prefetchBlockSize * 4) + .isGreaterThan(prefetchBlockSize); + + ioStatisticsContext = getCurrentIOStatisticsContext(); + ioStatisticsContext.reset(); + } + + @Override + public Configuration createConfiguration() { + Configuration conf = super.createConfiguration(); + enablePrefetch(conf, true); + disableFilesystemCaching(conf); + S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_BLOCK_SIZE_KEY); + conf.setInt(PREFETCH_BLOCK_SIZE_KEY, BLOCK_SIZE); + return conf; + } + + @Override + public synchronized void teardown() throws Exception { + super.teardown(); + tmpFileDir.delete(); + File[] tmpFiles = tmpFileDir.listFiles(); + if (tmpFiles != null) { + for (File filePath : tmpFiles) { + String path = filePath.getPath(); + filePath.delete(); + } + } + cleanupWithLogger(LOG, testFileSystem); + } + + /** + * Test to verify the existence of the cache file. + * Tries to perform inputStream read and seek ops to make the prefetching take place and + * asserts whether file with .bin suffix is present. It also verifies certain file stats. 
+ */ + @Test + public void testCacheFileExistence() throws Throwable { Review Comment: Thinking about whether we can also add a test to check that caching gets disabled if it takes too long... but I'm not sure how to do it (or if it's possible). Also, a test that if the stream is unbuffered, it doesn't get cached. ########## hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/ITestS3APrefetchingLargeFiles.java: ########## @@ -29,62 +29,77 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.contract.ContractTestUtils; import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest; -import org.apache.hadoop.fs.s3a.prefetch.S3APrefetchingInputStream; -import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; import org.apache.hadoop.fs.statistics.IOStatistics; +import org.apache.hadoop.fs.statistics.IOStatisticsContext; import org.apache.hadoop.test.LambdaTestUtils; import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY; import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.enablePrefetch; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; +import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertDurationRange; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticMaximum; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticGaugeValue; +import static org.apache.hadoop.fs.statistics.IOStatisticsContext.getCurrentIOStatisticsContext; +import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString; import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_EXECUTOR_ACQUIRED; import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_HTTP_GET_REQUEST; import static
org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_MAX; import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE; import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BLOCKS_IN_FILE_CACHE; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BLOCK_ACQUIRE_AND_READ; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BLOCK_CACHE_ENABLED; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BLOCK_PREFETCH_ENABLED; import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_OPENED; import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_REMOTE_BLOCK_READ; +import static org.apache.hadoop.test.Sizes.S_10M; +import static org.apache.hadoop.test.Sizes.S_1K; /** * Test the prefetching input stream, validates that the underlying S3ACachingInputStream and * S3AInMemoryInputStream are working as expected. 
*/ -public class ITestS3APrefetchingInputStream extends AbstractS3ACostTest { - - public ITestS3APrefetchingInputStream() { - super(true); - } +public class ITestS3APrefetchingLargeFiles extends AbstractS3ACostTest { private static final Logger LOG = - LoggerFactory.getLogger(ITestS3APrefetchingInputStream.class); + LoggerFactory.getLogger(ITestS3APrefetchingLargeFiles.class); private static final int S_500 = 512; - private static final int S_1K = S_500 * 2; - private static final int S_1M = S_1K * S_1K; + private int numBlocks; // Size should be > block size so S3ACachingInputStream is used private long largeFileSize; - // Size should be < block size so S3AInMemoryInputStream is used - private static final int SMALL_FILE_SIZE = S_1K * 9; - private static final int TIMEOUT_MILLIS = 5000; private static final int INTERVAL_MILLIS = 500; private static final int BLOCK_SIZE = S_1K * 10; + /** + * Thread level IOStatistics; reset in setup(). + */ + private IOStatisticsContext ioStatisticsContext; + @Override public Configuration createConfiguration() { Configuration conf = super.createConfiguration(); - S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_ENABLED_KEY); - S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_BLOCK_SIZE_KEY); + removeBaseAndBucketOverrides(conf, + PREFETCH_ENABLED_KEY, PREFETCH_BLOCK_SIZE_KEY); conf.setBoolean(PREFETCH_ENABLED_KEY, true); conf.setInt(PREFETCH_BLOCK_SIZE_KEY, BLOCK_SIZE); return conf; } + @Override + public void setup() throws Exception { + super.setup(); + + ioStatisticsContext = getCurrentIOStatisticsContext(); + ioStatisticsContext.reset(); + } + private void createLargeFile() throws Exception { Review Comment: do you think it's worth following the same pattern as `AbstractS3ACostTest`, which creates a huge file in a test, and then other tests assert that the file exists. 
ITestInMemoryInputStream could extend it as well, and avoid creating and tearing down the small file multiple times ########## hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteInputStream.java: ########## @@ -409,27 +455,54 @@ protected String getOffsetStr(long offset) { return String.format("%d:%d", blockNumber, offset); } + @Override + public synchronized void unbuffer() { + LOG.debug("{}: unbuffered", getName()); + if (closeStream(true)) { + getS3AStreamStatistics().unbuffered(); + } + } + + /** + * Close the stream in close() or unbuffer(). + * @param unbuffer is this an unbuffer operation? + * @return true if the stream was closed; false means it was already closed. + */ + protected boolean closeStream(final boolean unbuffer) { + + if (underlyingResourcesClosed.getAndSet(true)) { + return false; + } + + if (unbuffer) { Review Comment: also, why just on unbuffer? shouldn't this be cleaned up on close() too? ########## hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingInputStream.java: ########## @@ -76,36 +79,75 @@ public S3ACachingInputStream( S3AInputStreamStatistics streamStatistics, Configuration conf, LocalDirAllocator localDirAllocator) { - super(context, s3Attributes, client, streamStatistics); - this.numBlocksToPrefetch = this.getContext().getPrefetchBlockCount(); - int bufferPoolSize = this.numBlocksToPrefetch + 1; - this.blockManager = this.createBlockManager( - this.getContext().getFuturePool(), - this.getReader(), - this.getBlockData(), - bufferPoolSize, - conf, - localDirAllocator); + super(context, s3Attributes, client, streamStatistics); + this.conf = conf; + this.localDirAllocator = localDirAllocator; + this.numBlocksToPrefetch = getContext().getPrefetchBlockCount(); + demandCreateBlockManager(); int fileSize = (int) s3Attributes.getLen(); LOG.debug("Created caching input stream for {} (size = {})", this.getName(), fileSize); + streamStatistics.setPrefetchState(numBlocksToPrefetch > 
0, Review Comment: nevermind...since S3AInMemoryInputStream doesn't actually do any prefetching ########## hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingInputStream.java: ########## @@ -76,36 +79,75 @@ public S3ACachingInputStream( S3AInputStreamStatistics streamStatistics, Configuration conf, LocalDirAllocator localDirAllocator) { - super(context, s3Attributes, client, streamStatistics); - this.numBlocksToPrefetch = this.getContext().getPrefetchBlockCount(); - int bufferPoolSize = this.numBlocksToPrefetch + 1; - this.blockManager = this.createBlockManager( - this.getContext().getFuturePool(), - this.getReader(), - this.getBlockData(), - bufferPoolSize, - conf, - localDirAllocator); + super(context, s3Attributes, client, streamStatistics); + this.conf = conf; + this.localDirAllocator = localDirAllocator; + this.numBlocksToPrefetch = getContext().getPrefetchBlockCount(); + demandCreateBlockManager(); int fileSize = (int) s3Attributes.getLen(); LOG.debug("Created caching input stream for {} (size = {})", this.getName(), fileSize); + streamStatistics.setPrefetchState(numBlocksToPrefetch > 0, Review Comment: why the `numBlocksToPrefetch > 0` though? numBlocksToPrefetch will always be > 0 because in S3AFS we do `prefetchBlockCount = intOption(conf, PREFETCH_BLOCK_COUNT_KEY, PREFETCH_BLOCK_DEFAULT_COUNT, 1);` > s3a prefetching stream to support unbuffer() > -------------------------------------------- > > Key: HADOOP-18184 > URL: https://issues.apache.org/jira/browse/HADOOP-18184 > Project: Hadoop Common > Issue Type: Sub-task > Components: fs/s3 > Affects Versions: 3.4.0 > Reporter: Steve Loughran > Assignee: Steve Loughran > Priority: Minor > Labels: pull-request-available > > Apache Impala uses unbuffer() to free up all client side resources held by a > stream, so allowing it to have a map of available (path -> stream) objects, > retained across queries. > This saves on having to reopen the files, with the cost of HEAD checks etc. 
> S3AInputStream just closes its HTTP connection. Here there is a lot more > state to discard, but all memory and file storage must be freed. > Until this is done, ITestS3AContractUnbuffer must skip when the prefetch stream > is used. > It's notable that the other tests don't fail, even though the stream doesn't > implement the interface; the graceful degradation handles that. It should > fail if the test XML resource says the stream supports unbuffer() but the stream > capabilities say it doesn't. -- This message was sent by Atlassian Jira (v8.20.10#820010) --------------------------------------------------------------------- To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-issues-h...@hadoop.apache.org