ahmarsuhail commented on code in PR #5832:
URL: https://github.com/apache/hadoop/pull/5832#discussion_r1283192650
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/CachingBlockManager.java:
##########
@@ -392,16 +413,41 @@ private void readBlock(BufferData data, boolean
isPrefetch, BufferData.State...
ops.end(op);
}
- if (isPrefetch) {
- prefetchingStatistics.prefetchOperationCompleted();
- if (tracker != null) {
- tracker.close();
- }
+ // update the statistics
+ prefetchingStatistics.fetchOperationCompleted(isPrefetch,
bytesFetched);
+ if (tracker != null) {
Review Comment:
We could potentially remove this null check, and the one on line 404, for the tracker.
It used to be null for non-prefetching ops, but it won't be null anymore.
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java:
##########
@@ -577,11 +614,32 @@ public static Configuration
prepareTestConfiguration(final Configuration conf) {
boolean prefetchEnabled =
getTestPropertyBool(conf, PREFETCH_ENABLED_KEY,
PREFETCH_ENABLED_DEFAULT);
- conf.setBoolean(PREFETCH_ENABLED_KEY, prefetchEnabled);
+ enablePrefetch(conf, prefetchEnabled);
return conf;
}
+ /**
+ * Unset base/bucket prefetch options and set to the supplied option instead.
+ * @param conf configuration
+ * @param prefetch prefetch option
+ * @return the modified configuration.
+ */
+ public static Configuration enablePrefetch(final Configuration conf, boolean
prefetch) {
Review Comment:
nit: renaming to `setPrefetchState` or something similar would improve readability.
At a glance, `enablePrefetch` makes it seem like we're always enabling it.
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/ITestS3APrefetchingCacheFiles.java:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.prefetch;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.assertj.core.api.Assertions;
+import org.assertj.core.api.Assumptions;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
+import org.apache.hadoop.fs.statistics.IOStatisticsContext;
+
+import static org.apache.hadoop.fs.s3a.Constants.BUFFER_DIR;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.enablePrefetch;
+import static
org.apache.hadoop.fs.statistics.IOStatisticsContext.getCurrentIOStatisticsContext;
+import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
+import static org.apache.hadoop.test.Sizes.S_1K;
+
+/**
+ * Test the cache file behaviour with prefetching input stream.
+ */
+public class ITestS3APrefetchingCacheFiles extends AbstractS3ACostTest {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ITestS3APrefetchingCacheFiles.class);
+ private static final int BLOCK_SIZE = S_1K * 10;
+
+ private Path testFile;
+ private S3AFileSystem testFileSystem;
+ private int prefetchBlockSize;
+ private Configuration conf;
+
+ /**
+ * Thread level IOStatistics; reset in setup().
+ */
+ private IOStatisticsContext ioStatisticsContext;
+
+ private File tmpFileDir;
+
+
+ @Before
+ public void setUp() throws Exception {
+ super.setup();
+ conf = createConfiguration();
+ tmpFileDir = File.createTempFile("ITestS3APrefetchingCacheFiles", "");
+ tmpFileDir.delete();
+ tmpFileDir.mkdirs();
+
+ conf.set(BUFFER_DIR, tmpFileDir.getAbsolutePath());
+ String testFileUri = S3ATestUtils.getCSVTestFile(conf);
+
+ testFile = new Path(testFileUri);
+ testFileSystem = (S3AFileSystem) FileSystem.get(testFile.toUri(), conf);
+
+ prefetchBlockSize = conf.getInt(PREFETCH_BLOCK_SIZE_KEY,
PREFETCH_BLOCK_DEFAULT_SIZE);
+ final FileStatus testFileStatus = testFileSystem.getFileStatus(testFile);
+ Assumptions.assumeThat(testFileStatus.getLen())
+ .describedAs("Test file %s is smaller than required size %d",
+ testFileStatus, prefetchBlockSize * 4)
+ .isGreaterThan(prefetchBlockSize);
+
+ ioStatisticsContext = getCurrentIOStatisticsContext();
+ ioStatisticsContext.reset();
+ }
+
+ @Override
+ public Configuration createConfiguration() {
+ Configuration conf = super.createConfiguration();
+ enablePrefetch(conf, true);
+ disableFilesystemCaching(conf);
+ S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_BLOCK_SIZE_KEY);
+ conf.setInt(PREFETCH_BLOCK_SIZE_KEY, BLOCK_SIZE);
+ return conf;
+ }
+
+ @Override
+ public synchronized void teardown() throws Exception {
+ super.teardown();
+ tmpFileDir.delete();
+ File[] tmpFiles = tmpFileDir.listFiles();
+ if (tmpFiles != null) {
+ for (File filePath : tmpFiles) {
+ String path = filePath.getPath();
+ filePath.delete();
+ }
+ }
+ cleanupWithLogger(LOG, testFileSystem);
+ }
+
+ /**
+ * Test to verify the existence of the cache file.
+ * Tries to perform inputStream read and seek ops to make the prefetching
take place and
+ * asserts whether file with .bin suffix is present. It also verifies
certain file stats.
+ */
+ @Test
+ public void testCacheFileExistence() throws Throwable {
+ describe("Verify that FS cache files exist on local FS");
+ skipIfClientSideEncryption();
+
+ try (FSDataInputStream in = testFileSystem.open(testFile)) {
+ byte[] buffer = new byte[prefetchBlockSize];
+
+ in.read(buffer, 0, prefetchBlockSize - 10240);
+ assertCacheFileExists();
Review Comment:
shouldn't we add a seek and then a read here? Though I tried that locally
and the test still fails
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/ITestInMemoryInputStream.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.prefetch;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
+import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsContext;
+
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.enablePrefetch;
+import static
org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticMaximum;
+import static
org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
+import static
org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticGaugeValue;
+import static
org.apache.hadoop.fs.statistics.IOStatisticsContext.getCurrentIOStatisticsContext;
+import static
org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
+import static
org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_EXECUTOR_ACQUIRED;
+import static
org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_HTTP_GET_REQUEST;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_MAX;
+import static
org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE;
+import static
org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BLOCK_CACHE_ENABLED;
+import static
org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BLOCK_PREFETCH_ENABLED;
+import static
org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BYTES;
+import static
org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_FULLY_OPERATIONS;
+import static
org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_OPENED;
+import static
org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS;
+import static org.apache.hadoop.test.Sizes.S_16K;
+import static org.apache.hadoop.test.Sizes.S_1K;
+import static org.apache.hadoop.test.Sizes.S_4K;
+
+/**
+ * Test the prefetching input stream, validates that the
+ * S3AInMemoryInputStream is working as expected.
+ */
+public class ITestInMemoryInputStream extends AbstractS3ACostTest {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ITestInMemoryInputStream.class);
+
+ // Size should be < block size so S3AInMemoryInputStream is used
+ private static final int SMALL_FILE_SIZE = S_16K;
+
+ /**
+ * Thread level IOStatistics; reset in setup().
+ */
+ private IOStatisticsContext ioStatisticsContext;
+
+ @Override
+ public Configuration createConfiguration() {
+ Configuration conf = super.createConfiguration();
+ S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_ENABLED_KEY);
+ enablePrefetch(conf, true);
+ return conf;
+ }
+
+ @Override
+ public void setup() throws Exception {
+ super.setup();
+ ioStatisticsContext = getCurrentIOStatisticsContext();
+ ioStatisticsContext.reset();
+ }
+
+ private void printStreamStatistics(final FSDataInputStream in) {
+ LOG.info("Stream statistics\n{}",
+ ioStatisticsToPrettyString(in.getIOStatistics()));
+ }
+
+ @Test
+ public void testRandomReadSmallFile() throws Throwable {
+ describe("random read on a small file, uses S3AInMemoryInputStream");
+
+ byte[] data = ContractTestUtils.dataset(SMALL_FILE_SIZE, 'a', 26);
+ Path smallFile = path("randomReadSmallFile");
+ ContractTestUtils.writeDataset(getFileSystem(), smallFile, data,
data.length, 16, true);
+
+ int expectedReadBytes = 0;
+ try (FSDataInputStream in = getFileSystem().open(smallFile)) {
+ IOStatistics ioStats = in.getIOStatistics();
+
+ byte[] buffer = new byte[SMALL_FILE_SIZE];
+
+ in.read(buffer, 0, S_4K);
+ expectedReadBytes += S_4K;
+ verifyStatisticCounterValue(ioStats, STREAM_READ_BYTES,
+ expectedReadBytes);
+
+ in.seek(S_1K * 12);
+ in.read(buffer, 0, S_4K);
+ expectedReadBytes += S_4K;
+
+ verifyStatisticCounterValue(ioStats, STREAM_READ_BYTES,
+ expectedReadBytes);
+ printStreamStatistics(in);
+
+ verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, 1);
+ verifyStatisticCounterValue(ioStats, STREAM_READ_OPENED, 1);
+ verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCH_OPERATIONS, 0);
+ // the whole file is loaded
+ verifyStatisticGaugeValue(ioStats, STREAM_READ_ACTIVE_MEMORY_IN_USE,
SMALL_FILE_SIZE);
+ // there is no prefetching
+ verifyStatisticGaugeValue(ioStats, STREAM_READ_BLOCK_PREFETCH_ENABLED,
0);
+ // there is no caching
+ verifyStatisticGaugeValue(ioStats, STREAM_READ_BLOCK_CACHE_ENABLED, 0);
+ // no prefetch ops, so no action_executor_acquired
+ assertThatStatisticMaximum(ioStats,
+ ACTION_EXECUTOR_ACQUIRED + SUFFIX_MAX).isEqualTo(-1);
+
+ // now read offset 0 again and again, expect no new costs
+ in.readFully(0, buffer);
+ verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, 1);
+ expectedReadBytes += buffer.length;
+
+ verifyStatisticCounterValue(ioStats, STREAM_READ_BYTES,
+ expectedReadBytes);
+ // unbuffer
+ in.unbuffer();
Review Comment:
Since this test is getting quite big, it might be better to have a separate
test for unbuffer.
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/ITestS3APrefetchingLargeFiles.java:
##########
@@ -123,19 +142,34 @@ public void testReadLargeFileFully() throws Throwable {
verifyStatisticGaugeValue(ioStats, STREAM_READ_BLOCKS_IN_FILE_CACHE,
0);
}
+ printStreamStatistics(in);
// Assert that first block is read synchronously, following blocks are
prefetched
verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCH_OPERATIONS,
numBlocks - 1);
verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, numBlocks);
verifyStatisticCounterValue(ioStats, STREAM_READ_OPENED, numBlocks);
+
+ in.unbuffer();
+ // Verify that once stream is closed, all memory is freed
+ verifyStatisticGaugeValue(ioStats, STREAM_READ_ACTIVE_MEMORY_IN_USE, 0);
+ // prefetching is on
+ verifyStatisticGaugeValue(ioStats, STREAM_READ_BLOCK_PREFETCH_ENABLED,
1);
+ // there is no caching
Review Comment:
there is caching? same comment for line 132
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/ITestS3APrefetchingCacheFiles.java:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.prefetch;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.assertj.core.api.Assertions;
+import org.assertj.core.api.Assumptions;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
+import org.apache.hadoop.fs.statistics.IOStatisticsContext;
+
+import static org.apache.hadoop.fs.s3a.Constants.BUFFER_DIR;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.enablePrefetch;
+import static
org.apache.hadoop.fs.statistics.IOStatisticsContext.getCurrentIOStatisticsContext;
+import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
+import static org.apache.hadoop.test.Sizes.S_1K;
+
+/**
+ * Test the cache file behaviour with prefetching input stream.
+ */
+public class ITestS3APrefetchingCacheFiles extends AbstractS3ACostTest {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ITestS3APrefetchingCacheFiles.class);
+ private static final int BLOCK_SIZE = S_1K * 10;
+
+ private Path testFile;
+ private S3AFileSystem testFileSystem;
+ private int prefetchBlockSize;
+ private Configuration conf;
+
+ /**
+ * Thread level IOStatistics; reset in setup().
+ */
+ private IOStatisticsContext ioStatisticsContext;
+
+ private File tmpFileDir;
+
+
+ @Before
+ public void setUp() throws Exception {
+ super.setup();
+ conf = createConfiguration();
+ tmpFileDir = File.createTempFile("ITestS3APrefetchingCacheFiles", "");
+ tmpFileDir.delete();
+ tmpFileDir.mkdirs();
+
+ conf.set(BUFFER_DIR, tmpFileDir.getAbsolutePath());
+ String testFileUri = S3ATestUtils.getCSVTestFile(conf);
+
+ testFile = new Path(testFileUri);
+ testFileSystem = (S3AFileSystem) FileSystem.get(testFile.toUri(), conf);
+
+ prefetchBlockSize = conf.getInt(PREFETCH_BLOCK_SIZE_KEY,
PREFETCH_BLOCK_DEFAULT_SIZE);
+ final FileStatus testFileStatus = testFileSystem.getFileStatus(testFile);
+ Assumptions.assumeThat(testFileStatus.getLen())
+ .describedAs("Test file %s is smaller than required size %d",
+ testFileStatus, prefetchBlockSize * 4)
+ .isGreaterThan(prefetchBlockSize);
+
+ ioStatisticsContext = getCurrentIOStatisticsContext();
+ ioStatisticsContext.reset();
+ }
+
+ @Override
+ public Configuration createConfiguration() {
+ Configuration conf = super.createConfiguration();
+ enablePrefetch(conf, true);
+ disableFilesystemCaching(conf);
+ S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_BLOCK_SIZE_KEY);
+ conf.setInt(PREFETCH_BLOCK_SIZE_KEY, BLOCK_SIZE);
+ return conf;
+ }
+
+ @Override
+ public synchronized void teardown() throws Exception {
+ super.teardown();
+ tmpFileDir.delete();
+ File[] tmpFiles = tmpFileDir.listFiles();
+ if (tmpFiles != null) {
+ for (File filePath : tmpFiles) {
+ String path = filePath.getPath();
+ filePath.delete();
+ }
+ }
+ cleanupWithLogger(LOG, testFileSystem);
+ }
+
+ /**
+ * Test to verify the existence of the cache file.
+ * Tries to perform inputStream read and seek ops to make the prefetching
take place and
+ * asserts whether file with .bin suffix is present. It also verifies
certain file stats.
+ */
+ @Test
+ public void testCacheFileExistence() throws Throwable {
Review Comment:
I'm wondering whether we can also add a test to check that caching gets disabled
if it takes too long, but I'm not sure how to do it (or if it's possible).
Also, a test that if the stream is unbuffered, it doesn't get cached.
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/ITestS3APrefetchingLargeFiles.java:
##########
@@ -29,62 +29,77 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
-import org.apache.hadoop.fs.s3a.prefetch.S3APrefetchingInputStream;
-import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsContext;
import org.apache.hadoop.test.LambdaTestUtils;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.enablePrefetch;
+import static
org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static
org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertDurationRange;
import static
org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticMaximum;
import static
org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
import static
org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticGaugeValue;
+import static
org.apache.hadoop.fs.statistics.IOStatisticsContext.getCurrentIOStatisticsContext;
+import static
org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
import static
org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_EXECUTOR_ACQUIRED;
import static
org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_HTTP_GET_REQUEST;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_MAX;
import static
org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE;
import static
org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BLOCKS_IN_FILE_CACHE;
+import static
org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BLOCK_ACQUIRE_AND_READ;
+import static
org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BLOCK_CACHE_ENABLED;
+import static
org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BLOCK_PREFETCH_ENABLED;
import static
org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_OPENED;
import static
org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS;
+import static
org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_REMOTE_BLOCK_READ;
+import static org.apache.hadoop.test.Sizes.S_10M;
+import static org.apache.hadoop.test.Sizes.S_1K;
/**
* Test the prefetching input stream, validates that the underlying
S3ACachingInputStream and
* S3AInMemoryInputStream are working as expected.
*/
-public class ITestS3APrefetchingInputStream extends AbstractS3ACostTest {
-
- public ITestS3APrefetchingInputStream() {
- super(true);
- }
+public class ITestS3APrefetchingLargeFiles extends AbstractS3ACostTest {
private static final Logger LOG =
- LoggerFactory.getLogger(ITestS3APrefetchingInputStream.class);
+ LoggerFactory.getLogger(ITestS3APrefetchingLargeFiles.class);
private static final int S_500 = 512;
- private static final int S_1K = S_500 * 2;
- private static final int S_1M = S_1K * S_1K;
+
private int numBlocks;
// Size should be > block size so S3ACachingInputStream is used
private long largeFileSize;
- // Size should be < block size so S3AInMemoryInputStream is used
- private static final int SMALL_FILE_SIZE = S_1K * 9;
-
private static final int TIMEOUT_MILLIS = 5000;
private static final int INTERVAL_MILLIS = 500;
private static final int BLOCK_SIZE = S_1K * 10;
+ /**
+ * Thread level IOStatistics; reset in setup().
+ */
+ private IOStatisticsContext ioStatisticsContext;
+
@Override
public Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
- S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_ENABLED_KEY);
- S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_BLOCK_SIZE_KEY);
+ removeBaseAndBucketOverrides(conf,
+ PREFETCH_ENABLED_KEY, PREFETCH_BLOCK_SIZE_KEY);
conf.setBoolean(PREFETCH_ENABLED_KEY, true);
conf.setInt(PREFETCH_BLOCK_SIZE_KEY, BLOCK_SIZE);
return conf;
}
+ @Override
+ public void setup() throws Exception {
+ super.setup();
+
+ ioStatisticsContext = getCurrentIOStatisticsContext();
+ ioStatisticsContext.reset();
+ }
+
private void createLargeFile() throws Exception {
Review Comment:
Do you think it's worth following the same pattern as `AbstractS3ACostTest`,
which creates a huge file in a test, and then other tests assert that the file
exists? ITestInMemoryInputStream could extend it as well, and avoid creating
and tearing down the small file multiple times.
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteInputStream.java:
##########
@@ -409,27 +455,54 @@ protected String getOffsetStr(long offset) {
return String.format("%d:%d", blockNumber, offset);
}
+ @Override
+ public synchronized void unbuffer() {
+ LOG.debug("{}: unbuffered", getName());
+ if (closeStream(true)) {
+ getS3AStreamStatistics().unbuffered();
+ }
+ }
+
+ /**
+ * Close the stream in close() or unbuffer().
+ * @param unbuffer is this an unbuffer operation?
+ * @return true if the stream was closed; false means it was already closed.
+ */
+ protected boolean closeStream(final boolean unbuffer) {
+
+ if (underlyingResourcesClosed.getAndSet(true)) {
+ return false;
+ }
+
+ if (unbuffer) {
Review Comment:
also, why just on unbuffer? shouldn't this be cleaned up on close() too?
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingInputStream.java:
##########
@@ -76,36 +79,75 @@ public S3ACachingInputStream(
S3AInputStreamStatistics streamStatistics,
Configuration conf,
LocalDirAllocator localDirAllocator) {
- super(context, s3Attributes, client, streamStatistics);
- this.numBlocksToPrefetch = this.getContext().getPrefetchBlockCount();
- int bufferPoolSize = this.numBlocksToPrefetch + 1;
- this.blockManager = this.createBlockManager(
- this.getContext().getFuturePool(),
- this.getReader(),
- this.getBlockData(),
- bufferPoolSize,
- conf,
- localDirAllocator);
+ super(context, s3Attributes, client, streamStatistics);
+ this.conf = conf;
+ this.localDirAllocator = localDirAllocator;
+ this.numBlocksToPrefetch = getContext().getPrefetchBlockCount();
+ demandCreateBlockManager();
int fileSize = (int) s3Attributes.getLen();
LOG.debug("Created caching input stream for {} (size = {})",
this.getName(),
fileSize);
+ streamStatistics.setPrefetchState(numBlocksToPrefetch > 0,
Review Comment:
Never mind, since S3AInMemoryInputStream doesn't actually do any prefetching.
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingInputStream.java:
##########
@@ -76,36 +79,75 @@ public S3ACachingInputStream(
S3AInputStreamStatistics streamStatistics,
Configuration conf,
LocalDirAllocator localDirAllocator) {
- super(context, s3Attributes, client, streamStatistics);
- this.numBlocksToPrefetch = this.getContext().getPrefetchBlockCount();
- int bufferPoolSize = this.numBlocksToPrefetch + 1;
- this.blockManager = this.createBlockManager(
- this.getContext().getFuturePool(),
- this.getReader(),
- this.getBlockData(),
- bufferPoolSize,
- conf,
- localDirAllocator);
+ super(context, s3Attributes, client, streamStatistics);
+ this.conf = conf;
+ this.localDirAllocator = localDirAllocator;
+ this.numBlocksToPrefetch = getContext().getPrefetchBlockCount();
+ demandCreateBlockManager();
int fileSize = (int) s3Attributes.getLen();
LOG.debug("Created caching input stream for {} (size = {})",
this.getName(),
fileSize);
+ streamStatistics.setPrefetchState(numBlocksToPrefetch > 0,
Review Comment:
Why the `numBlocksToPrefetch > 0` check, though? `numBlocksToPrefetch` will always
be greater than 0, because in S3AFS we do `prefetchBlockCount = intOption(conf,
PREFETCH_BLOCK_COUNT_KEY, PREFETCH_BLOCK_DEFAULT_COUNT, 1);`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]