This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit a71c708d17af822cd595e59681d12ea9e5caf05c
Author: Viraj Jasani <vjas...@apache.org>
AuthorDate: Wed Oct 19 06:38:11 2022 -0700

    HADOOP-18189 S3APrefetchingInputStream to support status probes when closed (#5036)
    
    
    Contributed by Viraj Jasani
---
 .../org/apache/hadoop/fs/s3a/S3AInputStream.java   |  1 +
 .../fs/s3a/prefetch/S3APrefetchingInputStream.java | 40 ++++++++++++----
 .../fs/s3a/ITestS3APrefetchingInputStream.java     | 54 ++++++++++++++++++++++
 3 files changed, 86 insertions(+), 9 deletions(-)

diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
index be5b1799b35..4b50ab2c04b 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
@@ -1164,6 +1164,7 @@ public class S3AInputStream extends FSInputStream 
implements  CanSetReadahead,
    */
   @InterfaceAudience.Private
   @InterfaceStability.Unstable
+  @VisibleForTesting
   public S3AInputStreamStatistics getS3AStreamStatistics() {
     return streamStatistics;
   }
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchingInputStream.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchingInputStream.java
index 76ef942ed65..f778f40b74c 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchingInputStream.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchingInputStream.java
@@ -26,6 +26,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.fs.CanSetReadahead;
 import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.FSInputStream;
@@ -56,6 +57,21 @@ public class S3APrefetchingInputStream
    */
   private S3ARemoteInputStream inputStream;
 
+  /**
+   * To be only used by synchronized getPos().
+   */
+  private long lastReadCurrentPos = 0;
+
+  /**
+   * To be only used by getIOStatistics().
+   */
+  private IOStatistics ioStatistics = null;
+
+  /**
+   * To be only used by getS3AStreamStatistics().
+   */
+  private S3AInputStreamStatistics inputStreamStatistics = null;
+
   /**
    * Initializes a new instance of the {@code S3APrefetchingInputStream} class.
    *
@@ -115,14 +131,20 @@ public class S3APrefetchingInputStream
   }
 
   /**
-   * Gets the current position.
+   * Gets the current position. If the underlying S3 input stream is closed,
+   * it returns the last position read from the underlying stream. If the
+   * position was never read and the underlying input stream is closed,
+   * this returns 0.
    *
    * @return the current position.
    * @throws IOException if there is an IO error during this operation.
    */
   @Override
   public synchronized long getPos() throws IOException {
-    return isClosed() ? 0 : inputStream.getPos();
+    if (!isClosed()) {
+      lastReadCurrentPos = inputStream.getPos();
+    }
+    return lastReadCurrentPos;
   }
 
   /**
@@ -215,11 +237,12 @@ public class S3APrefetchingInputStream
    */
   @InterfaceAudience.Private
   @InterfaceStability.Unstable
+  @VisibleForTesting
   public S3AInputStreamStatistics getS3AStreamStatistics() {
-    if (isClosed()) {
-      return null;
+    if (!isClosed()) {
+      inputStreamStatistics = inputStream.getS3AStreamStatistics();
     }
-    return inputStream.getS3AStreamStatistics();
+    return inputStreamStatistics;
   }
 
   /**
@@ -229,10 +252,10 @@ public class S3APrefetchingInputStream
    */
   @Override
   public IOStatistics getIOStatistics() {
-    if (isClosed()) {
-      return null;
+    if (!isClosed()) {
+      ioStatistics = inputStream.getIOStatistics();
     }
-    return inputStream.getIOStatistics();
+    return ioStatistics;
   }
 
   protected boolean isClosed() {
@@ -249,7 +272,6 @@ public class S3APrefetchingInputStream
 
   @Override
   public boolean seekToNewSource(long targetPos) throws IOException {
-    throwIfClosed();
     return false;
   }
 
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java
index 121d925d281..4fd50a3eb81 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java
@@ -31,6 +31,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
+import org.apache.hadoop.fs.s3a.prefetch.S3APrefetchingInputStream;
+import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
 import org.apache.hadoop.fs.statistics.IOStatistics;
 
 import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE;
@@ -236,4 +238,56 @@ public class ITestS3APrefetchingInputStream extends 
AbstractS3ACostTest {
     }
   }
 
+  @Test
+  public void testStatusProbesAfterClosingStream() throws Throwable {
+    describe("When the underlying input stream is closed, the prefetch input 
stream"
+        + " should still support some status probes");
+
+    byte[] data = ContractTestUtils.dataset(SMALL_FILE_SIZE, 'a', 26);
+    Path smallFile = methodPath();
+    ContractTestUtils.writeDataset(getFileSystem(), smallFile, data, 
data.length, 16, true);
+
+    FSDataInputStream in = getFileSystem().open(smallFile);
+
+    byte[] buffer = new byte[SMALL_FILE_SIZE];
+    in.read(buffer, 0, S_1K * 4);
+    in.seek(S_1K * 12);
+    in.read(buffer, 0, S_1K * 4);
+
+    long pos = in.getPos();
+    IOStatistics ioStats = in.getIOStatistics();
+    S3AInputStreamStatistics inputStreamStatistics =
+        ((S3APrefetchingInputStream) 
(in.getWrappedStream())).getS3AStreamStatistics();
+
+    assertNotNull("Prefetching input IO stats should not be null", ioStats);
+    assertNotNull("Prefetching input stream stats should not be null", 
inputStreamStatistics);
+    assertNotEquals("Position retrieved from prefetching input stream should 
be greater than 0", 0,
+        pos);
+
+    in.close();
+
+    // status probes after closing the input stream
+    long newPos = in.getPos();
+    IOStatistics newIoStats = in.getIOStatistics();
+    S3AInputStreamStatistics newInputStreamStatistics =
+        ((S3APrefetchingInputStream) 
(in.getWrappedStream())).getS3AStreamStatistics();
+
+    assertNotNull("Prefetching input IO stats should not be null", newIoStats);
+    assertNotNull("Prefetching input stream stats should not be null", 
newInputStreamStatistics);
+    assertNotEquals("Position retrieved from prefetching input stream should 
be greater than 0", 0,
+        newPos);
+
+    // compare status probes after closing of the stream with status probes done before
+    // closing the stream
+    assertEquals("Position retrieved through stream before and after closing 
should match", pos,
+        newPos);
+    assertEquals("IO stats retrieved through stream before and after closing 
should match", ioStats,
+        newIoStats);
+    assertEquals("Stream stats retrieved through stream before and after 
closing should match",
+        inputStreamStatistics, newInputStreamStatistics);
+
+    assertFalse("seekToNewSource() not supported with prefetch", 
in.seekToNewSource(10));
+
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to