This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 3cc93d7bc1a4 HADOOP-19043. S3A: Regression: ITestS3AOpenCost fails on prefetch test runs (#6465)
3cc93d7bc1a4 is described below

commit 3cc93d7bc1a4bf097f3a456f2f22788a87bb6dfc
Author: Steve Loughran <ste...@cloudera.com>
AuthorDate: Fri Mar 8 12:48:38 2024 +0000

    HADOOP-19043. S3A: Regression: ITestS3AOpenCost fails on prefetch test runs (#6465)
    
    Disables the new tests added in:
    
    HADOOP-19027. S3A: S3AInputStream doesn't recover from HTTP/channel exceptions #6425
    
    The underlying issue is that the block prefetch code detects the
    mismatch between the declared and the actual object length, and then
    discards the incomplete buffer rather than storing any of it.
    
    This should be addressed in HADOOP-18184.
    
    Contributed by Steve Loughran
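
    To make the failure mode concrete: the affected tests open the file
    while declaring a length longer than the object really is, so reads
    eventually run past the true EOF. A minimal sketch of that pattern,
    assuming only the public openFile() builder and the standard
    fs.option.openfile.length option; the class name, lengths and the
    printed output are illustrative, not the test's own code:

        // Sketch: open a file declaring a length past its real EOF, the
        // situation the prefetching stream cannot yet recover from.
        import org.apache.hadoop.fs.FSDataInputStream;
        import org.apache.hadoop.fs.FileSystem;
        import org.apache.hadoop.fs.Path;

        import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;

        public final class OverlongOpenSketch {
          public static void readPastRealEof(FileSystem fs, Path path, long realLength)
              throws Exception {
            long declaredLength = realLength + 10;              // longer than the object
            try (FSDataInputStream in = fs.openFile(path)
                .opt(FS_OPTION_OPENFILE_LENGTH, declaredLength) // trusted: no HEAD issued
                .build().get()) {
              in.seek(realLength - 1);
              in.read();              // last real byte
              int b = in.read();      // past the true EOF
              // the classic S3AInputStream returns -1 here; the prefetching
              // stream hits the declared/actual length mismatch instead
              System.out.println("read past EOF returned " + b);
            }
          }
        }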
---
 .../fs/s3a/performance/AbstractS3ACostTest.java    |   8 ++
 .../fs/s3a/performance/ITestS3AOpenCost.java       | 100 +++++++++++++++------
 2 files changed, 82 insertions(+), 26 deletions(-)

diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/AbstractS3ACostTest.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/AbstractS3ACostTest.java
index 0ecbe4d5b8de..b4b139ca3062 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/AbstractS3ACostTest.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/AbstractS3ACostTest.java
@@ -370,6 +370,14 @@ public class AbstractS3ACostTest extends AbstractS3ATestBase {
     return expect(true, cost);
   }
 
+  /**
+   * Always run a metrics operation.
+   * @return a probe.
+   */
+  protected OperationCostValidator.ExpectedProbe always() {
+    return OperationCostValidator.always();
+  }
+
   /**
    * A metric diff which must hold when the fs is keeping markers.
    * @param cost expected cost
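
The new always() probe pairs with OperationCostValidator.probe(enabled, ...)
in the test changes below: with prefetching enabled every statistic probe is
switched off, and always() guarantees the validator still has one live probe
to evaluate. A hedged usage sketch of the combined pattern; SketchCostTest
and its no-op callable are illustrative, while probe() and always() are the
helpers this patch uses:

    package org.apache.hadoop.fs.s3a.performance;

    import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_DEFAULT;
    import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
    import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_OPENED;
    import static org.apache.hadoop.fs.s3a.performance.OperationCostValidator.probe;

    public class SketchCostTest extends AbstractS3ACostTest {

      public SketchCostTest() {
        super(true);   // same keep-markers flag ITestS3AOpenCost passes
      }

      public void sketch() throws Exception {
        // mirror of the prefetching() helper added to ITestS3AOpenCost
        boolean prefetching = getFileSystem().getConf()
            .getBoolean(PREFETCH_ENABLED_KEY, PREFETCH_ENABLED_DEFAULT);
        verifyMetrics(() -> "no-op",                      // operation under test
            always(),                                     // at least one live probe
            probe(!prefetching, STREAM_READ_OPENED, 2));  // skipped under prefetch
      }
    }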
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java
index 63b25f9c8874..25ffc8fda81c 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java
@@ -52,6 +52,8 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.readStream;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile;
 import static org.apache.hadoop.fs.s3a.Constants.CHECKSUM_VALIDATION;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_DEFAULT;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.assertStreamIsNotChecksummed;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.getS3AInputStream;
@@ -60,10 +62,12 @@ import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_BYTES_READ_CLOSE;
 import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_OPENED;
 import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_SEEK_BYTES_SKIPPED;
 import static org.apache.hadoop.fs.s3a.performance.OperationCost.NO_HEAD_OR_LIST;
+import static org.apache.hadoop.fs.s3a.performance.OperationCostValidator.probe;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertDurationRange;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.extractStatistics;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
 import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.demandStringifyIOStatistics;
+import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
 import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_FILE_OPENED;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 import static org.apache.hadoop.test.LambdaTestUtils.interceptFuture;
@@ -84,6 +88,11 @@ public class ITestS3AOpenCost extends AbstractS3ACostTest {
 
   private int fileLength;
 
+  /**
+   * Is prefetching enabled?
+   */
+  private boolean prefetching;
+
   public ITestS3AOpenCost() {
     super(true);
   }
@@ -111,6 +120,7 @@ public class ITestS3AOpenCost extends AbstractS3ACostTest {
     writeTextFile(fs, testFile, TEXT, true);
     testFileStatus = fs.getFileStatus(testFile);
     fileLength = (int)testFileStatus.getLen();
+    prefetching = prefetching();
   }
 
   /**
@@ -161,7 +171,11 @@ public class ITestS3AOpenCost extends AbstractS3ACostTest {
   @Test
   public void testStreamIsNotChecksummed() throws Throwable {
     describe("Verify that an opened stream is not checksummed");
+
+    // if prefetching is enabled, skip this test
+    assumeNoPrefetching();
     S3AFileSystem fs = getFileSystem();
+
     // open the file
     try (FSDataInputStream in = verifyMetrics(() ->
             fs.openFile(testFile)
@@ -173,12 +187,6 @@ public class ITestS3AOpenCost extends AbstractS3ACostTest {
         always(NO_HEAD_OR_LIST),
         with(STREAM_READ_OPENED, 0))) {
 
-      // if prefetching is enabled, skip this test
-      final InputStream wrapped = in.getWrappedStream();
-      if (!(wrapped instanceof S3AInputStream)) {
-        skip("Not an S3AInputStream: " + wrapped);
-      }
-
       // open the stream.
       in.read();
      // now examine the innermost stream and make sure it doesn't have a checksum
@@ -239,16 +247,20 @@ public class ITestS3AOpenCost extends AbstractS3ACostTest {
       try (FSDataInputStream in = openFile(longLen,
           FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)) {
         byte[] out = new byte[(int) (longLen)];
-        intercept(EOFException.class, () -> in.readFully(0, out));
+        intercept(EOFException.class, () -> {
+          in.readFully(0, out);
+          return in;
+        });
         in.seek(longLen - 1);
         assertEquals("read past real EOF on " + in, -1, in.read());
         return in.toString();
       }
     },
+        always(),
         // two GET calls were made, one for readFully,
         // the second on the read() past the EOF
         // the operation has got as far as S3
-        with(STREAM_READ_OPENED, 1 + 1));
+        probe(!prefetching(), STREAM_READ_OPENED, 1 + 1));
 
     // now on a new stream, try a full read from after the EOF
     verifyMetrics(() -> {
@@ -293,15 +305,19 @@ public class ITestS3AOpenCost extends AbstractS3ACostTest {
   public void testReadPastEOF() throws Throwable {
 
     // set a length past the actual file length
+    describe("read() up to the end of the real file");
+    assumeNoPrefetching();
+
     final int extra = 10;
     int longLen = fileLength + extra;
     try (FSDataInputStream in = openFile(longLen,
         FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) {
       for (int i = 0; i < fileLength; i++) {
         Assertions.assertThat(in.read())
-            .describedAs("read() at %d", i)
+            .describedAs("read() at %d from stream %s", i, in)
             .isEqualTo(TEXT.charAt(i));
       }
+      LOG.info("Statistics after EOF {}", ioStatisticsToPrettyString(in.getIOStatistics()));
     }
 
     // now open and read after the EOF; this is
@@ -323,10 +339,12 @@ public class ITestS3AOpenCost extends AbstractS3ACostTest {
               .describedAs("read() at %d", p)
               .isEqualTo(-1);
         }
+        LOG.info("Statistics after EOF {}", ioStatisticsToPrettyString(in.getIOStatistics()));
         return in.toString();
       }
     },
-        with(Statistic.ACTION_HTTP_GET_REQUEST, extra));
+        always(),
+        probe(!prefetching, Statistic.ACTION_HTTP_GET_REQUEST, extra));
   }
 
   /**
@@ -353,10 +371,12 @@ public class ITestS3AOpenCost extends AbstractS3ACostTest {
           return in;
         });
         assertS3StreamClosed(in);
-        return "readFully past EOF";
+        return "readFully past EOF with statistics"
+            + ioStatisticsToPrettyString(in.getIOStatistics());
       }
     },
-        with(Statistic.ACTION_HTTP_GET_REQUEST, 1)); // no attempt to re-open
+        always(),
+        probe(!prefetching, Statistic.ACTION_HTTP_GET_REQUEST, 1)); // no attempt to re-open
   }
 
   /**
@@ -370,6 +390,7 @@ public class ITestS3AOpenCost extends AbstractS3ACostTest {
     int longLen = fileLength + extra;
 
     describe("PositionedReadable.read() past the end of the file");
+    assumeNoPrefetching();
 
     verifyMetrics(() -> {
       try (FSDataInputStream in =
@@ -388,10 +409,11 @@ public class ITestS3AOpenCost extends AbstractS3ACostTest {
         // stream is closed as part of this failure
         assertS3StreamClosed(in);
 
-        return "PositionedReadable.read()) past EOF";
+        return "PositionedReadable.read()) past EOF with " + in;
       }
     },
-        with(Statistic.ACTION_HTTP_GET_REQUEST, 1)); // no attempt to re-open
+        always(),
+        probe(!prefetching, Statistic.ACTION_HTTP_GET_REQUEST, 1)); // no attempt to re-open
   }
 
   /**
@@ -405,7 +427,8 @@ public class ITestS3AOpenCost extends AbstractS3ACostTest {
     final int extra = 10;
     int longLen = fileLength + extra;
 
-    describe("Vector read past the end of the file");
+    describe("Vector read past the end of the file, expecting an 
EOFException");
+
     verifyMetrics(() -> {
       try (FSDataInputStream in =
                openFile(longLen, FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) {
@@ -420,10 +443,29 @@ public class ITestS3AOpenCost extends AbstractS3ACostTest {
             TimeUnit.SECONDS,
             range.getData());
         assertS3StreamClosed(in);
-        return "vector read past EOF";
+        return "vector read past EOF with " + in;
       }
     },
-        with(Statistic.ACTION_HTTP_GET_REQUEST, 1));
+        always(),
+        probe(!prefetching, Statistic.ACTION_HTTP_GET_REQUEST, 1));
+  }
+
+  /**
+   * Probe the FS for supporting prefetching.
+   * @return true if the fs has prefetching enabled.
+   */
+  private boolean prefetching() {
+    return getFileSystem().getConf().getBoolean(
+        PREFETCH_ENABLED_KEY, PREFETCH_ENABLED_DEFAULT);
+  }
+
+  /**
+   * Skip the test if prefetching is enabled.
+   */
+  private void assumeNoPrefetching() {
+    if (prefetching) {
+      skip("Prefetching is enabled");
+    }
   }
 
   /**
@@ -431,20 +473,26 @@ public class ITestS3AOpenCost extends AbstractS3ACostTest {
    * @param in input stream
    */
   private static void assertS3StreamClosed(final FSDataInputStream in) {
-    S3AInputStream s3ain = (S3AInputStream) in.getWrappedStream();
-    Assertions.assertThat(s3ain.isObjectStreamOpen())
-        .describedAs("stream is open")
-        .isFalse();
+    final InputStream wrapped = in.getWrappedStream();
+    if (wrapped instanceof S3AInputStream) {
+      S3AInputStream s3ain = (S3AInputStream) wrapped;
+      Assertions.assertThat(s3ain.isObjectStreamOpen())
+          .describedAs("stream is open: %s", s3ain)
+          .isFalse();
+    }
   }
 
   /**
    * Assert that the inner S3 Stream is open.
    * @param in input stream
    */
   private static void assertS3StreamOpen(final FSDataInputStream in) {
-    S3AInputStream s3ain = (S3AInputStream) in.getWrappedStream();
-    Assertions.assertThat(s3ain.isObjectStreamOpen())
-        .describedAs("stream is closed")
-        .isTrue();
+    final InputStream wrapped = in.getWrappedStream();
+    if (wrapped instanceof S3AInputStream) {
+      S3AInputStream s3ain = (S3AInputStream) wrapped;
+      Assertions.assertThat(s3ain.isObjectStreamOpen())
+          .describedAs("stream is closed: %s", s3ain)
+          .isTrue();
+    }
   }
 }
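
The reworked assertions only probe the inner object stream when the wrapped
stream really is an S3AInputStream; on a prefetching stream they are no-ops
rather than ClassCastException failures. A hedged standalone sketch of that
guard, using only the getWrappedStream() and isObjectStreamOpen() calls
visible above (the helper class and its output are illustrative):

    import java.io.InputStream;

    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.s3a.S3AInputStream;

    final class StreamKind {
      static String describe(FSDataInputStream in) {
        final InputStream wrapped = in.getWrappedStream();
        if (wrapped instanceof S3AInputStream) {
          // classic stream: a single GET-backed object stream to probe
          S3AInputStream s3ain = (S3AInputStream) wrapped;
          return "classic stream; object stream open=" + s3ain.isObjectStreamOpen();
        }
        // prefetching (or any other) stream: no single object stream,
        // so open/closed checks are skipped rather than cast-and-fail
        return "not an S3AInputStream: " + wrapped.getClass().getName();
      }
    }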

