[
https://issues.apache.org/jira/browse/HADOOP-18546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17641865#comment-17641865
]
ASF GitHub Bot commented on HADOOP-18546:
-----------------------------------------
snvijaya commented on code in PR #5176:
URL: https://github.com/apache/hadoop/pull/5176#discussion_r1037025160
##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java:
##########
@@ -495,6 +509,199 @@ public void testSuccessfulReadAhead() throws Exception {
checkEvictedStatus(inputStream, 0, true);
}
+ /**
+ * This test expects InProgressList is not purged by the inputStream close.
+ * The readBuffer will move to completedList and then finally should get
evicted.
+ */
+ @Test
+ public void testStreamPurgeDuringReadAheadCallExecuting() throws Exception {
+ AbfsClient client = getMockAbfsClient();
+ AbfsRestOperation successOp = getMockRestOp();
+
+ final AtomicInteger movedToInProgressList = new AtomicInteger(0);
+ final AtomicInteger movedToCompletedList = new AtomicInteger(0);
+ final AtomicBoolean preClosedAssertion = new AtomicBoolean(false);
+
+ Mockito.doAnswer(invocationOnMock -> {
+ movedToInProgressList.incrementAndGet();
+ while (movedToInProgressList.get() < 3 || !preClosedAssertion.get())
{
+
+ }
+ movedToCompletedList.incrementAndGet();
+ return successOp;
+ })
+ .when(client)
Review Comment:
The test is trying to unit test a bigger scope of existing inprogress buffer
moving to completed list. Will be nice to scope the test to inProgressList and
freelist counts, before and after close.
At this client.read() mock, I would suggest mocks that will invoke a large
sleep for each read. That way after queueReadAheads call and a 1 sec sleep, 3
buffers will be stuck inProgessList and the freeeList should show 13 free. The
asserts should continue to hold to same numbers post close as well.
##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java:
##########
@@ -495,6 +509,199 @@ public void testSuccessfulReadAhead() throws Exception {
checkEvictedStatus(inputStream, 0, true);
}
+ /**
+ * This test expects InProgressList is not purged by the inputStream close.
+ * The readBuffer will move to completedList and then finally should get
evicted.
+ */
+ @Test
+ public void testStreamPurgeDuringReadAheadCallExecuting() throws Exception {
+ AbfsClient client = getMockAbfsClient();
+ AbfsRestOperation successOp = getMockRestOp();
+
+ final AtomicInteger movedToInProgressList = new AtomicInteger(0);
+ final AtomicInteger movedToCompletedList = new AtomicInteger(0);
+ final AtomicBoolean preClosedAssertion = new AtomicBoolean(false);
+
+ Mockito.doAnswer(invocationOnMock -> {
+ movedToInProgressList.incrementAndGet();
+ while (movedToInProgressList.get() < 3 || !preClosedAssertion.get())
{
+
+ }
+ movedToCompletedList.incrementAndGet();
+ return successOp;
+ })
+ .when(client)
+ .read(any(String.class), any(Long.class), any(byte[].class),
+ any(Integer.class), any(Integer.class), any(String.class),
+ any(String.class), any(TracingContext.class));
+
+ AbfsInputStream inputStream = getAbfsInputStream(client,
+ "testSuccessfulReadAhead.txt");
+ queueReadAheads(inputStream);
+
+ final ReadBufferManager readBufferManager
+ = ReadBufferManager.getBufferManager();
+ while (movedToInProgressList.get() < 3) {
+
+ }
+ Assertions.assertThat(
+
getStreamRelatedBufferCount(readBufferManager.getInProgressCopiedList(),
+ inputStream))
+ .describedAs("InProgressList should have 3 elements")
+ .isEqualTo(3);
+ Assertions.assertThat(getStreamRelatedBufferCount(
+ readBufferManager.getCompletedReadListCopy(), inputStream))
+ .describedAs("CompletedList should have 3 elements")
+ .isEqualTo(0);
+
+ inputStream.close();
+
+ Assertions.assertThat(
+
getStreamRelatedBufferCount(readBufferManager.getInProgressCopiedList(),
+ inputStream))
+ .describedAs("InProgressList should have 3 elements")
+ .isEqualTo(3);
+ Assertions.assertThat(getStreamRelatedBufferCount(
+ readBufferManager.getCompletedReadListCopy(), inputStream))
+ .describedAs("CompletedList should have 3 elements")
+ .isEqualTo(0);
+ preClosedAssertion.set(true);
+
+ while (movedToCompletedList.get() < 3) {
+
+ }
+
+ //Sleep so that response from mockedClient gets back to ReadBufferWorker
and
+ // can populate into completedList.
+ Thread.sleep(10000l);
+
+ Assertions.assertThat(getStreamRelatedBufferCount(
+ readBufferManager.getCompletedReadListCopy(), inputStream))
+ .describedAs("CompletedList should have 3 elements")
+ .isEqualTo(3);
+
+ Thread.sleep(readBufferManager.getThresholdAgeMilliseconds());
+
+ readBufferManager.callTryEvict();
+ readBufferManager.callTryEvict();
+ readBufferManager.callTryEvict();
+
+ Assertions.assertThat(getStreamRelatedBufferCount(
+ readBufferManager.getCompletedReadListCopy(), inputStream))
+ .describedAs("CompletedList should have 0 elements")
+ .isEqualTo(0);
+ }
+
+
+ /**
+ * This test expects InProgressList is not purged by the inputStream close.
+ * The already readBuffer present in the completedList shall be purged by the
+ * inputStream close.
+ * The readBuffer from inProgressList will move to completedList and then
+ * finally should get evicted.
+ */
+ @Test
+ public void
testStreamPurgeDuringReadAheadCallExecutingWithSomeCompletedBuffers()
Review Comment:
This test seems to be validating effect of purge on completedList. Does
this validate any test scenario that is not already covered in HADOOP-17156
commit testcases ?
Also, do all test asserts by HADOOP-17156 still hold good after this PR
change preventing inprogress list purge ?
> disable purging list of in progress reads in abfs stream closed
> ---------------------------------------------------------------
>
> Key: HADOOP-18546
> URL: https://issues.apache.org/jira/browse/HADOOP-18546
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: fs/azure
> Affects Versions: 3.3.4
> Reporter: Steve Loughran
> Assignee: Steve Loughran
> Priority: Major
> Labels: pull-request-available
>
> turn off the prune of in progress reads in
> ReadBufferManager::purgeBuffersForStream
> this will ensure active prefetches for a closed stream complete. they wiill
> then get to the completed list and hang around until evicted by timeout, but
> at least prefetching will be safe.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]