[
https://issues.apache.org/jira/browse/HADOOP-18546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17642112#comment-17642112
]
ASF GitHub Bot commented on HADOOP-18546:
-----------------------------------------
steveloughran commented on code in PR #5176:
URL: https://github.com/apache/hadoop/pull/5176#discussion_r1037390997
##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java:
##########
@@ -82,6 +84,16 @@ public class TestAbfsInputStream extends
REDUCED_READ_BUFFER_AGE_THRESHOLD * 10; // 30 sec
private static final int ALWAYS_READ_BUFFER_SIZE_TEST_FILE_SIZE = 16 *
ONE_MB;
+ @After
+ public void afterTest() throws InterruptedException {
Review Comment:
Override `teardown()` and call the superclass; that way you know the order in
which things happen.
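A minimal sketch of the ordering argument, in plain Java with no JUnit dependency. `BaseTestSketch` and `ReadAheadTestSketch` are hypothetical stand-ins for the real superclass and for `TestAbfsInputStream`; the point is only that an overridden `teardown()` spells out when the subclass cleanup runs relative to the superclass.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the test superclass.
class BaseTestSketch {
  protected final List<String> events = new ArrayList<>();

  public void teardown() {
    events.add("base teardown");
  }
}

// Hypothetical stand-in for the test class under review.
class ReadAheadTestSketch extends BaseTestSketch {
  @Override
  public void teardown() {
    // Subclass cleanup runs first, at a point chosen explicitly in the code...
    events.add("subclass cleanup");
    // ...then the superclass releases whatever it owns.
    super.teardown();
  }

  List<String> eventsSeen() {
    return events;
  }

  public static void main(String[] args) {
    ReadAheadTestSketch test = new ReadAheadTestSketch();
    test.teardown();
    System.out.println(test.eventsSeen());
  }
}
```

With a separate `@After` method the relative order is left to the test framework; with the override it is visible in the source.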
##########
hadoop-common-project/hadoop-common/src/main/resources/core-default.xml:
##########
@@ -2166,13 +2166,6 @@ The switch to turn S3A auditing on or off.
<description>The AbstractFileSystem for gs: uris.</description>
</property>
-<property>
- <name>fs.azure.enable.readahead</name>
- <value>false</value>
Review Comment:
Retain this, but set it to true. Why? storediag will log it, showing that
someone has explicitly said "readahead is safe here".
##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java:
##########
@@ -82,6 +84,16 @@ public class TestAbfsInputStream extends
REDUCED_READ_BUFFER_AGE_THRESHOLD * 10; // 30 sec
private static final int ALWAYS_READ_BUFFER_SIZE_TEST_FILE_SIZE = 16 *
ONE_MB;
+ @After
+ public void afterTest() throws InterruptedException {
+ //thread wait so that previous test's inProgress buffers are processed and removed.
+ Thread.sleep(10000l);
Review Comment:
I don't like this, as it potentially adds 10s to a test run, one which could
still be a bit flaky.
What about using `testResetReadBufferManager()`?
##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java:
##########
@@ -495,6 +505,105 @@ public void testSuccessfulReadAhead() throws Exception {
checkEvictedStatus(inputStream, 0, true);
}
+ /**
+ * This test expects InProgressList is not purged by the inputStream close.
+ * The readBuffer will move to completedList and then finally should get evicted.
+ */
+ @Test
+ public void testStreamPurgeDuringReadAheadCallExecuting() throws Exception {
+ AbfsClient client = getMockAbfsClient();
+ AbfsRestOperation successOp = getMockRestOp();
+
+ Mockito.doAnswer(invocationOnMock -> {
+ //sleeping thread to mock the network latency from client to backend.
+ Thread.sleep(3000l);
+ return successOp;
+ })
+ .when(client)
+ .read(any(String.class), any(Long.class), any(byte[].class),
+ any(Integer.class), any(Integer.class), any(String.class),
+ any(String.class), any(TracingContext.class));
+
+ AbfsInputStream inputStream = getAbfsInputStream(client,
+ "testSuccessfulReadAhead.txt");
+ queueReadAheads(inputStream);
+
+ final ReadBufferManager readBufferManager
+ = ReadBufferManager.getBufferManager();
+
+ //Sleeping to give ReadBufferWorker to pick the readBuffers for processing.
+ Thread.sleep(1000l);
+
+ Assertions.assertThat(
+ getStreamRelatedBufferCount(readBufferManager.getInProgressCopiedList(),
+ inputStream))
+ .describedAs("InProgressList should have 3 elements")
+ .isEqualTo(3);
+ Assertions.assertThat(readBufferManager.getFreeListCopy().size())
+ .describedAs("FreeList should have 13 elements")
+ .isEqualTo(13);
+ Assertions.assertThat(readBufferManager.getCompletedReadListCopy().size())
+ .describedAs("CompletedList should have 0 elements")
+ .isEqualTo(0);
+
+ inputStream.close();
+
+ Assertions.assertThat(
+ getStreamRelatedBufferCount(readBufferManager.getInProgressCopiedList(),
+ inputStream))
+ .describedAs("InProgressList should have 3 elements")
+ .isEqualTo(3);
+ Assertions.assertThat(getStreamRelatedBufferCount(
+ readBufferManager.getCompletedReadListCopy(), inputStream))
+ .describedAs("CompletedList should have 0 elements")
+ .isEqualTo(0);
+ Assertions.assertThat(readBufferManager.getFreeListCopy().size())
Review Comment:
use .hasSize(13)
##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java:
##########
@@ -495,6 +505,105 @@ public void testSuccessfulReadAhead() throws Exception {
checkEvictedStatus(inputStream, 0, true);
}
+ /**
+ * This test expects InProgressList is not purged by the inputStream close.
+ * The readBuffer will move to completedList and then finally should get evicted.
+ */
+ @Test
+ public void testStreamPurgeDuringReadAheadCallExecuting() throws Exception {
+ AbfsClient client = getMockAbfsClient();
+ AbfsRestOperation successOp = getMockRestOp();
+
+ Mockito.doAnswer(invocationOnMock -> {
+ //sleeping thread to mock the network latency from client to backend.
+ Thread.sleep(3000l);
+ return successOp;
+ })
+ .when(client)
+ .read(any(String.class), any(Long.class), any(byte[].class),
+ any(Integer.class), any(Integer.class), any(String.class),
+ any(String.class), any(TracingContext.class));
+
+ AbfsInputStream inputStream = getAbfsInputStream(client,
+ "testSuccessfulReadAhead.txt");
+ queueReadAheads(inputStream);
+
+ final ReadBufferManager readBufferManager
+ = ReadBufferManager.getBufferManager();
+
+ //Sleeping to give ReadBufferWorker to pick the readBuffers for processing.
+ Thread.sleep(1000l);
+
+ Assertions.assertThat(
+ getStreamRelatedBufferCount(readBufferManager.getInProgressCopiedList(),
+ inputStream))
+ .describedAs("InProgressList should have 3 elements")
+ .isEqualTo(3);
+ Assertions.assertThat(readBufferManager.getFreeListCopy().size())
+ .describedAs("FreeList should have 13 elements")
+ .isEqualTo(13);
+ Assertions.assertThat(readBufferManager.getCompletedReadListCopy().size())
+ .describedAs("CompletedList should have 0 elements")
+ .isEqualTo(0);
+
+ inputStream.close();
+
+ Assertions.assertThat(
+ getStreamRelatedBufferCount(readBufferManager.getInProgressCopiedList(),
+ inputStream))
+ .describedAs("InProgressList should have 3 elements")
+ .isEqualTo(3);
+ Assertions.assertThat(getStreamRelatedBufferCount(
+ readBufferManager.getCompletedReadListCopy(), inputStream))
+ .describedAs("CompletedList should have 0 elements")
+ .isEqualTo(0);
+ Assertions.assertThat(readBufferManager.getFreeListCopy().size())
+ .describedAs("FreeList should have 13 elements")
+ .isEqualTo(13);
+
+ //Sleep so that response from mockedClient gets back to ReadBufferWorker and
+ // can populate into completedList.
+ Thread.sleep(3000l);
+
+ Assertions.assertThat(getStreamRelatedBufferCount(
+ readBufferManager.getCompletedReadListCopy(), inputStream))
+ .describedAs("CompletedList should have 3 elements")
+ .isEqualTo(3);
+ Assertions.assertThat(readBufferManager.getFreeListCopy().size())
+ .describedAs("FreeList should have 13 elements")
+ .isEqualTo(13);
+ Assertions.assertThat(
+ getStreamRelatedBufferCount(readBufferManager.getInProgressCopiedList(),
+ inputStream))
+ .describedAs("InProgressList should have 0 elements")
+ .isEqualTo(0);
+
+ Thread.sleep(readBufferManager.getThresholdAgeMilliseconds());
+
+ readBufferManager.callTryEvict();
+ readBufferManager.callTryEvict();
+ readBufferManager.callTryEvict();
+
+ Assertions.assertThat(getStreamRelatedBufferCount(
+ readBufferManager.getCompletedReadListCopy(), inputStream))
+ .describedAs("CompletedList should have 0 elements")
+ .isEqualTo(0);
+ Assertions.assertThat(readBufferManager.getFreeListCopy().size())
Review Comment:
use .hasSize()
##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java:
##########
@@ -495,6 +509,199 @@ public void testSuccessfulReadAhead() throws Exception {
checkEvictedStatus(inputStream, 0, true);
}
+ /**
+ * This test expects InProgressList is not purged by the inputStream close.
+ * The readBuffer will move to completedList and then finally should get evicted.
+ */
+ @Test
+ public void testStreamPurgeDuringReadAheadCallExecuting() throws Exception {
+ AbfsClient client = getMockAbfsClient();
+ AbfsRestOperation successOp = getMockRestOp();
+
+ final AtomicInteger movedToInProgressList = new AtomicInteger(0);
+ final AtomicInteger movedToCompletedList = new AtomicInteger(0);
+ final AtomicBoolean preClosedAssertion = new AtomicBoolean(false);
+
+ Mockito.doAnswer(invocationOnMock -> {
+ movedToInProgressList.incrementAndGet();
+ while (movedToInProgressList.get() < 3 || !preClosedAssertion.get()) {
+
+ }
+ movedToCompletedList.incrementAndGet();
+ return successOp;
+ })
+ .when(client)
Review Comment:
This is very brittle, being timing-based. Normally I'd say "no" here, but I
know I have a forthcoming PR which uses `Object.wait()`/`notify()` to synchronize:
https://github.com/apache/hadoop/pull/5117/files#diff-e829dbaa29faf05ae0b331439e9aec3cd02248464a097c86a0227783337b9b76R370
If this test causes problems, it should do the same.
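For illustration only, here is a rough sketch of an explicit handshake that replaces both sleeps and busy-wait loops. It deliberately swaps in `java.util.concurrent.CountDownLatch` rather than raw `wait`/`notify`, and it is not the code in the linked PR; `LatchHandshakeSketch`, `READS`, and the worker tasks are hypothetical stand-ins for the mocked `client.read(...)` calls.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: three simulated reads block until the test releases them.
class LatchHandshakeSketch {
  static final int READS = 3;

  /** Returns {reads completed before release, reads completed after release}. */
  static int[] run() {
    CountDownLatch allInProgress = new CountDownLatch(READS);
    CountDownLatch release = new CountDownLatch(1);
    AtomicInteger completed = new AtomicInteger();
    ExecutorService workers = Executors.newFixedThreadPool(READS);
    try {
      for (int i = 0; i < READS; i++) {
        workers.execute(() -> {
          // Signal that this read has started (it is "in progress").
          allInProgress.countDown();
          try {
            // Block until the test has finished its pre-close assertions.
            release.await();
            completed.incrementAndGet();
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        });
      }
      // Bounded wait instead of a fixed sleep: returns as soon as all reads started.
      if (!allInProgress.await(10, TimeUnit.SECONDS)) {
        throw new IllegalStateException("reads did not start in time");
      }
      // The assertions about the in-progress state would go here.
      int completedBeforeRelease = completed.get();
      release.countDown();
      workers.shutdown();
      if (!workers.awaitTermination(10, TimeUnit.SECONDS)) {
        throw new IllegalStateException("reads did not finish in time");
      }
      return new int[] {completedBeforeRelease, completed.get()};
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting", e);
    } finally {
      workers.shutdownNow();
    }
  }

  public static void main(String[] args) {
    int[] counts = run();
    System.out.println(counts[0] + " completed before release, " + counts[1] + " after");
  }
}
```

The timeouts only bound the failure case; on the success path nothing waits longer than the threads actually need.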
##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java:
##########
@@ -495,6 +505,105 @@ public void testSuccessfulReadAhead() throws Exception {
checkEvictedStatus(inputStream, 0, true);
}
+ /**
+ * This test expects InProgressList is not purged by the inputStream close.
+ * The readBuffer will move to completedList and then finally should get evicted.
+ */
+ @Test
+ public void testStreamPurgeDuringReadAheadCallExecuting() throws Exception {
+ AbfsClient client = getMockAbfsClient();
+ AbfsRestOperation successOp = getMockRestOp();
+
+ Mockito.doAnswer(invocationOnMock -> {
+ //sleeping thread to mock the network latency from client to backend.
+ Thread.sleep(3000l);
+ return successOp;
+ })
+ .when(client)
+ .read(any(String.class), any(Long.class), any(byte[].class),
+ any(Integer.class), any(Integer.class), any(String.class),
+ any(String.class), any(TracingContext.class));
+
+ AbfsInputStream inputStream = getAbfsInputStream(client,
+ "testSuccessfulReadAhead.txt");
+ queueReadAheads(inputStream);
+
+ final ReadBufferManager readBufferManager
+ = ReadBufferManager.getBufferManager();
+
+ //Sleeping to give ReadBufferWorker to pick the readBuffers for processing.
+ Thread.sleep(1000l);
+
+ Assertions.assertThat(
+ getStreamRelatedBufferCount(readBufferManager.getInProgressCopiedList(),
+ inputStream))
+ .describedAs("InProgressList should have 3 elements")
+ .isEqualTo(3);
+ Assertions.assertThat(readBufferManager.getFreeListCopy().size())
+ .describedAs("FreeList should have 13 elements")
+ .isEqualTo(13);
+ Assertions.assertThat(readBufferManager.getCompletedReadListCopy().size())
+ .describedAs("CompletedList should have 0 elements")
+ .isEqualTo(0);
+
+ inputStream.close();
+
+ Assertions.assertThat(
+ getStreamRelatedBufferCount(readBufferManager.getInProgressCopiedList(),
+ inputStream))
+ .describedAs("InProgressList should have 3 elements")
+ .isEqualTo(3);
+ Assertions.assertThat(getStreamRelatedBufferCount(
+ readBufferManager.getCompletedReadListCopy(), inputStream))
+ .describedAs("CompletedList should have 0 elements")
+ .isEqualTo(0);
+ Assertions.assertThat(readBufferManager.getFreeListCopy().size())
+ .describedAs("FreeList should have 13 elements")
+ .isEqualTo(13);
+
+ //Sleep so that response from mockedClient gets back to ReadBufferWorker and
+ // can populate into completedList.
+ Thread.sleep(3000l);
+
+ Assertions.assertThat(getStreamRelatedBufferCount(
+ readBufferManager.getCompletedReadListCopy(), inputStream))
+ .describedAs("CompletedList should have 3 elements")
+ .isEqualTo(3);
+ Assertions.assertThat(readBufferManager.getFreeListCopy().size())
+ .describedAs("FreeList should have 13 elements")
+ .isEqualTo(13);
+ Assertions.assertThat(
+ getStreamRelatedBufferCount(readBufferManager.getInProgressCopiedList(),
+ inputStream))
+ .describedAs("InProgressList should have 0 elements")
+ .isEqualTo(0);
+
+ Thread.sleep(readBufferManager.getThresholdAgeMilliseconds());
+
+ readBufferManager.callTryEvict();
+ readBufferManager.callTryEvict();
+ readBufferManager.callTryEvict();
+
+ Assertions.assertThat(getStreamRelatedBufferCount(
+ readBufferManager.getCompletedReadListCopy(), inputStream))
+ .describedAs("CompletedList should have 0 elements")
+ .isEqualTo(0);
+ Assertions.assertThat(readBufferManager.getFreeListCopy().size())
+ .describedAs("FreeList should have 16 elements")
+ .isEqualTo(16);
+ }
+
+ private int getStreamRelatedBufferCount(final List<ReadBuffer> bufferList,
+ final AbfsInputStream inputStream) {
+ int count = 0;
Review Comment:
prefer java8 streaming
```
bufferList.stream()
.filter(buffer -> buffer.getStream() == inputStream)
.count()
```
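A self-contained version of that suggestion, as a sketch: `Buffer` is a hypothetical stand-in for `ReadBuffer`, and the owning stream is modelled as a plain `Object`. One detail worth noting is that `Stream.count()` returns a `long`, so a helper that keeps an `int` return type needs a conversion; `Math.toIntExact` is one option.

```java
import java.util.Arrays;
import java.util.List;

class StreamCountSketch {
  // Hypothetical stand-in for ReadBuffer: only the owning stream matters here.
  static final class Buffer {
    private final Object stream;

    Buffer(Object stream) {
      this.stream = stream;
    }

    Object getStream() {
      return stream;
    }
  }

  // Counts the buffers owned by the given stream, compared by identity.
  static int countForStream(List<Buffer> buffers, Object stream) {
    long count = buffers.stream()
        .filter(buffer -> buffer.getStream() == stream)
        .count();
    // count() yields a long; toIntExact throws rather than silently truncating.
    return Math.toIntExact(count);
  }

  public static void main(String[] args) {
    Object first = new Object();
    Object second = new Object();
    List<Buffer> buffers =
        Arrays.asList(new Buffer(first), new Buffer(second), new Buffer(first));
    System.out.println(countForStream(buffers, first));
  }
}
```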
##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java:
##########
@@ -495,6 +505,105 @@ public void testSuccessfulReadAhead() throws Exception {
checkEvictedStatus(inputStream, 0, true);
}
+ /**
+ * This test expects InProgressList is not purged by the inputStream close.
+ * The readBuffer will move to completedList and then finally should get evicted.
+ */
+ @Test
+ public void testStreamPurgeDuringReadAheadCallExecuting() throws Exception {
+ AbfsClient client = getMockAbfsClient();
+ AbfsRestOperation successOp = getMockRestOp();
+
+ Mockito.doAnswer(invocationOnMock -> {
+ //sleeping thread to mock the network latency from client to backend.
+ Thread.sleep(3000l);
+ return successOp;
+ })
+ .when(client)
+ .read(any(String.class), any(Long.class), any(byte[].class),
+ any(Integer.class), any(Integer.class), any(String.class),
+ any(String.class), any(TracingContext.class));
+
+ AbfsInputStream inputStream = getAbfsInputStream(client,
+ "testSuccessfulReadAhead.txt");
+ queueReadAheads(inputStream);
+
+ final ReadBufferManager readBufferManager
+ = ReadBufferManager.getBufferManager();
+
+ //Sleeping to give ReadBufferWorker to pick the readBuffers for processing.
+ Thread.sleep(1000l);
Review Comment:
1_000L
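To spell out the suggestion: both literals denote the same `long`, but a lowercase `l` suffix is easy to misread as the digit `1`, and the underscore (allowed in numeric literals since Java 7) groups the thousands. The class and constant names below are hypothetical.

```java
class LongLiteralSketch {
  // Same value; the lowercase suffix can be mistaken for a trailing digit one.
  static final long HARD_TO_READ = 1000l;
  // Uppercase suffix plus a digit separator.
  static final long EASIER_TO_READ = 1_000L;

  public static void main(String[] args) {
    System.out.println(HARD_TO_READ == EASIER_TO_READ);
  }
}
```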
##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java:
##########
@@ -495,6 +505,105 @@ public void testSuccessfulReadAhead() throws Exception {
checkEvictedStatus(inputStream, 0, true);
}
+ /**
+ * This test expects InProgressList is not purged by the inputStream close.
+ * The readBuffer will move to completedList and then finally should get evicted.
+ */
+ @Test
+ public void testStreamPurgeDuringReadAheadCallExecuting() throws Exception {
+ AbfsClient client = getMockAbfsClient();
+ AbfsRestOperation successOp = getMockRestOp();
+
+ Mockito.doAnswer(invocationOnMock -> {
+ //sleeping thread to mock the network latency from client to backend.
+ Thread.sleep(3000l);
+ return successOp;
+ })
+ .when(client)
+ .read(any(String.class), any(Long.class), any(byte[].class),
+ any(Integer.class), any(Integer.class), any(String.class),
+ any(String.class), any(TracingContext.class));
+
+ AbfsInputStream inputStream = getAbfsInputStream(client,
+ "testSuccessfulReadAhead.txt");
+ queueReadAheads(inputStream);
+
+ final ReadBufferManager readBufferManager
+ = ReadBufferManager.getBufferManager();
+
+ //Sleeping to give ReadBufferWorker to pick the readBuffers for processing.
+ Thread.sleep(1000l);
+
+ Assertions.assertThat(
+ getStreamRelatedBufferCount(readBufferManager.getInProgressCopiedList(),
+ inputStream))
+ .describedAs("InProgressList should have 3 elements")
+ .isEqualTo(3);
+ Assertions.assertThat(readBufferManager.getFreeListCopy().size())
Review Comment:
Use .hasSize(13) in the assert (on the list itself rather than on `.size()`),
so AssertJ will provide info about the list if there's a mismatch.
> disable purging list of in progress reads in abfs stream closed
> ---------------------------------------------------------------
>
> Key: HADOOP-18546
> URL: https://issues.apache.org/jira/browse/HADOOP-18546
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: fs/azure
> Affects Versions: 3.3.4
> Reporter: Steve Loughran
> Assignee: Steve Loughran
> Priority: Major
> Labels: pull-request-available
>
> Turn off the pruning of in-progress reads in
> ReadBufferManager::purgeBuffersForStream.
> This will ensure active prefetches for a closed stream complete. They will
> then get to the completed list and hang around until evicted by timeout, but
> at least prefetching will be safe.