[ https://issues.apache.org/jira/browse/HADOOP-18546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17642112#comment-17642112 ]

ASF GitHub Bot commented on HADOOP-18546:
-----------------------------------------

steveloughran commented on code in PR #5176:
URL: https://github.com/apache/hadoop/pull/5176#discussion_r1037390997


##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java:
##########
@@ -82,6 +84,16 @@ public class TestAbfsInputStream extends
       REDUCED_READ_BUFFER_AGE_THRESHOLD * 10; // 30 sec
   private static final int ALWAYS_READ_BUFFER_SIZE_TEST_FILE_SIZE = 16 * ONE_MB;
 
+  @After
+  public void afterTest() throws InterruptedException {

Review Comment:
   override `teardown()` and call the superclass. That way you know the order in which things happen.
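
A minimal sketch of that suggestion, assuming the superclass exposes an `@After`-annotated `teardown()` as the hadoop-azure test base classes do (illustrative only, not the actual patch):

```
  @After
  @Override
  public void teardown() throws Exception {
    // test-specific cleanup first (here, the wait from the current patch),
    // so the ordering relative to the superclass cleanup is explicit
    Thread.sleep(10_000L);
    // then let the superclass tear down its own state
    super.teardown();
  }
```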



##########
hadoop-common-project/hadoop-common/src/main/resources/core-default.xml:
##########
@@ -2166,13 +2166,6 @@ The switch to turn S3A auditing on or off.
   <description>The AbstractFileSystem for gs: uris.</description>
 </property>
 
-<property>
-  <name>fs.azure.enable.readahead</name>
-  <value>false</value>

Review Comment:
   retain but set to true. Why so? storediag will log it and so show that someone has explicitly said "readahead is safe here".
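
That is, rather than deleting the entry from core-default.xml, the property would stay with the default flipped; a sketch of the retained entry (description text omitted here):

```
<property>
  <name>fs.azure.enable.readahead</name>
  <value>true</value>
</property>
```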



##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java:
##########
@@ -82,6 +84,16 @@ public class TestAbfsInputStream extends
       REDUCED_READ_BUFFER_AGE_THRESHOLD * 10; // 30 sec
   private static final int ALWAYS_READ_BUFFER_SIZE_TEST_FILE_SIZE = 16 * ONE_MB;
 
+  @After
+  public void afterTest() throws InterruptedException {
 +    //thread wait so that previous test's inProgress buffers are processed and removed.
+    Thread.sleep(10000l);

Review Comment:
   don't like this as it potentially adds 10s to a test run, one which could still be a bit flaky.
   
   what about using `testResetReadBufferManager()`?
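
A sketch of that alternative, assuming `testResetReadBufferManager()` is the test-only reset hook on `ReadBufferManager` that the comment refers to:

```
  @After
  public void afterTest() throws Exception {
    // reset the shared ReadBufferManager state between tests instead of
    // sleeping, so leftover in-progress buffers cannot leak into the next test
    ReadBufferManager.getBufferManager().testResetReadBufferManager();
  }
```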
   



##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java:
##########
@@ -495,6 +505,105 @@ public void testSuccessfulReadAhead() throws Exception {
     checkEvictedStatus(inputStream, 0, true);
   }
 
+  /**
+   * This test expects InProgressList is not purged by the inputStream close.
 +   * The readBuffer will move to completedList and then finally should get evicted.
+   */
+  @Test
+  public void testStreamPurgeDuringReadAheadCallExecuting() throws Exception {
+    AbfsClient client = getMockAbfsClient();
+    AbfsRestOperation successOp = getMockRestOp();
+
+    Mockito.doAnswer(invocationOnMock -> {
+          //sleeping thread to mock the network latency from client to backend.
+          Thread.sleep(3000l);
+          return successOp;
+        })
+        .when(client)
+        .read(any(String.class), any(Long.class), any(byte[].class),
+            any(Integer.class), any(Integer.class), any(String.class),
+            any(String.class), any(TracingContext.class));
+
+    AbfsInputStream inputStream = getAbfsInputStream(client,
+        "testSuccessfulReadAhead.txt");
+    queueReadAheads(inputStream);
+
+    final ReadBufferManager readBufferManager
+        = ReadBufferManager.getBufferManager();
+
+    //Sleeping to give ReadBufferWorker to pick the readBuffers for processing.
+    Thread.sleep(1000l);
+
+    Assertions.assertThat(
 +            getStreamRelatedBufferCount(readBufferManager.getInProgressCopiedList(),
+                inputStream))
+        .describedAs("InProgressList should have 3 elements")
+        .isEqualTo(3);
+    Assertions.assertThat(readBufferManager.getFreeListCopy().size())
+        .describedAs("FreeList should have 13 elements")
+        .isEqualTo(13);
+    Assertions.assertThat(readBufferManager.getCompletedReadListCopy().size())
+        .describedAs("CompletedList should have 0 elements")
+        .isEqualTo(0);
+
+    inputStream.close();
+
+    Assertions.assertThat(
 +            getStreamRelatedBufferCount(readBufferManager.getInProgressCopiedList(),
+                inputStream))
+        .describedAs("InProgressList should have 3 elements")
+        .isEqualTo(3);
+    Assertions.assertThat(getStreamRelatedBufferCount(
+            readBufferManager.getCompletedReadListCopy(), inputStream))
+        .describedAs("CompletedList should have 0 elements")
+        .isEqualTo(0);
+    Assertions.assertThat(readBufferManager.getFreeListCopy().size())

Review Comment:
   use .hasSize(13) 
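
   That is, assert on the list itself so AssertJ can print its contents on failure, for example:
   ```
   Assertions.assertThat(readBufferManager.getFreeListCopy())
       .describedAs("free list")
       .hasSize(13);
   ```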



##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java:
##########
@@ -495,6 +505,105 @@ public void testSuccessfulReadAhead() throws Exception {
     checkEvictedStatus(inputStream, 0, true);
   }
 
+  /**
+   * This test expects InProgressList is not purged by the inputStream close.
 +   * The readBuffer will move to completedList and then finally should get evicted.
+   */
+  @Test
+  public void testStreamPurgeDuringReadAheadCallExecuting() throws Exception {
+    AbfsClient client = getMockAbfsClient();
+    AbfsRestOperation successOp = getMockRestOp();
+
+    Mockito.doAnswer(invocationOnMock -> {
+          //sleeping thread to mock the network latency from client to backend.
+          Thread.sleep(3000l);
+          return successOp;
+        })
+        .when(client)
+        .read(any(String.class), any(Long.class), any(byte[].class),
+            any(Integer.class), any(Integer.class), any(String.class),
+            any(String.class), any(TracingContext.class));
+
+    AbfsInputStream inputStream = getAbfsInputStream(client,
+        "testSuccessfulReadAhead.txt");
+    queueReadAheads(inputStream);
+
+    final ReadBufferManager readBufferManager
+        = ReadBufferManager.getBufferManager();
+
+    //Sleeping to give ReadBufferWorker to pick the readBuffers for processing.
+    Thread.sleep(1000l);
+
+    Assertions.assertThat(
 +            getStreamRelatedBufferCount(readBufferManager.getInProgressCopiedList(),
+                inputStream))
+        .describedAs("InProgressList should have 3 elements")
+        .isEqualTo(3);
+    Assertions.assertThat(readBufferManager.getFreeListCopy().size())
+        .describedAs("FreeList should have 13 elements")
+        .isEqualTo(13);
+    Assertions.assertThat(readBufferManager.getCompletedReadListCopy().size())
+        .describedAs("CompletedList should have 0 elements")
+        .isEqualTo(0);
+
+    inputStream.close();
+
+    Assertions.assertThat(
 +            getStreamRelatedBufferCount(readBufferManager.getInProgressCopiedList(),
+                inputStream))
+        .describedAs("InProgressList should have 3 elements")
+        .isEqualTo(3);
+    Assertions.assertThat(getStreamRelatedBufferCount(
+            readBufferManager.getCompletedReadListCopy(), inputStream))
+        .describedAs("CompletedList should have 0 elements")
+        .isEqualTo(0);
+    Assertions.assertThat(readBufferManager.getFreeListCopy().size())
+        .describedAs("FreeList should have 13 elements")
+        .isEqualTo(13);
+
 +    //Sleep so that response from mockedClient gets back to ReadBufferWorker and
+    // can populate into completedList.
+    Thread.sleep(3000l);
+
+    Assertions.assertThat(getStreamRelatedBufferCount(
+            readBufferManager.getCompletedReadListCopy(), inputStream))
+        .describedAs("CompletedList should have 3 elements")
+        .isEqualTo(3);
+    Assertions.assertThat(readBufferManager.getFreeListCopy().size())
+        .describedAs("FreeList should have 13 elements")
+        .isEqualTo(13);
+    Assertions.assertThat(
 +            getStreamRelatedBufferCount(readBufferManager.getInProgressCopiedList(),
+                inputStream))
+        .describedAs("InProgressList should have 0 elements")
+        .isEqualTo(0);
+
+    Thread.sleep(readBufferManager.getThresholdAgeMilliseconds());
+
+    readBufferManager.callTryEvict();
+    readBufferManager.callTryEvict();
+    readBufferManager.callTryEvict();
+
+    Assertions.assertThat(getStreamRelatedBufferCount(
+            readBufferManager.getCompletedReadListCopy(), inputStream))
+        .describedAs("CompletedList should have 0 elements")
+        .isEqualTo(0);
+    Assertions.assertThat(readBufferManager.getFreeListCopy().size())

Review Comment:
   use .hasSize()



##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java:
##########
@@ -495,6 +509,199 @@ public void testSuccessfulReadAhead() throws Exception {
     checkEvictedStatus(inputStream, 0, true);
   }
 
+  /**
+   * This test expects InProgressList is not purged by the inputStream close.
 +   * The readBuffer will move to completedList and then finally should get evicted.
+   */
+  @Test
+  public void testStreamPurgeDuringReadAheadCallExecuting() throws Exception {
+    AbfsClient client = getMockAbfsClient();
+    AbfsRestOperation successOp = getMockRestOp();
+
+    final AtomicInteger movedToInProgressList = new AtomicInteger(0);
+    final AtomicInteger movedToCompletedList = new AtomicInteger(0);
+    final AtomicBoolean preClosedAssertion = new AtomicBoolean(false);
+
+    Mockito.doAnswer(invocationOnMock -> {
+          movedToInProgressList.incrementAndGet();
 +          while (movedToInProgressList.get() < 3 || !preClosedAssertion.get()) {
+
+          }
+          movedToCompletedList.incrementAndGet();
+          return successOp;
+        })
+        .when(client)

Review Comment:
   this is very brittle being timing based. Normally I'd say "no" here, but I know I have a forthcoming PR which uses object.wait/notify to synchronize
   
https://github.com/apache/hadoop/pull/5117/files#diff-e829dbaa29faf05ae0b331439e9aec3cd02248464a097c86a0227783337b9b76R370
   
   if this test causes problems it should do the same
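
For illustration, a minimal sketch of that wait/notify approach (the names here are invented for the sketch, not taken from the linked PR): the mocked read blocks on a monitor until the test has finished its pre-close assertions, rather than spinning.

```
    final Object readGate = new Object();
    final AtomicBoolean assertionsDone = new AtomicBoolean(false);

    Mockito.doAnswer(invocation -> {
      synchronized (readGate) {
        // block the worker thread until the test signals it may proceed
        while (!assertionsDone.get()) {
          readGate.wait();
        }
      }
      return successOp;
    }).when(client).read(any(String.class), any(Long.class), any(byte[].class),
        any(Integer.class), any(Integer.class), any(String.class),
        any(String.class), any(TracingContext.class));

    // ...later, once the pre-close assertions have run:
    synchronized (readGate) {
      assertionsDone.set(true);
      readGate.notifyAll();
    }
```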



##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java:
##########
@@ -495,6 +505,105 @@ public void testSuccessfulReadAhead() throws Exception {
     checkEvictedStatus(inputStream, 0, true);
   }
 
+  /**
+   * This test expects InProgressList is not purged by the inputStream close.
 +   * The readBuffer will move to completedList and then finally should get evicted.
+   */
+  @Test
+  public void testStreamPurgeDuringReadAheadCallExecuting() throws Exception {
+    AbfsClient client = getMockAbfsClient();
+    AbfsRestOperation successOp = getMockRestOp();
+
+    Mockito.doAnswer(invocationOnMock -> {
+          //sleeping thread to mock the network latency from client to backend.
+          Thread.sleep(3000l);
+          return successOp;
+        })
+        .when(client)
+        .read(any(String.class), any(Long.class), any(byte[].class),
+            any(Integer.class), any(Integer.class), any(String.class),
+            any(String.class), any(TracingContext.class));
+
+    AbfsInputStream inputStream = getAbfsInputStream(client,
+        "testSuccessfulReadAhead.txt");
+    queueReadAheads(inputStream);
+
+    final ReadBufferManager readBufferManager
+        = ReadBufferManager.getBufferManager();
+
+    //Sleeping to give ReadBufferWorker to pick the readBuffers for processing.
+    Thread.sleep(1000l);
+
+    Assertions.assertThat(
 +            getStreamRelatedBufferCount(readBufferManager.getInProgressCopiedList(),
+                inputStream))
+        .describedAs("InProgressList should have 3 elements")
+        .isEqualTo(3);
+    Assertions.assertThat(readBufferManager.getFreeListCopy().size())
+        .describedAs("FreeList should have 13 elements")
+        .isEqualTo(13);
+    Assertions.assertThat(readBufferManager.getCompletedReadListCopy().size())
+        .describedAs("CompletedList should have 0 elements")
+        .isEqualTo(0);
+
+    inputStream.close();
+
+    Assertions.assertThat(
 +            getStreamRelatedBufferCount(readBufferManager.getInProgressCopiedList(),
+                inputStream))
+        .describedAs("InProgressList should have 3 elements")
+        .isEqualTo(3);
+    Assertions.assertThat(getStreamRelatedBufferCount(
+            readBufferManager.getCompletedReadListCopy(), inputStream))
+        .describedAs("CompletedList should have 0 elements")
+        .isEqualTo(0);
+    Assertions.assertThat(readBufferManager.getFreeListCopy().size())
+        .describedAs("FreeList should have 13 elements")
+        .isEqualTo(13);
+
 +    //Sleep so that response from mockedClient gets back to ReadBufferWorker and
+    // can populate into completedList.
+    Thread.sleep(3000l);
+
+    Assertions.assertThat(getStreamRelatedBufferCount(
+            readBufferManager.getCompletedReadListCopy(), inputStream))
+        .describedAs("CompletedList should have 3 elements")
+        .isEqualTo(3);
+    Assertions.assertThat(readBufferManager.getFreeListCopy().size())
+        .describedAs("FreeList should have 13 elements")
+        .isEqualTo(13);
+    Assertions.assertThat(
 +            getStreamRelatedBufferCount(readBufferManager.getInProgressCopiedList(),
+                inputStream))
+        .describedAs("InProgressList should have 0 elements")
+        .isEqualTo(0);
+
+    Thread.sleep(readBufferManager.getThresholdAgeMilliseconds());
+
+    readBufferManager.callTryEvict();
+    readBufferManager.callTryEvict();
+    readBufferManager.callTryEvict();
+
+    Assertions.assertThat(getStreamRelatedBufferCount(
+            readBufferManager.getCompletedReadListCopy(), inputStream))
+        .describedAs("CompletedList should have 0 elements")
+        .isEqualTo(0);
+    Assertions.assertThat(readBufferManager.getFreeListCopy().size())
+        .describedAs("FreeList should have 16 elements")
+        .isEqualTo(16);
+  }
+
+  private int getStreamRelatedBufferCount(final List<ReadBuffer> bufferList,
+      final AbfsInputStream inputStream) {
+    int count = 0;

Review Comment:
   prefer java8 streaming
   ```
   bufferList.stream()
     .filter(buffer -> buffer.getStream() == inputStream)
     .count()
   ```
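
   Since `count()` returns a `long` while the helper returns an `int`, the full method body would need a cast, e.g.:
   ```
   private int getStreamRelatedBufferCount(final List<ReadBuffer> bufferList,
       final AbfsInputStream inputStream) {
     // count the buffers in the snapshot that belong to the given stream
     return (int) bufferList.stream()
         .filter(buffer -> buffer.getStream() == inputStream)
         .count();
   }
   ```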
   



##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java:
##########
@@ -495,6 +505,105 @@ public void testSuccessfulReadAhead() throws Exception {
     checkEvictedStatus(inputStream, 0, true);
   }
 
+  /**
+   * This test expects InProgressList is not purged by the inputStream close.
 +   * The readBuffer will move to completedList and then finally should get evicted.
+   */
+  @Test
+  public void testStreamPurgeDuringReadAheadCallExecuting() throws Exception {
+    AbfsClient client = getMockAbfsClient();
+    AbfsRestOperation successOp = getMockRestOp();
+
+    Mockito.doAnswer(invocationOnMock -> {
+          //sleeping thread to mock the network latency from client to backend.
+          Thread.sleep(3000l);
+          return successOp;
+        })
+        .when(client)
+        .read(any(String.class), any(Long.class), any(byte[].class),
+            any(Integer.class), any(Integer.class), any(String.class),
+            any(String.class), any(TracingContext.class));
+
+    AbfsInputStream inputStream = getAbfsInputStream(client,
+        "testSuccessfulReadAhead.txt");
+    queueReadAheads(inputStream);
+
+    final ReadBufferManager readBufferManager
+        = ReadBufferManager.getBufferManager();
+
+    //Sleeping to give ReadBufferWorker to pick the readBuffers for processing.
+    Thread.sleep(1000l);

Review Comment:
   1_000L



##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java:
##########
@@ -495,6 +505,105 @@ public void testSuccessfulReadAhead() throws Exception {
     checkEvictedStatus(inputStream, 0, true);
   }
 
+  /**
+   * This test expects InProgressList is not purged by the inputStream close.
 +   * The readBuffer will move to completedList and then finally should get evicted.
+   */
+  @Test
+  public void testStreamPurgeDuringReadAheadCallExecuting() throws Exception {
+    AbfsClient client = getMockAbfsClient();
+    AbfsRestOperation successOp = getMockRestOp();
+
+    Mockito.doAnswer(invocationOnMock -> {
+          //sleeping thread to mock the network latency from client to backend.
+          Thread.sleep(3000l);
+          return successOp;
+        })
+        .when(client)
+        .read(any(String.class), any(Long.class), any(byte[].class),
+            any(Integer.class), any(Integer.class), any(String.class),
+            any(String.class), any(TracingContext.class));
+
+    AbfsInputStream inputStream = getAbfsInputStream(client,
+        "testSuccessfulReadAhead.txt");
+    queueReadAheads(inputStream);
+
+    final ReadBufferManager readBufferManager
+        = ReadBufferManager.getBufferManager();
+
+    //Sleeping to give ReadBufferWorker to pick the readBuffers for processing.
+    Thread.sleep(1000l);
+
+    Assertions.assertThat(
 +            getStreamRelatedBufferCount(readBufferManager.getInProgressCopiedList(),
+                inputStream))
+        .describedAs("InProgressList should have 3 elements")
+        .isEqualTo(3);
+    Assertions.assertThat(readBufferManager.getFreeListCopy().size())

Review Comment:
   use .hasSize(13) in the assert, so assertj will provide info about the list if there's a mismatch





> disable purging list of in progress reads in abfs stream closed
> ---------------------------------------------------------------
>
>                 Key: HADOOP-18546
>                 URL: https://issues.apache.org/jira/browse/HADOOP-18546
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/azure
>    Affects Versions: 3.3.4
>            Reporter: Steve Loughran
>            Assignee: Steve Loughran
>            Priority: Major
>              Labels: pull-request-available
>
> turn off the prune of in progress reads in 
> ReadBufferManager::purgeBuffersForStream
> this will ensure active prefetches for a closed stream complete. they will
> then get to the completed list and hang around until evicted by timeout, but 
> at least prefetching will be safe.
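
Conceptually (a rough illustration of the idea only, not the actual patch; the field names and helper below are assumptions), the purge on stream close would drop the closed stream's queued and completed buffers but leave in-progress reads to finish:

```
  // illustrative sketch inside ReadBufferManager
  synchronized void purgeBuffersForStream(AbfsInputStream stream) {
    // drop reads for this stream that have not started yet
    readAheadQueue.removeIf(buffer -> buffer.getStream() == stream);
    // release this stream's already-completed buffers back to the free pool
    purgeCompletedBuffersForStream(stream);   // hypothetical helper
    // in-progress reads are deliberately left alone: they complete, move to
    // the completed list, and age out via the eviction threshold
  }
```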


