pranavsaxena-microsoft commented on code in PR #5117: URL: https://github.com/apache/hadoop/pull/5117#discussion_r1019839338
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java:
##########
@@ -454,31 +588,65 @@ ReadBuffer getNextBlockToRead() throws InterruptedException {
    */
   void doneReading(final ReadBuffer buffer, final ReadBufferStatus result, final int bytesActuallyRead) {
     if (LOGGER.isTraceEnabled()) {
-      LOGGER.trace("ReadBufferWorker completed read file {} for offset {} outcome {} bytes {}",
-          buffer.getStream().getPath(), buffer.getOffset(), result, bytesActuallyRead);
-    }
-    synchronized (this) {
-      // If this buffer has already been purged during
-      // close of InputStream then we don't update the lists.
-      if (inProgressList.contains(buffer)) {
-        inProgressList.remove(buffer);
+      LOGGER.trace("ReadBufferWorker completed file {} for offset {} bytes {}; {}",
+          buffer.getStream().getPath(), buffer.getOffset(), bytesActuallyRead, buffer);
+    }
+    // decrement counter.
+    buffer.prefetchFinished();
+
+    try {
+      synchronized (this) {
+        // remove from the list
+        if (!inProgressList.remove(buffer)) {
+          // this is a sign of inconsistent state, so a major problem
+          String message =
+              String.format("Read completed from an operation not declared as in progress %s",
+                  buffer);
+          LOGGER.warn(message);
+          // release the buffer (which may raise an exception)
+          placeBufferOnFreeList("read not in progress", buffer);
+          // report the failure
+          throw new IllegalStateException(message);
+        }
+
+        boolean shouldFreeBuffer = false;
+        String freeBufferReason = "";
         if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) {
           buffer.setStatus(ReadBufferStatus.AVAILABLE);
           buffer.setLength(bytesActuallyRead);
         } else {
-          freeList.push(buffer.getBufferindex());
-          // buffer will be deleted as per the eviction policy.
+          // read failed or there was no data, -the buffer can be returned to the free list.
+          shouldFreeBuffer = true;
+          freeBufferReason = "failed read";
         }
         // completed list also contains FAILED read buffers
         // for sending exception message to clients.
         buffer.setStatus(result);
         buffer.setTimeStamp(currentTimeMillis());
-        completedReadList.add(buffer);
+        if (!buffer.isStreamClosed()) {
+          // completed reads are added to the list.
+          LOGGER.trace("Adding buffer to completed list {}", buffer);
+          completedReadList.add(buffer);

Review Comment:
Let's not add the buffer to completedReadList in cases where it is going to be returned to freeList (because bytesActuallyRead == 0).

- Got an exception:
```
2022-11-10 20:48:22,516 TRACE [ABFS-prefetch-7]: services.ReadBufferManager (ReadBufferManager.java:doneReading(591)) - ReadBufferWorker completed file /testfilefb393e327a88 for offset 4194304 bytes 0; org.apache.hadoop.fs.azurebfs.services.ReadBuffer@6777a743{ status=READING_IN_PROGRESS, offset=4194304, length=0, requestedLength=4194304, bufferindex=0, timeStamp=0, isFirstByteConsumed=false, isLastByteConsumed=false, isAnyByteConsumed=false, errException=null, stream=9d85521627a8, stream closed=false}
2022-11-10 20:48:22,516 TRACE [ABFS-prefetch-7]: services.ReadBufferManager (ReadBufferManager.java:doneReading(633)) - Adding buffer to completed list org.apache.hadoop.fs.azurebfs.services.ReadBuffer@6777a743{ status=AVAILABLE, offset=4194304, length=0, requestedLength=4194304, bufferindex=0, timeStamp=86052487, isFirstByteConsumed=false, isLastByteConsumed=false, isAnyByteConsumed=false, errException=null, stream=9d85521627a8, stream closed=false}
2022-11-10 20:48:22,516 DEBUG [ABFS-prefetch-7]: services.ReadBufferManager (ReadBufferManager.java:placeBufferOnFreeList(407)) - Returning buffer index 0 to free list for 'failed read'; owner org.apache.hadoop.fs.azurebfs.services.ReadBuffer@6777a743{ status=AVAILABLE, offset=4194304, length=0, requestedLength=4194304, bufferindex=0, timeStamp=86052487, isFirstByteConsumed=false, isLastByteConsumed=false, isAnyByteConsumed=false, errException=null, stream=9d85521627a8, stream closed=false}
2022-11-10 20:48:22,517 TRACE [ABFS-prefetch-7]: services.ReadBufferWorker (ReadBufferWorker.java:run(95)) - Exception received:
org.apache.hadoop.fs.PathIOException: `/testfilefb393e327a88': Input/output error: Buffer index 0 found in buffer collection completedReadList
    at org.apache.hadoop.fs.azurebfs.services.ReadBufferWorker.run(ReadBufferWorker.java:93)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.IllegalStateException: Buffer index 0 found in buffer collection completedReadList
    at org.apache.hadoop.util.Preconditions.checkState(Preconditions.java:298)
    at org.apache.hadoop.fs.azurebfs.services.ReadBufferManager.verifyByteBufferNotInCollection(ReadBufferManager.java:471)
    at org.apache.hadoop.fs.azurebfs.services.ReadBufferManager.verifyByteBufferNotInUse(ReadBufferManager.java:457)
    at org.apache.hadoop.fs.azurebfs.services.ReadBufferManager.placeBufferOnFreeList(ReadBufferManager.java:413)
    at org.apache.hadoop.fs.azurebfs.services.ReadBufferManager.doneReading(ReadBufferManager.java:646)
    at org.apache.hadoop.fs.azurebfs.services.ReadBufferWorker.run(ReadBufferWorker.java:87)
    ... 1 more
```
Reason:
https://github.com/steveloughran/hadoop/blob/azure/HADOOP-18521-buffer-manager/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java#L629
-> https://github.com/steveloughran/hadoop/blob/azure/HADOOP-18521-buffer-manager/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java#L641
-> https://github.com/steveloughran/hadoop/blob/1ee18eeb4922d18168bd1fc8ec4a5c75610447cc/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java#L413
-> https://github.com/steveloughran/hadoop/blob/1ee18eeb4922d18168bd1fc8ec4a5c75610447cc/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java#L457
-> exception.
- Double addition to freeList:
-- Suppose that in doneReading the prefetch thread reaches https://github.com/steveloughran/hadoop/blob/1ee18eeb4922d18168bd1fc8ec4a5c75610447cc/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java#L415 and is then context-switched out, so it does not get CPU for some time. Meanwhile, the buffer that was added to completedReadList is picked up for eviction, the eviction runs, and the buffer index is added to freeList. When the prefetch thread gets CPU again, it runs https://github.com/steveloughran/hadoop/blob/1ee18eeb4922d18168bd1fc8ec4a5c75610447cc/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java#L415-L422 and adds the same index to freeList a second time.
-- I wrote an experiment to reproduce this: https://github.com/pranavsaxena-microsoft/hadoop/commit/7b6ac1558fa12c46217a296f197bf51a8b86c10b
```
2022-11-10 22:22:57,147 TRACE [Thread-28]: services.ReadBufferManager (ReadBufferManager.java:lambda$init$0(147)) - INCONSISTENCY!! on index 8
Exception in thread "Thread-16" java.lang.AssertionError: At index4194304
    at org.junit.Assert.fail(Assert.java:89)
    at org.junit.Assert.assertTrue(Assert.java:42)
    at org.junit.Assert.assertFalse(Assert.java:65)
    at org.apache.hadoop.fs.azurebfs.ITestPartialRead.lambda$purgeIssue$0(ITestPartialRead.java:154)
    at java.lang.Thread.run(Thread.java:750)
```
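
To illustrate the ordering problem behind the first trace, here is a minimal, single-threaded sketch. `BufferPoolSketch`, `Buf`, `startRead`, `doneReadingAsReviewed` and `doneReadingSuggested` are hypothetical names, not the real ABFS classes. The sketch only assumes that `placeBufferOnFreeList` rejects an index that is still present in `completedReadList`, as the stack trace suggests. It does not model eviction, failed reads, or the thread interleaving of the double-addition case.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical stand-in for ReadBufferManager; not the real ABFS class. */
final class BufferPoolSketch {

  /** Minimal buffer record carrying only what the sketch needs. */
  static final class Buf {
    final int index;
    int length;

    Buf(int index) {
      this.index = index;
    }
  }

  private final Deque<Integer> freeList = new ArrayDeque<>();
  private final List<Buf> inProgressList = new ArrayList<>();
  private final List<Buf> completedReadList = new ArrayList<>();

  /** Registers a read as in progress (simplified: no free-list lookup). */
  synchronized Buf startRead(int index) {
    Buf b = new Buf(index);
    inProgressList.add(b);
    return b;
  }

  /** Assumed invariant: an index may only be freed if no list still owns it. */
  private void placeBufferOnFreeList(Buf b) {
    for (Buf c : completedReadList) {
      if (c.index == b.index) {
        throw new IllegalStateException("Buffer index " + b.index
            + " found in buffer collection completedReadList");
      }
    }
    if (freeList.contains(b.index)) {
      throw new IllegalStateException("Buffer index " + b.index + " already free");
    }
    freeList.push(b.index);
  }

  /** Ordering as in the reviewed hunk: add to completed list, then free. */
  synchronized void doneReadingAsReviewed(Buf b, int bytesRead) {
    inProgressList.remove(b);
    boolean shouldFreeBuffer = bytesRead <= 0;
    b.length = Math.max(bytesRead, 0);
    completedReadList.add(b);
    if (shouldFreeBuffer) {
      placeBufferOnFreeList(b); // fails: the buffer is still in completedReadList
    }
  }

  /** Suggested ordering: the buffer goes to exactly one of the two lists. */
  synchronized void doneReadingSuggested(Buf b, int bytesRead) {
    inProgressList.remove(b);
    if (bytesRead > 0) {
      b.length = bytesRead;
      completedReadList.add(b);
    } else {
      placeBufferOnFreeList(b);
    }
  }

  synchronized int freeCount() {
    return freeList.size();
  }

  synchronized int completedCount() {
    return completedReadList.size();
  }
}
```

In this sketch, a zero-byte read through `doneReadingAsReviewed` ends in the `IllegalStateException`, while `doneReadingSuggested` places the index on the free list only and leaves the completed list empty.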