[ https://issues.apache.org/jira/browse/HADOOP-19027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805182#comment-17805182 ]

ASF GitHub Bot commented on HADOOP-19027:
-----------------------------------------

steveloughran commented on code in PR #6425:
URL: https://github.com/apache/hadoop/pull/6425#discussion_r1447492936


##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java:
##########
@@ -1711,4 +1715,59 @@ public static String etag(FileStatus status) {
         "Not an EtagSource: %s", status);
     return ((EtagSource) status).getEtag();
   }
+
+  /**
+   * Create an SDK client exception.
+   * @param message message
+   * @param cause nullable cause
+   * @return the exception
+   */
+  public static SdkClientException sdkClientException(
+      String message, Throwable cause) {
+    return SdkClientException.builder()
+        .message(message)
+        .cause(cause)
+        .build();
+  }
+
+  /**
+   * Create an SDK client exception using the string value of the cause
+   * as the message.
+   * @param cause nullable cause
+   * @return the exception
+   */
+  public static SdkClientException sdkClientException(
+      Throwable cause) {
+    return SdkClientException.builder()
+        .message(cause.toString())
+        .cause(cause)
+        .build();
+  }
+
+  private static final String BYTES_PREFIX = "bytes=";
+
+  /**
+   * Given a range header, split into start and end.
+   * Based on AWSRequestAnalyzer.
+   * @param rangeHeader header string
+   * @return parsed range, or (-1, -1) for problems
+   */
+  public static Pair<Long, Long> requestRange(String rangeHeader) {
+    if (rangeHeader != null && rangeHeader.startsWith(BYTES_PREFIX)) {
+      String[] values = rangeHeader
+          .substring(BYTES_PREFIX.length())
+          .split("-");
+      if (values.length == 2) {
+        try {
+          long start = Long.parseUnsignedLong(values[0]);
+          long end = Long.parseUnsignedLong(values[0]);

Review Comment:
   hey, got this wrong, as it is in the production code... will fix.
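
   For reference, a sketch of what the corrected parse might look like (the
   end offset must come from values[1]; the (-1, -1) fallback is the failure
   marker the javadoc describes):

       if (values.length == 2) {
         try {
           long start = Long.parseUnsignedLong(values[0]);
           long end = Long.parseUnsignedLong(values[1]);
           return Pair.of(start, end);
         } catch (NumberFormatException e) {
           // fall through to the failure marker below
         }
       }
       return Pair.of(-1L, -1L);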



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AExceptionTranslation.java:
##########
@@ -254,12 +278,7 @@ public void testExtractInterruptedIO() throws Throwable {
                 .build()));
   }
 
-  private SdkClientException sdkClientException(String message, Throwable cause) {
-    return SdkClientException.builder()
-        .message(message)
-        .cause(cause)
-        .build();
-  }
+

Review Comment:
   cut this
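
   With the private copy cut, the test can presumably pick up the shared
   helper through a static import (hypothetical usage sketch):

       import static org.apache.hadoop.fs.s3a.S3ATestUtils.sdkClientException;

       // builds the same exception the removed private helper did
       SdkClientException ex =
           sdkClientException("read failure", new SocketTimeoutException());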



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java:
##########
@@ -171,39 +173,226 @@ public void testOpenFileShorterLength() throws Throwable {
   }
 
   @Test
-  public void testOpenFileLongerLength() throws Throwable {
-    // do a second read with the length declared as longer
+  public void testOpenFileLongerLengthReadFully() throws Throwable {
+    // do a read with the length declared as longer
     // than it is.
    // An EOFException will be raised on readFully(), -1 returned on a read()
 
+    final int extra = 10;
+    long longLen = fileLength + extra;
+
+
+    // assert behaviors of seeking/reading past the file length.
+    // there is no attempt at recovery.
+    verifyMetrics(() -> {
+          try (FSDataInputStream in = openFile(longLen,
+              FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)) {
+            byte[] out = new byte[(int) (longLen)];
+            intercept(EOFException.class,
+                () -> in.readFully(0, out));
+            in.seek(longLen - 1);
+            assertEquals("read past real EOF on " + in,
+                -1, in.read());
+            return in.toString();
+          }
+        },
+        // two GET calls were made, one for readFully,
+        // the second on the read() past the EOF
+        // the operation has got as far as S3
+        with(STREAM_READ_OPENED, 1 + 1));
+
+    // now on a new stream, try a full read from after the EOF
+    verifyMetrics(() -> {
+          try (FSDataInputStream in = openFile(longLen,
+              FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)) {
+            byte[] out = new byte[extra];
+            intercept(EOFException.class,
+                () -> in.readFully(fileLength, out));
+            return in.toString();
+          }
+        },
+        // a single GET call was made for the readFully()
+        // past the EOF; the operation has got as far as S3
+        with(STREAM_READ_OPENED, 1));
+  }
+
+  /**
+   * Open a file.
+   * @param longLen length to declare
+   * @param policy read policy
+   * @return file handle
+   */
+  private FSDataInputStream openFile(final long longLen, String policy)
+      throws Exception {
     S3AFileSystem fs = getFileSystem();
     // set a length past the actual file length
-    long longLen = fileLength + 10;
-    FSDataInputStream in3 = verifyMetrics(() ->
+    return verifyMetrics(() ->
             fs.openFile(testFile)
-                .must(FS_OPTION_OPENFILE_READ_POLICY,
-                    FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)
+                .must(FS_OPTION_OPENFILE_READ_POLICY, policy)
                 .mustLong(FS_OPTION_OPENFILE_LENGTH, longLen)
                 .build()
                 .get(),
         always(NO_HEAD_OR_LIST));
+  }
+
+  /**
+   * Open a file with a length declared as longer than the actual file length.
+   * Validate input stream.read() semantics.
+   */
+  @Test
+  public void testReadPastEOF() throws Throwable {
+
+    // set a length past the actual file length
+    final int extra = 10;
+    int longLen = fileLength + extra;
+    try (FSDataInputStream in = openFile(longLen,
+        FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) {
+      for (int i = 0; i < fileLength; i++) {
+        Assertions.assertThat(in.read())
+            .describedAs("read() at %d", i)
+            .isEqualTo(TEXT.charAt(i));
+      }
+    }
+
+    // now open and read after the EOF; this is
+    // expected to return -1 on each read; there's a GET per call.
+    // as the counters are updated on close(), the stream must be closed
+    // within the verification clause.
+    // note how there's no attempt to alter the file's expected length...
+    // instead the call always goes to S3;
+    // there's no information in the exception from the SDK.
+    describe("reading past the end of the file");
 
-    // assert behaviors of seeking/reading past the file length.
-    // there is no attempt at recovery.
     verifyMetrics(() -> {
-      byte[] out = new byte[(int) longLen];
-      intercept(EOFException.class,
-          () -> in3.readFully(0, out));
-      in3.seek(longLen - 1);
-      assertEquals("read past real EOF on " + in3,
-          -1, in3.read());
-      in3.close();
-      return in3.toString();
-    },
-        // two GET calls were made, one for readFully,
-        // the second on the read() past the EOF
-        // the operation has got as far as S3
-        with(STREAM_READ_OPENED, 2));
+          try (FSDataInputStream in =
+                   openFile(longLen, FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) {
+            for (int i = 0; i < extra; i++) {
+              final int p = fileLength + i;
+              in.seek(p);
+              Assertions.assertThat(in.read())
+                  .describedAs("read() at %d", p)
+                  .isEqualTo(-1);
+            }
+            return in.toString();
+          }
+
+        },
+        with(Statistic.ACTION_HTTP_GET_REQUEST, extra));
+  }
 
+  /**
+   * Test {@code PositionedReadable.readFully()} past EOF in a file.
+   */
+  @Test
+  public void testPositionedReadableReadFullyPastEOF() throws Throwable {
+    // now, next corner case: do a readFully() of more bytes than the file length.
+    // we expect failure.
+    // this codepath does a GET to the end of the (expected) file length, and when
+    // that GET returns -1 from the read because fewer bytes were returned than
+    // expected, the readFully call fails.
+    describe("PositionedReadable.readFully() past the end of the file");
+    // set a length past the actual file length
+    final int extra = 10;
+    int longLen = fileLength + extra;
+    verifyMetrics(() -> {
+          try (FSDataInputStream in =
+                   openFile(longLen, FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) {
+            byte[] buf = new byte[(int) (longLen + 1)];
+
+            // readFully will fail
+            intercept(EOFException.class, () -> {
+              in.readFully(0, buf);
+              return in;
+            });
+            assertS3StreamClosed(in);
+            return "readFully past EOF";
+          }
+        },
+        with(Statistic.ACTION_HTTP_GET_REQUEST, 1)); // no attempt to re-open
+  }
+
+
+  /**
+   * Test {@code PositionedReadable#read()} past EOF in a file.
+   */
+  @Test
+  public void testPositionedReadableReadPastEOF() throws Throwable {
+
+    // set a length past the actual file length
+    final int extra = 10;
+    int longLen = fileLength + extra;
+
+    describe("PositionedReadable.read() past the end of the file");
+
+    verifyMetrics(() -> {
+          try (FSDataInputStream in =
+                   openFile(longLen, FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) {
+            byte[] buf = new byte[(int) (longLen + 1)];
+
+            // read() will return the bytes up to the end of the file
+            Assertions.assertThat(in.read(0, buf, 0, buf.length))
+                .isEqualTo(fileLength);
+            assertS3StreamOpen(in);
+
+            // now attempt to read after EOF
+            Assertions.assertThat(in.read(fileLength, buf, 0, buf.length))
+                .describedAs("PositionedReadable.read() past EOF")
+                .isEqualTo(-1);
+            // stream is closed as part of this failure
+            assertS3StreamClosed(in);
+
+            return "PositionedReadable.read()) past EOF";
+          }
+        },
+        with(Statistic.ACTION_HTTP_GET_REQUEST, 1)); // no attempt to re-open
+  }
+
+  /**
+   * Test Vector Read past EOF in a file.
+   */
+  @Test
+  public void testVectorReadPastEOF() throws Throwable {
+
+    // set a length past the actual file length
+    final int extra = 10;
+    int longLen = fileLength + extra;
+
+    describe("Vector read past the end of the file");
+    verifyMetrics(() -> {
+          try (FSDataInputStream in =
+                   openFile(longLen, FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) {
+            byte[] buf = new byte[(int) (longLen + 1)];
+            ByteBuffer bb = ByteBuffer.wrap(buf);
+            final FileRange range = FileRange.createFileRange(0, longLen);
+            in.readVectored(Arrays.asList(range), (i) -> bb);
+            assertS3StreamClosed(in);
+            return "vector read past EOF";
+          }
+
+        },
+        with(Statistic.ACTION_HTTP_GET_REQUEST, 0)); // vector stats don't add this

Review Comment:
   needs to await result and then the count will be incremented
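
   A sketch of the suggested change, assuming the past-EOF range surfaces as
   an EOFException once the future is awaited (FutureIO.awaitFuture() unwraps
   the execution failure):

       in.readVectored(Arrays.asList(range), (i) -> bb);
       // block until the range completes; the GET count is then incremented
       intercept(EOFException.class, () ->
           FutureIO.awaitFuture(range.getData()));
       assertS3StreamClosed(in);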





> S3A: S3AInputStream doesn't recover from HTTP/channel exceptions
> ----------------------------------------------------------------
>
>                 Key: HADOOP-19027
>                 URL: https://issues.apache.org/jira/browse/HADOOP-19027
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/s3
>    Affects Versions: 3.4.0
>            Reporter: Steve Loughran
>            Assignee: Steve Loughran
>            Priority: Major
>              Labels: pull-request-available
>
> S3AInputStream doesn't seem to recover from HTTP exceptions raised through 
> HttpClient or through OpenSSL.
> * review the recovery code to make sure it is retrying enough, it looks 
> suspiciously like it doesn't
> * detect the relevant openssl, shaded httpclient and unshaded httpclient 
> exceptions, map to a standard one and treat as comms error in our retry policy
> This is not the same as the load balancer/proxy returning 443/444 which we 
> map to AWSNoResponseException. We can't reuse that as it expects to be 
> created from an 
> {{software.amazon.awssdk.awscore.exception.AwsServiceException}} exception 
> with the relevant fields...changing it could potentially be incompatible.
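
As a rough illustration of the proposed mapping (hypothetical names, not the
patch's actual code): detect the channel-level exception classes by name so
both the shaded and unshaded httpclient copies are caught, wrap them in one
IOException subtype, and have the retry policy treat that type as a
retryable comms error.

    // hypothetical sketch only: map channel failures to a single type
    public static IOException maybeTranslateChannelException(
        String path, String operation, Exception e) {
      String name = e.getClass().getName();
      // matches org.apache.http.NoHttpResponseException and the
      // shaded copy inside the bundled SDK alike
      if (name.endsWith(".NoHttpResponseException")
          || e instanceof javax.net.ssl.SSLException) {
        // HttpChannelEOFException is a hypothetical EOFException subclass
        // which the retry policy would list as a connectivity failure
        return new HttpChannelEOFException(path, operation + ": " + e, e);
      }
      return null; // not a channel problem; use the default translation
    }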


