[
https://issues.apache.org/jira/browse/HADOOP-17156?focusedWorklogId=636382&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-636382
]
ASF GitHub Bot logged work on HADOOP-17156:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 10/Aug/21 10:25
Start Date: 10/Aug/21 10:25
Worklog Time Spent: 10m
Work Description: steveloughran commented on a change in pull request
#3285:
URL: https://github.com/apache/hadoop/pull/3285#discussion_r685882666
##########
File path:
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
##########
@@ -502,11 +503,66 @@ int getCompletedReadListSize() {
return completedReadList.size();
}
+ @VisibleForTesting
+ public LinkedList<ReadBuffer> getCompletedReadListCopy() {
+ return new LinkedList<>(completedReadList);
+ }
+
+ @VisibleForTesting
+ public LinkedList<Integer> getFreeListCopy() {
+ return new LinkedList<>(freeList);
+ }
+
+ @VisibleForTesting
+ public LinkedList<ReadBuffer> getReadAheadQueueCopy() {
+ return new LinkedList<>(readAheadQueue);
+ }
+
+ @VisibleForTesting
+ public LinkedList<ReadBuffer> getInProgressCopiedList() {
+ return new LinkedList<>(inProgressList);
+ }
+
@VisibleForTesting
void callTryEvict() {
tryEvict();
}
+
+ /**
+ * Purging the buffers associated with an {@link AbfsInputStream}
+ * from {@link ReadBufferManager} when stream is closed.
+ * @param stream input stream.
+ */
+ public synchronized void purgeBuffersForStream(AbfsInputStream stream) {
+ LOGGER.debug("Purging stale buffers for AbfsInputStream {} ", stream);
+ readAheadQueue.removeIf(readBuffer -> readBuffer.getStream() == stream);
+ purgeList(stream, completedReadList);
+ purgeList(stream, inProgressList);
+ }
+
+ /**
+ * Method to remove buffers associated with a {@link AbfsInputStream}
Review comment:
Please add a comment noting that this method must be called from within a synchronized block.
##########
File path:
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
##########
@@ -502,11 +503,66 @@ int getCompletedReadListSize() {
return completedReadList.size();
}
+ @VisibleForTesting
+ public LinkedList<ReadBuffer> getCompletedReadListCopy() {
+ return new LinkedList<>(completedReadList);
Review comment:
I'm afraid that if we need concurrency here, we should use
`Collections.synchronizedList()`.
##########
File path:
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
##########
@@ -502,11 +503,66 @@ int getCompletedReadListSize() {
return completedReadList.size();
}
+ @VisibleForTesting
+ public LinkedList<ReadBuffer> getCompletedReadListCopy() {
+ return new LinkedList<>(completedReadList);
+ }
+
+ @VisibleForTesting
+ public LinkedList<Integer> getFreeListCopy() {
+ return new LinkedList<>(freeList);
+ }
+
+ @VisibleForTesting
+ public LinkedList<ReadBuffer> getReadAheadQueueCopy() {
+ return new LinkedList<>(readAheadQueue);
+ }
+
+ @VisibleForTesting
+ public LinkedList<ReadBuffer> getInProgressCopiedList() {
+ return new LinkedList<>(inProgressList);
+ }
+
@VisibleForTesting
void callTryEvict() {
tryEvict();
}
+
+ /**
+ * Purging the buffers associated with an {@link AbfsInputStream}
+ * from {@link ReadBufferManager} when stream is closed.
+ * @param stream input stream.
+ */
+ public synchronized void purgeBuffersForStream(AbfsInputStream stream) {
+ LOGGER.debug("Purging stale buffers for AbfsInputStream {} ", stream);
+ readAheadQueue.removeIf(readBuffer -> readBuffer.getStream() == stream);
+ purgeList(stream, completedReadList);
+ purgeList(stream, inProgressList);
+ }
+
+ /**
+ * Method to remove buffers associated with a {@link AbfsInputStream}
+ * when its close method is called.
+ * As failed ReadBuffers (bufferIndex = -1) are already pushed to free
+ * list in {@link this#doneReading(ReadBuffer, ReadBufferStatus, int)},
+ * we will skip adding those here again.
+ * @param stream associated input stream.
+ * @param list list of buffers like {@link this#completedReadList}
+ * or {@link this#inProgressList}.
+ */
+ private void purgeList(AbfsInputStream stream, LinkedList<ReadBuffer> list) {
+ for (Iterator<ReadBuffer> it = list.iterator(); it.hasNext();) {
+ ReadBuffer readBuffer = it.next();
+ if (readBuffer.getStream() == stream) {
+ it.remove();
+ if (readBuffer.getBufferindex() != -1) {
Review comment:
can you just add a comment?
##########
File path:
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java
##########
@@ -0,0 +1,119 @@
+package org.apache.hadoop.fs.azurebfs.services;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.*;
+
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.*;
+import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.*;
+
+public class ITestReadBufferManager extends AbstractAbfsIntegrationTest {
+
+ public ITestReadBufferManager() throws Exception {
+ }
+
+ @Test
+ public void testPurgeBufferManagerForParallelStreams() throws Exception {
+ describe("Testing purging of buffers from ReadBufferManager for " +
+ "parallel input streams");
+ final int numBuffers = 16;
+ final LinkedList<Integer> freeList = new LinkedList<>();
+ for (int i=0; i < numBuffers; i++) {
+ freeList.add(i);
+ }
+ AzureBlobFileSystem fs = getABFSWithReadAheadConfig();
+ for(int i=0; i < 4; i++) {
+ String fileName = methodName.getMethodName() + i;
+ byte[] fileContent = getRandomBytesArray(ONE_MB);
+ Path testFilePath = createFileWithContent(fs, fileName,
fileContent);
+ try (FSDataInputStream iStream = fs.open(testFilePath)) {
+ iStream.read();
+ }
+ }
+ ReadBufferManager bufferManager = ReadBufferManager.getBufferManager();
+ Assertions.assertThat(bufferManager.getCompletedReadListCopy().size())
+ .describedAs("After closing all streams completed list size
should be 0")
+ .isEqualTo(0);
+
+ Assertions.assertThat(bufferManager.getInProgressCopiedList().size())
+ .describedAs("After closing all streams inProgress list size
should be 0")
+ .isEqualTo(0);
Review comment:
Use assertThat(list).hasSize(0), as you do below.
You could also consider an assertListEmpty(text, list) helper that performs that operation.
##########
File path:
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java
##########
@@ -0,0 +1,119 @@
+package org.apache.hadoop.fs.azurebfs.services;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.*;
+
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.*;
+import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.*;
+
+public class ITestReadBufferManager extends AbstractAbfsIntegrationTest {
+
+ public ITestReadBufferManager() throws Exception {
+ }
+
+ @Test
+ public void testPurgeBufferManagerForParallelStreams() throws Exception {
+ describe("Testing purging of buffers from ReadBufferManager for " +
+ "parallel input streams");
+ final int numBuffers = 16;
+ final LinkedList<Integer> freeList = new LinkedList<>();
+ for (int i=0; i < numBuffers; i++) {
+ freeList.add(i);
+ }
+ AzureBlobFileSystem fs = getABFSWithReadAheadConfig();
+ for(int i=0; i < 4; i++) {
+ String fileName = methodName.getMethodName() + i;
+ byte[] fileContent = getRandomBytesArray(ONE_MB);
+ Path testFilePath = createFileWithContent(fs, fileName,
fileContent);
Review comment:
If you can parallelize this, you can get a faster test setup; I think
ITestS3AAssumedRole does that.
##########
File path:
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
##########
@@ -502,11 +503,66 @@ int getCompletedReadListSize() {
return completedReadList.size();
}
+ @VisibleForTesting
+ public LinkedList<ReadBuffer> getCompletedReadListCopy() {
+ return new LinkedList<>(completedReadList);
+ }
+
+ @VisibleForTesting
+ public LinkedList<Integer> getFreeListCopy() {
Review comment:
nit: just return a simple List. As written, there is a small risk of the copy happening while
something else changes the underlying list. It is only test code, but it could lead to
intermittent test failures.
So: make these all synchronized, return List, and use ArrayList inside.
##########
File path:
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java
##########
@@ -0,0 +1,119 @@
+package org.apache.hadoop.fs.azurebfs.services;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
Review comment:
nit: imports
##########
File path:
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java
##########
@@ -0,0 +1,119 @@
+package org.apache.hadoop.fs.azurebfs.services;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.*;
+
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.*;
+import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.*;
+
+public class ITestReadBufferManager extends AbstractAbfsIntegrationTest {
+
+ public ITestReadBufferManager() throws Exception {
+ }
+
+ @Test
+ public void testPurgeBufferManagerForParallelStreams() throws Exception {
+ describe("Testing purging of buffers from ReadBufferManager for " +
+ "parallel input streams");
+ final int numBuffers = 16;
+ final LinkedList<Integer> freeList = new LinkedList<>();
+ for (int i=0; i < numBuffers; i++) {
+ freeList.add(i);
+ }
+ AzureBlobFileSystem fs = getABFSWithReadAheadConfig();
+ for(int i=0; i < 4; i++) {
+ String fileName = methodName.getMethodName() + i;
+ byte[] fileContent = getRandomBytesArray(ONE_MB);
+ Path testFilePath = createFileWithContent(fs, fileName,
fileContent);
+ try (FSDataInputStream iStream = fs.open(testFilePath)) {
+ iStream.read();
+ }
+ }
+ ReadBufferManager bufferManager = ReadBufferManager.getBufferManager();
+ Assertions.assertThat(bufferManager.getCompletedReadListCopy().size())
+ .describedAs("After closing all streams completed list size
should be 0")
+ .isEqualTo(0);
+
+ Assertions.assertThat(bufferManager.getInProgressCopiedList().size())
+ .describedAs("After closing all streams inProgress list size
should be 0")
+ .isEqualTo(0);
+ Assertions.assertThat(bufferManager.getFreeListCopy())
+ .describedAs("After closing all streams free list contents
should match with " + freeList)
+ .hasSize(numBuffers)
+ .containsExactlyInAnyOrderElementsOf(freeList);
+ Assertions.assertThat(bufferManager.getReadAheadQueueCopy())
+ .describedAs("After closing all stream ReadAheadQueue should
be empty")
+ .hasSize(0);
+
+ }
+
+ @Test
+ public void testPurgeBufferManagerForSequentialStream() throws Exception {
+ describe("Testing purging of buffers in ReadBufferManager for " +
+ "sequential input streams");
+ AzureBlobFileSystem fs = getABFSWithReadAheadConfig();
+ final String fileName = methodName.getMethodName();
+ byte[] fileContent = getRandomBytesArray(ONE_MB);
+ Path testFilePath = createFileWithContent(fs, fileName, fileContent);
+ AbfsInputStream iStream1 = (AbfsInputStream)
fs.open(testFilePath).getWrappedStream();
+ iStream1.read();
+ // closing the stream right away.
+ iStream1.close();
Review comment:
Can you always close these in a try/finally, so that if an assert fails
things still get cleaned up?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 636382)
Time Spent: 40m (was: 0.5h)
> Clear abfs readahead requests on stream close
> ---------------------------------------------
>
> Key: HADOOP-17156
> URL: https://issues.apache.org/jira/browse/HADOOP-17156
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: fs/azure
> Affects Versions: 3.3.0
> Reporter: Rajesh Balamohan
> Assignee: Mukund Thakur
> Priority: Major
> Labels: pull-request-available
> Time Spent: 40m
> Remaining Estimate: 0h
>
> It would be good to close/clear pending read ahead requests on stream close().
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]