[ https://issues.apache.org/jira/browse/HADOOP-18184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17750807#comment-17750807 ]

ASF GitHub Bot commented on HADOOP-18184:
-----------------------------------------

ahmarsuhail commented on code in PR #5832:
URL: https://github.com/apache/hadoop/pull/5832#discussion_r1283192650


##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/CachingBlockManager.java:
##########
@@ -392,16 +413,41 @@ private void readBlock(BufferData data, boolean isPrefetch, BufferData.State...
           ops.end(op);
         }
 
-        if (isPrefetch) {
-          prefetchingStatistics.prefetchOperationCompleted();
-          if (tracker != null) {
-            tracker.close();
-          }
+        // update the statistics
+        prefetchingStatistics.fetchOperationCompleted(isPrefetch, bytesFetched);
+        if (tracker != null) {

Review Comment:
   Potentially could remove this null check and the one at 404 for the tracker; it used to be null for non-prefetching ops before, but it won't be null anymore.
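   
   For illustration, a minimal sketch of the simplified completion path, assuming the tracker really is always non-null after this change:
   
   ```java
   // sketch only: with a guaranteed non-null tracker, the guard can go
   prefetchingStatistics.fetchOperationCompleted(isPrefetch, bytesFetched);
   tracker.close();
   ```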



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java:
##########
@@ -577,11 +614,32 @@ public static Configuration prepareTestConfiguration(final Configuration conf) {
 
     boolean prefetchEnabled =
         getTestPropertyBool(conf, PREFETCH_ENABLED_KEY, PREFETCH_ENABLED_DEFAULT);
-    conf.setBoolean(PREFETCH_ENABLED_KEY, prefetchEnabled);
+    enablePrefetch(conf, prefetchEnabled);
 
     return conf;
   }
 
+  /**
+   * Unset base/bucket prefetch options and set to the supplied option instead.
+   * @param conf configuration
+   * @param prefetch prefetch option
+   * @return the modified configuration.
+   */
+  public static Configuration enablePrefetch(final Configuration conf, boolean prefetch) {

Review Comment:
   nit: renaming to `setPrefetchState` or something would improve readability; `enablePrefetch`, at a glance, makes it seem like we're always enabling it.
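   
   For illustration, a sketch of the renamed helper, assuming the body matches the javadoc above (the actual implementation is truncated in this excerpt):
   
   ```java
   /**
    * Unset base/bucket prefetch options and set to the supplied option instead.
    */
   public static Configuration setPrefetchState(final Configuration conf, boolean prefetch) {
     removeBaseAndBucketOverrides(conf, PREFETCH_ENABLED_KEY);
     conf.setBoolean(PREFETCH_ENABLED_KEY, prefetch);
     return conf;
   }
   ```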



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/ITestS3APrefetchingCacheFiles.java:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.prefetch;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.assertj.core.api.Assertions;
+import org.assertj.core.api.Assumptions;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
+import org.apache.hadoop.fs.statistics.IOStatisticsContext;
+
+import static org.apache.hadoop.fs.s3a.Constants.BUFFER_DIR;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.enablePrefetch;
+import static org.apache.hadoop.fs.statistics.IOStatisticsContext.getCurrentIOStatisticsContext;
+import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
+import static org.apache.hadoop.test.Sizes.S_1K;
+
+/**
+ * Test the cache file behaviour with prefetching input stream.
+ */
+public class ITestS3APrefetchingCacheFiles extends AbstractS3ACostTest {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ITestS3APrefetchingCacheFiles.class);
+  private static final int BLOCK_SIZE = S_1K * 10;
+
+  private Path testFile;
+  private S3AFileSystem testFileSystem;
+  private int prefetchBlockSize;
+  private Configuration conf;
+
+  /**
+   * Thread level IOStatistics; reset in setup().
+   */
+  private IOStatisticsContext ioStatisticsContext;
+
+  private File tmpFileDir;
+
+
+  @Before
+  public void setUp() throws Exception {
+    super.setup();
+    conf = createConfiguration();
+    tmpFileDir = File.createTempFile("ITestS3APrefetchingCacheFiles", "");
+    tmpFileDir.delete();
+    tmpFileDir.mkdirs();
+
+    conf.set(BUFFER_DIR, tmpFileDir.getAbsolutePath());
+    String testFileUri = S3ATestUtils.getCSVTestFile(conf);
+
+    testFile = new Path(testFileUri);
+    testFileSystem = (S3AFileSystem) FileSystem.get(testFile.toUri(), conf);
+
+    prefetchBlockSize = conf.getInt(PREFETCH_BLOCK_SIZE_KEY, PREFETCH_BLOCK_DEFAULT_SIZE);
+    final FileStatus testFileStatus = testFileSystem.getFileStatus(testFile);
+    Assumptions.assumeThat(testFileStatus.getLen())
+        .describedAs("Test file %s is smaller than required size %d",
+            testFileStatus, prefetchBlockSize * 4)
+        .isGreaterThan(prefetchBlockSize);
+
+    ioStatisticsContext = getCurrentIOStatisticsContext();
+    ioStatisticsContext.reset();
+  }
+
+  @Override
+  public Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    enablePrefetch(conf, true);
+    disableFilesystemCaching(conf);
+    S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_BLOCK_SIZE_KEY);
+    conf.setInt(PREFETCH_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    return conf;
+  }
+
+  @Override
+  public synchronized void teardown() throws Exception {
+    super.teardown();
+    tmpFileDir.delete();
+    File[] tmpFiles = tmpFileDir.listFiles();
+    if (tmpFiles != null) {
+      for (File filePath : tmpFiles) {
+        String path = filePath.getPath();
+        filePath.delete();
+      }
+    }
+    cleanupWithLogger(LOG, testFileSystem);
+  }
+
+  /**
+   * Test to verify the existence of the cache file.
+   * Tries to perform inputStream read and seek ops to make the prefetching take place and
+   * asserts whether a file with the .bin suffix is present. It also verifies certain file stats.
+   */
+  @Test
+  public void testCacheFileExistence() throws Throwable {
+    describe("Verify that FS cache files exist on local FS");
+    skipIfClientSideEncryption();
+
+    try (FSDataInputStream in = testFileSystem.open(testFile)) {
+      byte[] buffer = new byte[prefetchBlockSize];
+
+      in.read(buffer, 0, prefetchBlockSize - 10240);
+      assertCacheFileExists();

Review Comment:
   Shouldn't we add a seek and then a read here? Though I tried that locally and the test still fails.
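   
   For illustration, the suggested follow-up might look like this (untested sketch; the seek offset is arbitrary):
   
   ```java
   // force a later block to be fetched, then re-check the cache dir
   in.seek(prefetchBlockSize * 2);
   in.read(buffer, 0, prefetchBlockSize);
   assertCacheFileExists();
   ```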



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/ITestInMemoryInputStream.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.prefetch;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
+import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsContext;
+
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.enablePrefetch;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticMaximum;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticGaugeValue;
+import static org.apache.hadoop.fs.statistics.IOStatisticsContext.getCurrentIOStatisticsContext;
+import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_EXECUTOR_ACQUIRED;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_HTTP_GET_REQUEST;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_MAX;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BLOCK_CACHE_ENABLED;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BLOCK_PREFETCH_ENABLED;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BYTES;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_FULLY_OPERATIONS;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_OPENED;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS;
+import static org.apache.hadoop.test.Sizes.S_16K;
+import static org.apache.hadoop.test.Sizes.S_1K;
+import static org.apache.hadoop.test.Sizes.S_4K;
+
+/**
+ * Test the prefetching input stream, validates that the
+ * S3AInMemoryInputStream is working as expected.
+ */
+public class ITestInMemoryInputStream extends AbstractS3ACostTest {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ITestInMemoryInputStream.class);
+
+  // Size should be < block size so S3AInMemoryInputStream is used
+  private static final int SMALL_FILE_SIZE = S_16K;
+
+  /**
+   * Thread level IOStatistics; reset in setup().
+   */
+  private IOStatisticsContext ioStatisticsContext;
+
+  @Override
+  public Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_ENABLED_KEY);
+    enablePrefetch(conf, true);
+    return conf;
+  }
+
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+    ioStatisticsContext = getCurrentIOStatisticsContext();
+    ioStatisticsContext.reset();
+  }
+
+  private void printStreamStatistics(final FSDataInputStream in) {
+    LOG.info("Stream statistics\n{}",
+        ioStatisticsToPrettyString(in.getIOStatistics()));
+  }
+
+  @Test
+  public void testRandomReadSmallFile() throws Throwable {
+    describe("random read on a small file, uses S3AInMemoryInputStream");
+
+    byte[] data = ContractTestUtils.dataset(SMALL_FILE_SIZE, 'a', 26);
+    Path smallFile = path("randomReadSmallFile");
+    ContractTestUtils.writeDataset(getFileSystem(), smallFile, data, data.length, 16, true);
+
+    int expectedReadBytes = 0;
+    try (FSDataInputStream in = getFileSystem().open(smallFile)) {
+      IOStatistics ioStats = in.getIOStatistics();
+
+      byte[] buffer = new byte[SMALL_FILE_SIZE];
+
+      in.read(buffer, 0, S_4K);
+      expectedReadBytes += S_4K;
+      verifyStatisticCounterValue(ioStats, STREAM_READ_BYTES,
+          expectedReadBytes);
+
+      in.seek(S_1K * 12);
+      in.read(buffer, 0, S_4K);
+      expectedReadBytes += S_4K;
+
+      verifyStatisticCounterValue(ioStats, STREAM_READ_BYTES,
+          expectedReadBytes);
+      printStreamStatistics(in);
+
+      verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, 1);
+      verifyStatisticCounterValue(ioStats, STREAM_READ_OPENED, 1);
+      verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCH_OPERATIONS, 0);
+      // the whole file is loaded
+      verifyStatisticGaugeValue(ioStats, STREAM_READ_ACTIVE_MEMORY_IN_USE, SMALL_FILE_SIZE);
+      // there is no prefetching
+      verifyStatisticGaugeValue(ioStats, STREAM_READ_BLOCK_PREFETCH_ENABLED, 0);
+      // there is no caching
+      verifyStatisticGaugeValue(ioStats, STREAM_READ_BLOCK_CACHE_ENABLED, 0);
+      // no prefetch ops, so no action_executor_acquired
+      assertThatStatisticMaximum(ioStats,
+          ACTION_EXECUTOR_ACQUIRED + SUFFIX_MAX).isEqualTo(-1);
+
+      // now read offset 0 again and again, expect no new costs
+      in.readFully(0, buffer);
+      verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, 1);
+      expectedReadBytes += buffer.length;
+
+      verifyStatisticCounterValue(ioStats, STREAM_READ_BYTES,
+          expectedReadBytes);
+      // unbuffer
+      in.unbuffer();

Review Comment:
   Since this test is getting quite big, it might be better to have a separate test for unbuffer.
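   
   For illustration, a standalone unbuffer test might look like this (sketch; the test name is illustrative, the helpers are the ones already used in this class):
   
   ```java
   @Test
   public void testUnbufferReleasesMemory() throws Throwable {
     describe("unbuffer() frees the buffers of the in-memory stream");
     byte[] data = ContractTestUtils.dataset(SMALL_FILE_SIZE, 'a', 26);
     Path smallFile = path("unbufferSmallFile");
     ContractTestUtils.writeDataset(getFileSystem(), smallFile, data, data.length, 16, true);

     try (FSDataInputStream in = getFileSystem().open(smallFile)) {
       in.read(new byte[S_4K], 0, S_4K);
       in.unbuffer();
       // after unbuffer, no block should be held in memory
       verifyStatisticGaugeValue(in.getIOStatistics(),
           STREAM_READ_ACTIVE_MEMORY_IN_USE, 0);
     }
   }
   ```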



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/ITestS3APrefetchingLargeFiles.java:
##########
@@ -123,19 +142,34 @@ public void testReadLargeFileFully() throws Throwable {
         verifyStatisticGaugeValue(ioStats, STREAM_READ_BLOCKS_IN_FILE_CACHE,
             0);
       }
+      printStreamStatistics(in);
 
       // Assert that first block is read synchronously, following blocks are prefetched
       verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCH_OPERATIONS,
           numBlocks - 1);
       verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, numBlocks);
       verifyStatisticCounterValue(ioStats, STREAM_READ_OPENED, numBlocks);
+
+      in.unbuffer();
+      // Verify that once stream is closed, all memory is freed
+      verifyStatisticGaugeValue(ioStats, STREAM_READ_ACTIVE_MEMORY_IN_USE, 0);
+      // prefetching is on
+      verifyStatisticGaugeValue(ioStats, STREAM_READ_BLOCK_PREFETCH_ENABLED, 1);
+      // there is no caching

Review Comment:
   There is caching? Same comment for line 132.
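   
   Presumably the gauge should be 1 for the caching stream (assumption; the asserted value is cut off in this excerpt):
   
   ```java
   // there is caching for this stream
   verifyStatisticGaugeValue(ioStats, STREAM_READ_BLOCK_CACHE_ENABLED, 1);
   ```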



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/ITestS3APrefetchingCacheFiles.java:
##########
@@ -0,0 +1,173 @@
+  /**
+   * Test to verify the existence of the cache file.
+   * Tries to perform inputStream read and seek ops to make the prefetching take place and
+   * asserts whether a file with the .bin suffix is present. It also verifies certain file stats.
+   */
+  @Test
+  public void testCacheFileExistence() throws Throwable {

Review Comment:
   Thinking if we can also add a test to check that caching gets disabled if it takes too long... but not sure how to do it (or if it's possible).
   
   Also a test that if the stream is unbuffered, it doesn't get cached.
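   
   For the second idea, a rough sketch (assumptions: calling unbuffer() before a prefetched block lands should leave no cache file behind, and `assertCacheFileDoesNotExist()` is a hypothetical inverse of `assertCacheFileExists()`):
   
   ```java
   try (FSDataInputStream in = testFileSystem.open(testFile)) {
     byte[] buffer = new byte[prefetchBlockSize];
     in.read(buffer, 0, prefetchBlockSize - 10240);
     in.unbuffer();
     // hypothetical helper: no .bin cache file should remain after unbuffer
     assertCacheFileDoesNotExist();
   }
   ```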



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/ITestS3APrefetchingLargeFiles.java:
##########
@@ -29,62 +29,77 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
-import org.apache.hadoop.fs.s3a.prefetch.S3APrefetchingInputStream;
-import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
 import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsContext;
 import org.apache.hadoop.test.LambdaTestUtils;
 
 import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
 import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.enablePrefetch;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertDurationRange;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticMaximum;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticGaugeValue;
+import static org.apache.hadoop.fs.statistics.IOStatisticsContext.getCurrentIOStatisticsContext;
+import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
 import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_EXECUTOR_ACQUIRED;
 import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_HTTP_GET_REQUEST;
 import static org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_MAX;
 import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE;
 import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BLOCKS_IN_FILE_CACHE;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BLOCK_ACQUIRE_AND_READ;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BLOCK_CACHE_ENABLED;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BLOCK_PREFETCH_ENABLED;
 import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_OPENED;
 import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_REMOTE_BLOCK_READ;
+import static org.apache.hadoop.test.Sizes.S_10M;
+import static org.apache.hadoop.test.Sizes.S_1K;
 
 /**
  * Test the prefetching input stream, validates that the underlying S3ACachingInputStream and
  * S3AInMemoryInputStream are working as expected.
  */
-public class ITestS3APrefetchingInputStream extends AbstractS3ACostTest {
-
-  public ITestS3APrefetchingInputStream() {
-    super(true);
-  }
+public class ITestS3APrefetchingLargeFiles extends AbstractS3ACostTest {
 
   private static final Logger LOG =
-      LoggerFactory.getLogger(ITestS3APrefetchingInputStream.class);
+      LoggerFactory.getLogger(ITestS3APrefetchingLargeFiles.class);
 
   private static final int S_500 = 512;
-  private static final int S_1K = S_500 * 2;
-  private static final int S_1M = S_1K * S_1K;
+
   private int numBlocks;
 
   // Size should be > block size so S3ACachingInputStream is used
   private long largeFileSize;
 
-  // Size should be < block size so S3AInMemoryInputStream is used
-  private static final int SMALL_FILE_SIZE = S_1K * 9;
-
   private static final int TIMEOUT_MILLIS = 5000;
   private static final int INTERVAL_MILLIS = 500;
   private static final int BLOCK_SIZE = S_1K * 10;
 
+  /**
+   * Thread level IOStatistics; reset in setup().
+   */
+  private IOStatisticsContext ioStatisticsContext;
+
   @Override
   public Configuration createConfiguration() {
     Configuration conf = super.createConfiguration();
-    S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_ENABLED_KEY);
-    S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_BLOCK_SIZE_KEY);
+    removeBaseAndBucketOverrides(conf,
+        PREFETCH_ENABLED_KEY, PREFETCH_BLOCK_SIZE_KEY);
     conf.setBoolean(PREFETCH_ENABLED_KEY, true);
     conf.setInt(PREFETCH_BLOCK_SIZE_KEY, BLOCK_SIZE);
     return conf;
   }
 
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+
+    ioStatisticsContext = getCurrentIOStatisticsContext();
+    ioStatisticsContext.reset();
+  }
+
   private void createLargeFile() throws Exception {

Review Comment:
   Do you think it's worth following the same pattern as `AbstractS3ACostTest`, which creates a huge file in a test, and then other tests assert that the file exists? ITestInMemoryInputStream could extend it as well, and avoid creating and tearing down the small file multiple times.
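   
   A rough sketch of that pattern (assumptions: JUnit 4 name-ordered execution as in the huge-file tests; method and helper names are illustrative):
   
   ```java
   @FixMethodOrder(MethodSorters.NAME_ASCENDING)
   public class ITestS3APrefetchingLargeFiles extends AbstractS3ACostTest {

     @Test
     public void test_010_createLargeFile() throws Exception {
       createLargeFile();
     }

     @Test
     public void test_020_readLargeFileFully() throws Throwable {
       // hypothetical guard: skip if the creation test didn't run first
       assumeLargeFileExists();
       // ... existing read assertions ...
     }
   }
   ```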



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteInputStream.java:
##########
@@ -409,27 +455,54 @@ protected String getOffsetStr(long offset) {
     return String.format("%d:%d", blockNumber, offset);
   }
 
+  @Override
+  public synchronized void unbuffer() {
+    LOG.debug("{}: unbuffered", getName());
+    if (closeStream(true)) {
+      getS3AStreamStatistics().unbuffered();
+    }
+  }
+
+  /**
+   * Close the stream in close() or unbuffer().
+   * @param unbuffer is this an unbuffer operation?
+   * @return true if the stream was closed; false means it was already closed.
+   */
+  protected boolean closeStream(final boolean unbuffer) {
+
+    if (underlyingResourcesClosed.getAndSet(true)) {
+      return false;
+    }
+
+    if (unbuffer) {

Review Comment:
   also, why just on unbuffer? shouldn't this be cleaned up on close() too?
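   
   One possible shape of that (sketch; `releaseBuffersAndCache()` is a hypothetical extraction of the cleanup currently guarded by `if (unbuffer)`):
   
   ```java
   protected boolean closeStream(final boolean unbuffer) {
     if (underlyingResourcesClosed.getAndSet(true)) {
       return false;
     }
     // run the resource cleanup on both close() and unbuffer(),
     // not only when unbuffer == true
     releaseBuffersAndCache();
     return true;
   }
   ```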



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingInputStream.java:
##########
@@ -76,36 +79,75 @@ public S3ACachingInputStream(
       S3AInputStreamStatistics streamStatistics,
       Configuration conf,
       LocalDirAllocator localDirAllocator) {
-    super(context, s3Attributes, client, streamStatistics);
 
-    this.numBlocksToPrefetch = this.getContext().getPrefetchBlockCount();
-    int bufferPoolSize = this.numBlocksToPrefetch + 1;
-    this.blockManager = this.createBlockManager(
-        this.getContext().getFuturePool(),
-        this.getReader(),
-        this.getBlockData(),
-        bufferPoolSize,
-        conf,
-        localDirAllocator);
+    super(context, s3Attributes, client, streamStatistics);
+    this.conf = conf;
+    this.localDirAllocator = localDirAllocator;
+    this.numBlocksToPrefetch = getContext().getPrefetchBlockCount();
+    demandCreateBlockManager();
     int fileSize = (int) s3Attributes.getLen();
     LOG.debug("Created caching input stream for {} (size = {})", this.getName(),
         fileSize);
+    streamStatistics.setPrefetchState(numBlocksToPrefetch > 0,

Review Comment:
   Nevermind... since S3AInMemoryInputStream doesn't actually do any prefetching.



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingInputStream.java:
##########
@@ -76,36 +79,75 @@ public S3ACachingInputStream(
+    streamStatistics.setPrefetchState(numBlocksToPrefetch > 0,

Review Comment:
   Why the `numBlocksToPrefetch > 0` though? numBlocksToPrefetch will always be > 0, because in S3AFS we do `prefetchBlockCount = intOption(conf, PREFETCH_BLOCK_COUNT_KEY, PREFETCH_BLOCK_DEFAULT_COUNT, 1);`
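   
   A minimal sketch of why the guard is redundant, assuming `intOption()` enforces the supplied minimum:
   
   ```java
   // intOption(conf, key, default, min) never returns a value below min,
   // so with min == 1 the configured count is always >= 1
   int prefetchBlockCount =
       intOption(conf, PREFETCH_BLOCK_COUNT_KEY, PREFETCH_BLOCK_DEFAULT_COUNT, 1);
   // hence numBlocksToPrefetch > 0 always holds and the guard is a no-op
   ```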





> s3a prefetching stream to support unbuffer()
> --------------------------------------------
>
>                 Key: HADOOP-18184
>                 URL: https://issues.apache.org/jira/browse/HADOOP-18184
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/s3
>    Affects Versions: 3.4.0
>            Reporter: Steve Loughran
>            Assignee: Steve Loughran
>            Priority: Minor
>              Labels: pull-request-available
>
> Apache Impala uses unbuffer() to free up all client-side resources held by a
> stream, allowing it to keep a map of available (path -> stream) objects,
> retained across queries.
> This saves on having to reopen the files, with the cost of HEAD checks etc.
> S3AInputStream just closes its http connection. Here there is a lot more
> state to discard, but all memory and file storage must be freed.
> Until this is done, ITestS3AContractUnbuffer must skip when the prefetch
> stream is used.
> It's notable that the other tests don't fail, even though the stream doesn't
> implement the interface; the graceful degradation handles that. It should
> fail if the test xml resource says the stream does it, but the stream
> capabilities say it doesn't.


