This is an automated email from the ASF dual-hosted git repository. stevel pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.4 by this push: new 3cc93d7bc1a4 HADOOP-19043. S3A: Regression: ITestS3AOpenCost fails on prefetch test runs (#6465) 3cc93d7bc1a4 is described below commit 3cc93d7bc1a4bf097f3a456f2f22788a87bb6dfc Author: Steve Loughran <ste...@cloudera.com> AuthorDate: Fri Mar 8 12:48:38 2024 +0000 HADOOP-19043. S3A: Regression: ITestS3AOpenCost fails on prefetch test runs (#6465) Disables the new tests added in: HADOOP-19027. S3A: S3AInputStream doesn't recover from HTTP/channel exceptions #6425 The underlying issue here is that the block prefetch code can identify when there's a mismatch between declared and actual length, and doesn't store any of the incomplete buffer. This should be addressed in HADOOP-18184. Contributed by Steve Loughran --- .../fs/s3a/performance/AbstractS3ACostTest.java | 8 ++ .../fs/s3a/performance/ITestS3AOpenCost.java | 100 +++++++++++++++------ 2 files changed, 82 insertions(+), 26 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/AbstractS3ACostTest.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/AbstractS3ACostTest.java index 0ecbe4d5b8de..b4b139ca3062 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/AbstractS3ACostTest.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/AbstractS3ACostTest.java @@ -370,6 +370,14 @@ public class AbstractS3ACostTest extends AbstractS3ATestBase { return expect(true, cost); } + /** + * Always run a metrics operation. + * @return a probe. + */ + protected OperationCostValidator.ExpectedProbe always() { + return OperationCostValidator.always(); + } + /** * A metric diff which must hold when the fs is keeping markers. 
* @param cost expected cost diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java index 63b25f9c8874..25ffc8fda81c 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java @@ -52,6 +52,8 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.readStream; import static org.apache.hadoop.fs.contract.ContractTestUtils.skip; import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile; import static org.apache.hadoop.fs.s3a.Constants.CHECKSUM_VALIDATION; +import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_DEFAULT; +import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY; import static org.apache.hadoop.fs.s3a.S3ATestUtils.assertStreamIsNotChecksummed; import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching; import static org.apache.hadoop.fs.s3a.S3ATestUtils.getS3AInputStream; @@ -60,10 +62,12 @@ import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_BYTES_READ_CLOSE; import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_OPENED; import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_SEEK_BYTES_SKIPPED; import static org.apache.hadoop.fs.s3a.performance.OperationCost.NO_HEAD_OR_LIST; +import static org.apache.hadoop.fs.s3a.performance.OperationCostValidator.probe; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertDurationRange; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.extractStatistics; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue; import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.demandStringifyIOStatistics; +import static 
org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString; import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_FILE_OPENED; import static org.apache.hadoop.test.LambdaTestUtils.intercept; import static org.apache.hadoop.test.LambdaTestUtils.interceptFuture; @@ -84,6 +88,11 @@ public class ITestS3AOpenCost extends AbstractS3ACostTest { private int fileLength; + /** + * Is prefetching enabled? + */ + private boolean prefetching; + public ITestS3AOpenCost() { super(true); } @@ -111,6 +120,7 @@ public class ITestS3AOpenCost extends AbstractS3ACostTest { writeTextFile(fs, testFile, TEXT, true); testFileStatus = fs.getFileStatus(testFile); fileLength = (int)testFileStatus.getLen(); + prefetching = prefetching(); } /** @@ -161,7 +171,11 @@ public class ITestS3AOpenCost extends AbstractS3ACostTest { @Test public void testStreamIsNotChecksummed() throws Throwable { describe("Verify that an opened stream is not checksummed"); + + // if prefetching is enabled, skip this test + assumeNoPrefetching(); S3AFileSystem fs = getFileSystem(); + // open the file try (FSDataInputStream in = verifyMetrics(() -> fs.openFile(testFile) @@ -173,12 +187,6 @@ public class ITestS3AOpenCost extends AbstractS3ACostTest { always(NO_HEAD_OR_LIST), with(STREAM_READ_OPENED, 0))) { - // if prefetching is enabled, skip this test - final InputStream wrapped = in.getWrappedStream(); - if (!(wrapped instanceof S3AInputStream)) { - skip("Not an S3AInputStream: " + wrapped); - } - // open the stream. 
in.read(); // now examine the innermost stream and make sure it doesn't have a checksum @@ -239,16 +247,20 @@ public class ITestS3AOpenCost extends AbstractS3ACostTest { try (FSDataInputStream in = openFile(longLen, FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)) { byte[] out = new byte[(int) (longLen)]; - intercept(EOFException.class, () -> in.readFully(0, out)); + intercept(EOFException.class, () -> { + in.readFully(0, out); + return in; + }); in.seek(longLen - 1); assertEquals("read past real EOF on " + in, -1, in.read()); return in.toString(); } }, + always(), // two GET calls were made, one for readFully, // the second on the read() past the EOF // the operation has got as far as S3 - with(STREAM_READ_OPENED, 1 + 1)); + probe(!prefetching(), STREAM_READ_OPENED, 1 + 1)); // now on a new stream, try a full read from after the EOF verifyMetrics(() -> { @@ -293,15 +305,19 @@ public class ITestS3AOpenCost extends AbstractS3ACostTest { public void testReadPastEOF() throws Throwable { // set a length past the actual file length + describe("read() up to the end of the real file"); + assumeNoPrefetching(); + final int extra = 10; int longLen = fileLength + extra; try (FSDataInputStream in = openFile(longLen, FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) { for (int i = 0; i < fileLength; i++) { Assertions.assertThat(in.read()) - .describedAs("read() at %d", i) + .describedAs("read() at %d from stream %s", i, in) .isEqualTo(TEXT.charAt(i)); } + LOG.info("Statistics after EOF {}", ioStatisticsToPrettyString(in.getIOStatistics())); } // now open and read after the EOF; this is @@ -323,10 +339,12 @@ public class ITestS3AOpenCost extends AbstractS3ACostTest { .describedAs("read() at %d", p) .isEqualTo(-1); } + LOG.info("Statistics after EOF {}", ioStatisticsToPrettyString(in.getIOStatistics())); return in.toString(); } }, - with(Statistic.ACTION_HTTP_GET_REQUEST, extra)); + always(), + probe(!prefetching, Statistic.ACTION_HTTP_GET_REQUEST, extra)); } /** @@ -353,10 +371,12 @@ public 
class ITestS3AOpenCost extends AbstractS3ACostTest { return in; }); assertS3StreamClosed(in); - return "readFully past EOF"; + return "readFully past EOF with statistics" + + ioStatisticsToPrettyString(in.getIOStatistics()); } }, - with(Statistic.ACTION_HTTP_GET_REQUEST, 1)); // no attempt to re-open + always(), + probe(!prefetching, Statistic.ACTION_HTTP_GET_REQUEST, 1)); // no attempt to re-open } /** @@ -370,6 +390,7 @@ public class ITestS3AOpenCost extends AbstractS3ACostTest { int longLen = fileLength + extra; describe("PositionedReadable.read() past the end of the file"); + assumeNoPrefetching(); verifyMetrics(() -> { try (FSDataInputStream in = @@ -388,10 +409,11 @@ public class ITestS3AOpenCost extends AbstractS3ACostTest { // stream is closed as part of this failure assertS3StreamClosed(in); - return "PositionedReadable.read()) past EOF"; + return "PositionedReadable.read()) past EOF with " + in; } }, - with(Statistic.ACTION_HTTP_GET_REQUEST, 1)); // no attempt to re-open + always(), + probe(!prefetching, Statistic.ACTION_HTTP_GET_REQUEST, 1)); // no attempt to re-open } /** @@ -405,7 +427,8 @@ public class ITestS3AOpenCost extends AbstractS3ACostTest { final int extra = 10; int longLen = fileLength + extra; - describe("Vector read past the end of the file"); + describe("Vector read past the end of the file, expecting an EOFException"); + verifyMetrics(() -> { try (FSDataInputStream in = openFile(longLen, FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) { @@ -420,10 +443,29 @@ public class ITestS3AOpenCost extends AbstractS3ACostTest { TimeUnit.SECONDS, range.getData()); assertS3StreamClosed(in); - return "vector read past EOF"; + return "vector read past EOF with " + in; } }, - with(Statistic.ACTION_HTTP_GET_REQUEST, 1)); + always(), + probe(!prefetching, Statistic.ACTION_HTTP_GET_REQUEST, 1)); + } + + /** + * Probe the FS for supporting prefetching. + * @return true if the fs has prefetching enabled. 
+ */ + private boolean prefetching() { + return getFileSystem().getConf().getBoolean( + PREFETCH_ENABLED_KEY, PREFETCH_ENABLED_DEFAULT); + } + + /** + * Skip the test if prefetching is enabled. + */ + private void assumeNoPrefetching(){ + if (prefetching) { + skip("Prefetching is enabled"); + } } /** @@ -431,20 +473,26 @@ public class ITestS3AOpenCost extends AbstractS3ACostTest { * @param in input stream */ private static void assertS3StreamClosed(final FSDataInputStream in) { - S3AInputStream s3ain = (S3AInputStream) in.getWrappedStream(); - Assertions.assertThat(s3ain.isObjectStreamOpen()) - .describedAs("stream is open") - .isFalse(); + final InputStream wrapped = in.getWrappedStream(); + if (wrapped instanceof S3AInputStream) { + S3AInputStream s3ain = (S3AInputStream) wrapped; + Assertions.assertThat(s3ain.isObjectStreamOpen()) + .describedAs("stream is open: %s", s3ain) + .isFalse(); + } } /** - * Assert that the inner S3 Stream is open. + * Assert that the inner S3 Stream is open. * @param in input stream */ private static void assertS3StreamOpen(final FSDataInputStream in) { - S3AInputStream s3ain = (S3AInputStream) in.getWrappedStream(); - Assertions.assertThat(s3ain.isObjectStreamOpen()) - .describedAs("stream is closed") - .isTrue(); + final InputStream wrapped = in.getWrappedStream(); + if (wrapped instanceof S3AInputStream) { + S3AInputStream s3ain = (S3AInputStream) wrapped; + Assertions.assertThat(s3ain.isObjectStreamOpen()) + .describedAs("stream is closed: %s", s3ain) + .isTrue(); + } } } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org