[ 
https://issues.apache.org/jira/browse/HADOOP-18231?focusedWorklogId=779903&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-779903
 ]

ASF GitHub Bot logged work on HADOOP-18231:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 09/Jun/22 12:02
            Start Date: 09/Jun/22 12:02
    Worklog Time Spent: 10m 
      Work Description: steveloughran commented on code in PR #4386:
URL: https://github.com/apache/hadoop/pull/4386#discussion_r893241456


##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3File.java:
##########
@@ -98,6 +107,7 @@ public S3File(
     this.streamStatistics = streamStatistics;
     this.changeTracker = changeTracker;
     this.s3Objects = new IdentityHashMap<InputStream, S3Object>();

Review Comment:
   while you are there, can you change to a simple <> here 
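   For illustration only, the requested diamond-operator change on the line quoted above would read:

   ```java
   // before: explicit type arguments
   this.s3Objects = new IdentityHashMap<InputStream, S3Object>();
   // after: diamond operator; the compiler infers the type arguments
   this.s3Objects = new IdentityHashMap<>();
   ```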



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3PrefetchingInputStream.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.StoreStatisticNames;
+import org.apache.hadoop.fs.statistics.StreamStatisticNames;
+
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
+
+/**
+ * Test the prefetching input stream, validates that the underlying S3CachingInputStream and
+ * S3InMemoryInputStream are working as expected.
+ */
+public class ITestS3PrefetchingInputStream extends AbstractS3ACostTest {
+
+  public ITestS3PrefetchingInputStream() {
+    super(true);
+  }
+
+  private static final int _1K = 1024;
+  // Path for file which should have length > block size so S3CachingInputStream is used
+  private Path largeFile;
+  private FileSystem fs;
+  private int numBlocks;
+  private int blockSize;
+  private long largeFileSize;
+  // Size should be < block size so S3InMemoryInputStream is used
+  private static final int SMALL_FILE_SIZE = _1K * 16;
+
+
+  @Override
+  public Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_ENABLED_KEY);
+    conf.setBoolean(PREFETCH_ENABLED_KEY, true);
+    return conf;
+  }
+
+
+  private void openFS() throws IOException {

Review Comment:
    for better isolation between tests, we need that FS to *not* be cached. otherwise, if a test in the same JVM has already accessed this path, you may get its FS with its settings.
   
   use `FileSystem.createFileSystem()`, override `teardown()` and invoke `cleanupWithLogger(LOG, largeFileFS)` to close it if it is non null
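   A rough sketch of that suggestion (illustrative only: it uses the public `FileSystem.newInstance()` call, which also bypasses the FS cache, and assumes a class-level slf4j `LOG` plus the `largeFileFS` field name suggested in a later comment):

   ```java
   // import org.apache.hadoop.io.IOUtils;   // for cleanupWithLogger

   private FileSystem largeFileFS;

   private void openFS() throws IOException {
     Configuration conf = getConfiguration();
     largeFile = new Path(DEFAULT_CSVTEST_FILE);
     blockSize = conf.getInt(PREFETCH_BLOCK_SIZE_KEY, PREFETCH_BLOCK_DEFAULT_SIZE);
     // uncached FS instance, so settings from other tests in the same JVM cannot leak in
     largeFileFS = FileSystem.newInstance(largeFile.toUri(), conf);
     FileStatus fileStatus = largeFileFS.getFileStatus(largeFile);
     largeFileSize = fileStatus.getLen();
     numBlocks = calculateNumBlocks(largeFileSize, blockSize);
   }

   @Override
   public void teardown() throws Exception {
     super.teardown();
     // close the uncached FS if it was ever opened; any exception is only logged
     IOUtils.cleanupWithLogger(LOG, largeFileFS);
     largeFileFS = null;
   }
   ```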



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3File.java:
##########
@@ -214,7 +213,83 @@ void close(InputStream inputStream) {
       this.s3Objects.remove(inputStream);
     }
 
+    if (numRemainingBytes <= this.context.getAsyncDrainThreshold()) {
+      // don't bother with async io.
+      drain(false, "close() operation", numRemainingBytes, obj, inputStream);
+    } else {
+      LOG.debug("initiating asynchronous drain of {} bytes", numRemainingBytes);
+      // schedule an async drain/abort with references to the fields so they
+      // can be reused
+      client.submit(() -> drain(false, "close() operation", numRemainingBytes, obj, inputStream));
+    }
+  }
+
+  /**
+   * drain the stream. This method is intended to be
+   * used directly or asynchronously, and measures the
+   * duration of the operation in the stream statistics.
+   *
+   * @param shouldAbort   force an abort; used if explicitly requested.
+   * @param reason        reason for stream being closed; used in messages
+   * @param remaining     remaining bytes
+   * @param requestObject http request object;
+   * @param inputStream   stream to close.
+   * @return was the stream aborted?
+   */
+  private boolean drain(final boolean shouldAbort, final String reason, final long remaining,
+      final S3Object requestObject, final InputStream inputStream) {
+
+    try {
+      return invokeTrackingDuration(streamStatistics.initiateInnerStreamClose(shouldAbort),
+          () -> drainOrAbortHttpStream(shouldAbort, reason, remaining, requestObject, inputStream));
+    } catch (IOException e) {
+      // this is only here because invokeTrackingDuration() has it in its
+      // signature
+      return shouldAbort;
+    }
+  }
+
+  /**
+   * Drain or abort the inner stream.
+   * Exceptions are swallowed.
+   * If a close() is attempted and fails, the operation escalates to
+   * an abort.
+   *
+   * @param shouldAbort   force an abort; used if explicitly requested.
+   * @param reason        reason for stream being closed; used in messages
+   * @param remaining     remaining bytes
+   * @param requestObject http request object
+   * @param inputStream   stream to close.
+   * @return was the stream aborted?
+   */
+  private boolean drainOrAbortHttpStream(boolean shouldAbort, final String reason,
+      final long remaining, final S3Object requestObject, final InputStream inputStream) {
+
+    if (!shouldAbort && remaining > 0) {
+      try {
+        long drained = 0;
+        byte[] buffer = new byte[DRAIN_BUFFER_SIZE];
+        while (true) {
+          final int count = inputStream.read(buffer);
+          if (count < 0) {
+            // no more data is left
+            break;
+          }
+          drained += count;
+        }
+        LOG.debug("Drained stream of {} bytes", drained);
+      } catch (Exception e) {
+        // exception escalates to an abort
+        LOG.debug("When closing {} stream for {}, will abort the stream", uri, 
reason, e);
+        shouldAbort = true;
+      }
+    }
     Io.closeIgnoringIoException(inputStream);
-    Io.closeIgnoringIoException(obj);
+    Io.closeIgnoringIoException(requestObject);

Review Comment:
   I'm not sure which library this is. switch to our `IOUtils.cleanupWithLogger(LOG, inputStream, requestObject)`
   
   this will log any exceptions at debug, if ever needed
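   i.e. a sketch of the swap (assuming `org.apache.hadoop.io.IOUtils` is imported and the class has an slf4j `LOG`):

   ```java
   // replaces the two Io.closeIgnoringIoException() calls; both the wrapped
   // stream and the S3Object are Closeable, and any failure is logged at debug
   IOUtils.cleanupWithLogger(LOG, inputStream, requestObject);
   ```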



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3File.java:
##########
@@ -193,18 +203,7 @@ public InputStream openForRead(long offset, int size) throws IOException {
     return stream;
   }
 
-  /**
-   * Closes this stream and releases all acquired resources.
-   */
-  @Override
-  public synchronized void close() {
-    List<InputStream> streams = new ArrayList<InputStream>(this.s3Objects.keySet());
-    for (InputStream stream : streams) {
-      this.close(stream);
-    }
-  }
-
-  void close(InputStream inputStream) {
+  void close(InputStream inputStream, int numRemainingBytes) {

Review Comment:
   can this be private?



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3PrefetchingInputStream.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.StoreStatisticNames;
+import org.apache.hadoop.fs.statistics.StreamStatisticNames;
+
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
+
+/**
+ * Test the prefetching input stream, validates that the underlying S3CachingInputStream and
+ * S3InMemoryInputStream are working as expected.
+ */
+public class ITestS3PrefetchingInputStream extends AbstractS3ACostTest {
+
+  public ITestS3PrefetchingInputStream() {
+    super(true);
+  }
+
+  private static final int _1K = 1024;

Review Comment:
   I think the hadoop-common test code should have a version of org.apache.hadoop.fs.azure.integration.Sizes which we can reference here and elsewhere; it uses the S_ prefix to keep checkstyle quiet.
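   A hypothetical sketch of such a shared constants class (names modelled on the Azure one; this is not an existing hadoop-common API):

   ```java
   /** Sizes of data used in tests; the S_ prefix keeps checkstyle happy with leading digits. */
   public interface Sizes {
     int S_1K = 1024;
     int S_16K = 16 * S_1K;
     int S_1M = S_1K * S_1K;
     int S_8M = 8 * S_1M;
   }
   ```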



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3File.java:
##########
@@ -214,7 +213,83 @@ void close(InputStream inputStream) {
       this.s3Objects.remove(inputStream);
     }
 
+    if (numRemainingBytes <= this.context.getAsyncDrainThreshold()) {
+      // don't bother with async io.
+      drain(false, "close() operation", numRemainingBytes, obj, inputStream);
+    } else {
+      LOG.debug("initiating asynchronous drain of {} bytes", numRemainingBytes);
+      // schedule an async drain/abort with references to the fields so they
+      // can be reused
+      client.submit(() -> drain(false, "close() operation", numRemainingBytes, obj, inputStream));
+    }
+  }
+
+  /**
+   * drain the stream. This method is intended to be
+   * used directly or asynchronously, and measures the
+   * duration of the operation in the stream statistics.
+   *
+   * @param shouldAbort   force an abort; used if explicitly requested.
+   * @param reason        reason for stream being closed; used in messages
+   * @param remaining     remaining bytes
+   * @param requestObject http request object;
+   * @param inputStream   stream to close.
+   * @return was the stream aborted?
+   */
+  private boolean drain(final boolean shouldAbort, final String reason, final long remaining,
+      final S3Object requestObject, final InputStream inputStream) {
+
+    try {
+      return invokeTrackingDuration(streamStatistics.initiateInnerStreamClose(shouldAbort),
+          () -> drainOrAbortHttpStream(shouldAbort, reason, remaining, requestObject, inputStream));
+    } catch (IOException e) {
+      // this is only here because invokeTrackingDuration() has it in its
+      // signature
+      return shouldAbort;
+    }
+  }
+
+  /**
+   * Drain or abort the inner stream.
+   * Exceptions are swallowed.
+   * If a close() is attempted and fails, the operation escalates to
+   * an abort.
+   *
+   * @param shouldAbort   force an abort; used if explicitly requested.
+   * @param reason        reason for stream being closed; used in messages
+   * @param remaining     remaining bytes
+   * @param requestObject http request object
+   * @param inputStream   stream to close.
+   * @return was the stream aborted?
+   */
+  private boolean drainOrAbortHttpStream(boolean shouldAbort, final String reason,

Review Comment:
   style nit: can you split the parameters one per line, all tagged final? there's enough params to justify the effort.
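   i.e. something like:

   ```java
   private boolean drainOrAbortHttpStream(
       final boolean shouldAbort,
       final String reason,
       final long remaining,
       final S3Object requestObject,
       final InputStream inputStream) {
     // ... body unchanged
   }
   ```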



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3PrefetchingInputStream.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.StoreStatisticNames;
+import org.apache.hadoop.fs.statistics.StreamStatisticNames;
+
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
+
+/**
+ * Test the prefetching input stream, validates that the underlying S3CachingInputStream and
+ * S3InMemoryInputStream are working as expected.
+ */
+public class ITestS3PrefetchingInputStream extends AbstractS3ACostTest {
+
+  public ITestS3PrefetchingInputStream() {
+    super(true);
+  }
+
+  private static final int _1K = 1024;
+  // Path for file which should have length > block size so S3CachingInputStream is used
+  private Path largeFile;
+  private FileSystem fs;

Review Comment:
   can you name this largeFileFS to make clear why it is being used



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3PrefetchingInputStream.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.StoreStatisticNames;
+import org.apache.hadoop.fs.statistics.StreamStatisticNames;
+
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
+
+/**
+ * Test the prefetching input stream, validates that the underlying S3CachingInputStream and
+ * S3InMemoryInputStream are working as expected.
+ */
+public class ITestS3PrefetchingInputStream extends AbstractS3ACostTest {
+
+  public ITestS3PrefetchingInputStream() {
+    super(true);
+  }
+
+  private static final int _1K = 1024;
+  // Path for file which should have length > block size so S3CachingInputStream is used
+  private Path largeFile;
+  private FileSystem fs;
+  private int numBlocks;
+  private int blockSize;
+  private long largeFileSize;
+  // Size should be < block size so S3InMemoryInputStream is used
+  private static final int SMALL_FILE_SIZE = _1K * 16;
+
+
+  @Override
+  public Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_ENABLED_KEY);
+    conf.setBoolean(PREFETCH_ENABLED_KEY, true);
+    return conf;
+  }
+
+
+  private void openFS() throws IOException {
+    Configuration conf = getConfiguration();
+
+    largeFile = new Path(DEFAULT_CSVTEST_FILE);
+    blockSize = conf.getInt(PREFETCH_BLOCK_SIZE_KEY, PREFETCH_BLOCK_DEFAULT_SIZE);
+    fs = largeFile.getFileSystem(getConfiguration());
+    FileStatus fileStatus = fs.getFileStatus(largeFile);
+    largeFileSize = fileStatus.getLen();
+    numBlocks = calculateNumBlocks(largeFileSize, blockSize);
+  }
+
+  private static int calculateNumBlocks(long largeFileSize, int blockSize) {
+    if (largeFileSize == 0) {
+      return 0;
+    } else {
+      return ((int) (largeFileSize / blockSize)) + (largeFileSize % blockSize > 0 ? 1 : 0);
+    }
+  }
+
+  @Test
+  public void testReadLargeFileFully() throws Throwable {
+    describe("read a large file fully, uses S3CachingInputStream");
+    openFS();
+
+    try (FSDataInputStream in = fs.open(largeFile)) {
+      IOStatistics ioStats = in.getIOStatistics();
+
+      byte[] buffer = new byte[(int) largeFileSize];
+
+      in.read(buffer, 0, (int) largeFileSize);
+
+      verifyStatisticCounterValue(ioStats, StoreStatisticNames.ACTION_HTTP_GET_REQUEST, numBlocks);
+      verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_OPENED, numBlocks);
+    }
+  }
+
+  @Test
+  public void testRandomReadLargeFile() throws Throwable {
+    describe("random read on a large file, uses S3CachingInputStream");
+    openFS();
+
+    try (FSDataInputStream in = fs.open(largeFile)) {
+      IOStatistics ioStats = in.getIOStatistics();
+
+      byte[] buffer = new byte[blockSize];
+
+      // Don't read the block completely so it gets cached on seek
+      in.read(buffer, 0, blockSize - _1K * 10);
+      in.seek(blockSize + _1K * 10);
+      // Backwards seek, will use cached block
+      in.seek(_1K * 5);
+      in.read();
+
+      verifyStatisticCounterValue(ioStats, StoreStatisticNames.ACTION_HTTP_GET_REQUEST, 2);
+      verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_OPENED, 2);
+    }
+  }
+
+  @Test
+  public void testRandomReadSmallFile() throws Throwable {
+    describe("random read on a small file, uses S3InMemoryInputStream");
+
+    byte[] data = ContractTestUtils.dataset(SMALL_FILE_SIZE, 'a', 26);
+    Path smallFile = path("randomReadSmallFile");
+    ContractTestUtils.writeDataset(getFileSystem(), smallFile, data, data.length, 16, true);
+
+    try (FSDataInputStream in = getFileSystem().open(smallFile)) {
+      IOStatistics ioStats = in.getIOStatistics();
+
+      byte[] buffer = new byte[SMALL_FILE_SIZE];
+
+      in.read(buffer, 0, _1K * 4);
+      in.seek(_1K * 12);
+      in.read(buffer, 0, _1K * 4);
+
+      verifyStatisticCounterValue(ioStats, StoreStatisticNames.ACTION_HTTP_GET_REQUEST, 1);
+      verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_OPENED, 1);
+    }
+
+  }
+}

Review Comment:
   nit, add a newline



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3InputStream.java:
##########
@@ -104,16 +105,18 @@
   public S3InputStream(
       S3AReadOpContext context,
       S3ObjectAttributes s3Attributes,
-      S3AInputStream.InputStreamCallbacks client) {
+      S3AInputStream.InputStreamCallbacks client,
+      S3AInputStreamStatistics streamStatistics) {
 
     Validate.checkNotNull(context, "context");
     Validate.checkNotNull(s3Attributes, "s3Attributes");
     Validate.checkNotNull(client, "client");
+    Validate.checkNotNull(streamStatistics, "streamStatistics");

Review Comment:
   this is one of those things we'd inevitably want to clean up. i can see a "steve does a cleanup" PR coming soon....
   If you use `Objects.requireNonNull()` you can add the check on L119, so there's no need for extra work... it's where new code should be going, even if older code still uses the guava checkNotNull.
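   A sketch of folding the check into the assignment (the assignment line itself is outside this hunk, so this is illustrative):

   ```java
   // import java.util.Objects;
   this.streamStatistics = Objects.requireNonNull(streamStatistics, "streamStatistics");
   ```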



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3PrefetchingInputStream.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.StoreStatisticNames;
+import org.apache.hadoop.fs.statistics.StreamStatisticNames;
+
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
+
+/**
+ * Test the prefetching input stream, validates that the underlying S3CachingInputStream and
+ * S3InMemoryInputStream are working as expected.
+ */
+public class ITestS3PrefetchingInputStream extends AbstractS3ACostTest {

Review Comment:
   can you call this ITestS3APrefetchingInputStream? I want to rename the other classes too, but let's start with the new ones.



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3File.java:
##########
@@ -64,6 +67,12 @@ public class S3File implements Closeable {
   // That allows us to close the object when closing the stream.
   private Map<InputStream, S3Object> s3Objects;
 
+  // uri of the object being read

Review Comment:
   nit: can you make these all javadocs, so they show up in IDEs.
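   e.g. (the field declaration shown here is assumed; it is not part of the hunk):

   ```java
   // before
   // uri of the object being read
   private final String uri;

   // after: a javadoc comment shows up in IDE tooltips
   /** Uri of the object being read. */
   private final String uri;
   ```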



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3PrefetchingInputStream.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.StoreStatisticNames;
+import org.apache.hadoop.fs.statistics.StreamStatisticNames;
+
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
+
+/**
+ * Test the prefetching input stream, validates that the underlying S3CachingInputStream and
+ * S3InMemoryInputStream are working as expected.
+ */
+public class ITestS3PrefetchingInputStream extends AbstractS3ACostTest {
+
+  public ITestS3PrefetchingInputStream() {
+    super(true);
+  }
+
+  private static final int _1K = 1024;
+  // Path for file which should have length > block size so S3CachingInputStream is used
+  private Path largeFile;
+  private FileSystem fs;
+  private int numBlocks;
+  private int blockSize;
+  private long largeFileSize;
+  // Size should be < block size so S3InMemoryInputStream is used
+  private static final int SMALL_FILE_SIZE = _1K * 16;
+
+
+  @Override
+  public Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_ENABLED_KEY);
+    conf.setBoolean(PREFETCH_ENABLED_KEY, true);
+    return conf;
+  }
+
+
+  private void openFS() throws IOException {
+    Configuration conf = getConfiguration();
+
+    largeFile = new Path(DEFAULT_CSVTEST_FILE);
+    blockSize = conf.getInt(PREFETCH_BLOCK_SIZE_KEY, PREFETCH_BLOCK_DEFAULT_SIZE);
+    fs = largeFile.getFileSystem(getConfiguration());
+    FileStatus fileStatus = fs.getFileStatus(largeFile);
+    largeFileSize = fileStatus.getLen();
+    numBlocks = calculateNumBlocks(largeFileSize, blockSize);
+  }
+
+  private static int calculateNumBlocks(long largeFileSize, int blockSize) {
+    if (largeFileSize == 0) {
+      return 0;
+    } else {
+      return ((int) (largeFileSize / blockSize)) + (largeFileSize % blockSize > 0 ? 1 : 0);
+    }
+  }
+
+  @Test
+  public void testReadLargeFileFully() throws Throwable {
+    describe("read a large file fully, uses S3CachingInputStream");
+    openFS();
+
+    try (FSDataInputStream in = fs.open(largeFile)) {
+      IOStatistics ioStats = in.getIOStatistics();
+
+      byte[] buffer = new byte[(int) largeFileSize];

Review Comment:
   this is trouble if that test file is really big. prefer to create a smaller buffer and iterate through with readFully calls.
   
   now, one thing we don't check reliably, even in AbstractSTestS3AHugeFiles, is *are the contents of large files constant even if you read them in different ways*?
   
   i think we need to worry about this, though i also think AbstractSTestS3AHugeFiles might be the place... we can build an md5 checksum on the upload and verify it on the reads.
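   For the first point, a sketch of a bounded-buffer loop (buffer size and the uncached FS field name are illustrative):

   ```java
   try (FSDataInputStream in = largeFileFS.open(largeFile)) {
     IOStatistics ioStats = in.getIOStatistics();
     // read through a fixed 1 MiB buffer rather than allocating largeFileSize bytes
     byte[] buffer = new byte[_1K * 1024];
     long bytesRead = 0;
     while (bytesRead < largeFileSize) {
       int toRead = (int) Math.min(buffer.length, largeFileSize - bytesRead);
       in.readFully(buffer, 0, toRead);   // DataInputStream.readFully
       bytesRead += toRead;
     }
     verifyStatisticCounterValue(ioStats, StoreStatisticNames.ACTION_HTTP_GET_REQUEST, numBlocks);
     verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_OPENED, numBlocks);
   }
   ```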





Issue Time Tracking
-------------------

    Worklog Id:     (was: 779903)
    Time Spent: 3h 10m  (was: 3h)

> tests in ITestS3AInputStreamPerformance are failing 
> ----------------------------------------------------
>
>                 Key: HADOOP-18231
>                 URL: https://issues.apache.org/jira/browse/HADOOP-18231
>             Project: Hadoop Common
>          Issue Type: Sub-task
>            Reporter: Ahmar Suhail
>            Assignee: Ahmar Suhail
>            Priority: Minor
>              Labels: pull-request-available
>          Time Spent: 3h 10m
>  Remaining Estimate: 0h
>
> The following tests are failing when prefetching is enabled:
> testRandomIORandomPolicy - expects stream to be opened 4 times (once for 
> every random read), but prefetching will only open twice. 
> testDecompressionSequential128K - expects stream to be opened once, but 
> prefetching will open once for each block the file has. landsat file used in 
> the test has size 42MB, prefetching block size = 8MB, expected open count is 
> 6.
>  testReadWithNormalPolicy - same as above. 
> testRandomIONormalPolicy - executes random IO, but with a normal policy. 
> S3AInputStream will abort the stream and change the policy, prefetching 
> handles random IO by caching blocks so doesn't do any of that. 
> testRandomReadOverBuffer - multiple assertions failing here, also depends a 
> lot on readAhead values, not very relevant for prefetching



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
