This is an automated email from the ASF dual-hosted git repository. stevel pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push: new c67c2b75690 HADOOP-18546. ABFS. disable purging list of in progress reads in abfs stream close() (#5176) c67c2b75690 is described below commit c67c2b756907af2d7167a32bdb2756ca18fd3960 Author: Pranav Saxena <108325433+pranavsaxena-micros...@users.noreply.github.com> AuthorDate: Wed Dec 7 12:15:45 2022 -0800 HADOOP-18546. ABFS. disable purging list of in progress reads in abfs stream close() (#5176) This addresses HADOOP-18521, "ABFS ReadBufferManager buffer sharing across concurrent HTTP requests" by not trying to cancel in progress reads. It supersedes HADOOP-18528, which disables the prefetching. If that patch is applied *after* this one, prefetching will be disabled. As well as changing the default value in the code, core-default.xml is updated to set fs.azure.enable.readahead = true As a result, if Configuration.get("fs.azure.enable.readahead") returns a non-null value, then it can be inferred that it was set either in core-default.xml (the fix is present) or in core-site.xml (someone asked for it). Contributed by Pranav Saxena. --- .../src/main/resources/core-default.xml | 5 +- .../constants/FileSystemConfigurations.java | 2 +- .../azurebfs/services/AbfsInputStreamContext.java | 2 +- .../fs/azurebfs/services/ReadBufferManager.java | 6 +- .../contract/ITestAbfsFileSystemContractSeek.java | 2 - .../fs/azurebfs/services/TestAbfsInputStream.java | 71 +++++++++++++++++++++- 6 files changed, 78 insertions(+), 10 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml index 047c5482062..e18a50c72e8 100644 --- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml +++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml @@ -2168,9 +2168,8 @@ The switch to turn S3A auditing on or off. 
<property> <name>fs.azure.enable.readahead</name> - <value>false</value> - <description>Disable readahead/prefetching in AbfsInputStream. - See HADOOP-18521</description> + <value>true</value> + <description>Enabled readahead/prefetching in AbfsInputStream.</description> </property> <property> diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index 0ea2c929800..9994d9f5207 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -109,7 +109,7 @@ public final class FileSystemConfigurations { public static final boolean DEFAULT_ABFS_LATENCY_TRACK = false; public static final long DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS = 120; - public static final boolean DEFAULT_ENABLE_READAHEAD = false; + public static final boolean DEFAULT_ENABLE_READAHEAD = true; public static final String DEFAULT_FS_AZURE_USER_AGENT_PREFIX = EMPTY_STRING; public static final String DEFAULT_VALUE_UNKNOWN = "UNKNOWN"; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java index f6b330934cf..e258958b1a1 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java @@ -35,7 +35,7 @@ public class AbfsInputStreamContext extends AbfsStreamContext { private boolean tolerateOobAppends; - private boolean isReadAheadEnabled = false; + private boolean isReadAheadEnabled = true; private 
boolean alwaysReadBufferSize; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java index 317aaf545a1..ac84f0b27cf 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java @@ -544,7 +544,6 @@ final class ReadBufferManager { LOGGER.debug("Purging stale buffers for AbfsInputStream {} ", stream); readAheadQueue.removeIf(readBuffer -> readBuffer.getStream() == stream); purgeList(stream, completedReadList); - purgeList(stream, inProgressList); } /** @@ -642,4 +641,9 @@ final class ReadBufferManager { freeList.clear(); completedReadList.add(buf); } + + @VisibleForTesting + int getNumBuffers() { + return NUM_BUFFERS; + } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractSeek.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractSeek.java index aaf47f7a9c8..f7fe5039799 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractSeek.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractSeek.java @@ -34,7 +34,6 @@ import org.apache.hadoop.fs.contract.AbstractFSContract; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_READ_AHEAD_RANGE; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_READ_BUFFER_SIZE; -import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_READAHEAD; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE; import static 
org.apache.hadoop.fs.contract.ContractTestUtils.createFile; import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; @@ -69,7 +68,6 @@ public class ITestAbfsFileSystemContractSeek extends AbstractContractSeekTest{ protected AbstractFSContract createContract(final Configuration conf) { conf.setInt(AZURE_READ_AHEAD_RANGE, MIN_BUFFER_SIZE); conf.setInt(AZURE_READ_BUFFER_SIZE, MIN_BUFFER_SIZE); - conf.setBoolean(FS_AZURE_ENABLE_READAHEAD, true); return new AbfsFileSystemContract(conf, isSecure); } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java index 69795ee5bd8..0395c4183b9 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java @@ -82,6 +82,12 @@ public class TestAbfsInputStream extends REDUCED_READ_BUFFER_AGE_THRESHOLD * 10; // 30 sec private static final int ALWAYS_READ_BUFFER_SIZE_TEST_FILE_SIZE = 16 * ONE_MB; + @Override + public void teardown() throws Exception { + super.teardown(); + ReadBufferManager.getBufferManager().testResetReadBufferManager(); + } + private AbfsRestOperation getMockRestOp() { AbfsRestOperation op = mock(AbfsRestOperation.class); AbfsHttpOperation httpOp = mock(AbfsHttpOperation.class); @@ -106,7 +112,6 @@ public class TestAbfsInputStream extends private AbfsInputStream getAbfsInputStream(AbfsClient mockAbfsClient, String fileName) throws IOException { AbfsInputStreamContext inputStreamContext = new AbfsInputStreamContext(-1); - inputStreamContext.isReadAheadEnabled(true); // Create AbfsInputStream with the client instance AbfsInputStream inputStream = new AbfsInputStream( mockAbfsClient, @@ -132,7 +137,6 @@ public class TestAbfsInputStream extends boolean alwaysReadBufferSize, int 
readAheadBlockSize) throws IOException { AbfsInputStreamContext inputStreamContext = new AbfsInputStreamContext(-1); - inputStreamContext.isReadAheadEnabled(true); // Create AbfsInputStream with the client instance AbfsInputStream inputStream = new AbfsInputStream( abfsClient, @@ -495,6 +499,69 @@ public class TestAbfsInputStream extends checkEvictedStatus(inputStream, 0, true); } + /** + * This test expects InProgressList is not purged by the inputStream close. + */ + @Test + public void testStreamPurgeDuringReadAheadCallExecuting() throws Exception { + AbfsClient client = getMockAbfsClient(); + AbfsRestOperation successOp = getMockRestOp(); + final Long serverCommunicationMockLatency = 3_000L; + final Long readBufferTransferToInProgressProbableTime = 1_000L; + final Integer readBufferQueuedCount = 3; + + Mockito.doAnswer(invocationOnMock -> { + //sleeping thread to mock the network latency from client to backend. + Thread.sleep(serverCommunicationMockLatency); + return successOp; + }) + .when(client) + .read(any(String.class), any(Long.class), any(byte[].class), + any(Integer.class), any(Integer.class), any(String.class), + any(String.class), any(TracingContext.class)); + + final ReadBufferManager readBufferManager + = ReadBufferManager.getBufferManager(); + + final int readBufferTotal = readBufferManager.getNumBuffers(); + final int expectedFreeListBufferCount = readBufferTotal + - readBufferQueuedCount; + + try (AbfsInputStream inputStream = getAbfsInputStream(client, + "testSuccessfulReadAhead.txt")) { + // As this is try-with-resources block, the close() method of the created + // abfsInputStream object shall be called on the end of the block. + queueReadAheads(inputStream); + + //Sleeping to give ReadBufferWorker to pick the readBuffers for processing. 
+ Thread.sleep(readBufferTransferToInProgressProbableTime); + + Assertions.assertThat(readBufferManager.getInProgressCopiedList()) + .describedAs(String.format("InProgressList should have %d elements", + readBufferQueuedCount)) + .hasSize(readBufferQueuedCount); + Assertions.assertThat(readBufferManager.getFreeListCopy()) + .describedAs(String.format("FreeList should have %d elements", + expectedFreeListBufferCount)) + .hasSize(expectedFreeListBufferCount); + Assertions.assertThat(readBufferManager.getCompletedReadListCopy()) + .describedAs("CompletedList should have 0 elements") + .hasSize(0); + } + + Assertions.assertThat(readBufferManager.getInProgressCopiedList()) + .describedAs(String.format("InProgressList should have %d elements", + readBufferQueuedCount)) + .hasSize(readBufferQueuedCount); + Assertions.assertThat(readBufferManager.getFreeListCopy()) + .describedAs(String.format("FreeList should have %d elements", + expectedFreeListBufferCount)) + .hasSize(expectedFreeListBufferCount); + Assertions.assertThat(readBufferManager.getCompletedReadListCopy()) + .describedAs("CompletedList should have 0 elements") + .hasSize(0); + } + /** * This test expects ReadAheadManager to throw exception if the read ahead * thread had failed within the last thresholdAgeMilliseconds. --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org