[
https://issues.apache.org/jira/browse/HADOOP-18106?focusedWorklogId=781204&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-781204
]
ASF GitHub Bot logged work on HADOOP-18106:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 14/Jun/22 13:52
Start Date: 14/Jun/22 13:52
Worklog Time Spent: 10m
Work Description: steveloughran commented on code in PR #4427:
URL: https://github.com/apache/hadoop/pull/4427#discussion_r896806033
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java:
##########
@@ -273,6 +273,7 @@ public boolean hasCapability(String capability) {
// new capabilities.
switch (capability.toLowerCase(Locale.ENGLISH)) {
case StreamCapabilities.IOSTATISTICS:
+ case StreamCapabilities.VECTOREDIO:
Review Comment:
nice
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileRange.java:
##########
@@ -52,4 +54,8 @@ public interface FileRange {
* @param data the future of the ByteBuffer that will have the data
*/
void setData(CompletableFuture<ByteBuffer> data);
+
+ static FileRange createFileRange(long offset, int length) {
Review Comment:
1. javadoc
2. where tests use the constructor themselves, they should switch to this
method
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java:
##########
@@ -209,7 +194,38 @@ public static long roundUp(long offset, int chunkSize) {
}
/**
- * Sort and merge ranges to optimize the access from the underlying file
+ * Check if the input ranges are overlapping in nature.
+ * We call two ranges to be overlapping when start offset
+ * of second is less than the end offset of first.
+ * End offset is calculated as start offset + length.
+ * @param input list if input ranges.
+ * @return true/false based on logic explained above.
+ */
+ public static List<? extends FileRange>
validateNonOverlappingAndReturnSortedRanges(
+ List<? extends FileRange> input) {
+
+ if (input.size() <= 1) {
+ return input;
+ }
+ FileRange[] sortedRanges = sortRanges(input);
+ FileRange prev = sortedRanges[0];
+ for(int i=1; i<sortedRanges.length; i++) {
Review Comment:
nit: add a space
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java:
##########
@@ -18,6 +18,9 @@
package org.apache.hadoop.fs;
+import java.util.List;
Review Comment:
these are just for the javadocs, aren't they? bit of a pain.
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java:
##########
@@ -68,35 +69,19 @@ public static void validateVectoredReadRanges(List<?
extends FileRange> ranges)
/**
- * Read fully a list of file ranges asynchronously from this file.
- * The default iterates through the ranges to read each synchronously, but
- * the intent is that subclasses can make more efficient readers.
+ * This is the default implementation which iterates through the ranges
+ * to read each synchronously, but the intent is that subclasses
+ * can make more efficient readers.
* The data or exceptions are pushed into {@link FileRange#getData()}.
* @param stream the stream to read the data from
* @param ranges the byte ranges to read
* @param allocate the byte buffer allocation
- * @param minimumSeek the minimum number of bytes to seek over
- * @param maximumRead the largest number of bytes to combine into a single
read
*/
public static void readVectored(PositionedReadable stream,
List<? extends FileRange> ranges,
- IntFunction<ByteBuffer> allocate,
- int minimumSeek,
- int maximumRead) {
- if (isOrderedDisjoint(ranges, 1, minimumSeek)) {
- for(FileRange range: ranges) {
- range.setData(readRangeFrom(stream, range, allocate));
- }
- } else {
- for(CombinedFileRange range: sortAndMergeRanges(ranges, 1, minimumSeek,
- maximumRead)) {
- CompletableFuture<ByteBuffer> read =
- readRangeFrom(stream, range, allocate);
- for(FileRange child: range.getUnderlying()) {
- child.setData(read.thenApply(
- (b) -> sliceTo(b, range.getOffset(), child)));
- }
- }
+ IntFunction<ByteBuffer> allocate) {
+ for(FileRange range: ranges) {
Review Comment:
nit: add a space
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java:
##########
@@ -219,24 +235,22 @@ public static long roundUp(long offset, int chunkSize) {
* <li>Some file systems want to round ranges to be at checksum
boundaries.</li>
* </ul>
*
- * @param input the list of input ranges
+ * @param sortedRanges already sorted list of ranges based on offset.
* @param chunkSize round the start and end points to multiples of chunkSize
* @param minimumSeek the smallest gap that we should seek over in bytes
* @param maxSize the largest combined file range in bytes
* @return the list of sorted CombinedFileRanges that cover the input
*/
- public static List<CombinedFileRange> sortAndMergeRanges(List<? extends
FileRange> input,
- int chunkSize,
- int minimumSeek,
- int maxSize) {
- // sort the ranges by offset
- FileRange[] ranges = input.toArray(new FileRange[0]);
- Arrays.sort(ranges, Comparator.comparingLong(FileRange::getOffset));
+ public static List<CombinedFileRange> mergeSortedRanges(List<? extends
FileRange> sortedRanges,
+ int chunkSize,
+ int minimumSeek,
+ int maxSize) {
+
CombinedFileRange current = null;
- List<CombinedFileRange> result = new ArrayList<>(ranges.length);
+ List<CombinedFileRange> result = new ArrayList<>(sortedRanges.size());
// now merge together the ones that merge
- for(FileRange range: ranges) {
+ for(FileRange range: sortedRanges) {
Review Comment:
nit: add a space
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java:
##########
@@ -209,7 +194,38 @@ public static long roundUp(long offset, int chunkSize) {
}
/**
- * Sort and merge ranges to optimize the access from the underlying file
+ * Check if the input ranges are overlapping in nature.
+ * We call two ranges to be overlapping when start offset
+ * of second is less than the end offset of first.
+ * End offset is calculated as start offset + length.
+ * @param input list if input ranges.
+ * @return true/false based on logic explained above.
+ */
+ public static List<? extends FileRange>
validateNonOverlappingAndReturnSortedRanges(
+ List<? extends FileRange> input) {
+
+ if (input.size() <= 1) {
+ return input;
+ }
+ FileRange[] sortedRanges = sortRanges(input);
+ FileRange prev = sortedRanges[0];
+ for(int i=1; i<sortedRanges.length; i++) {
+ if (sortedRanges[i].getOffset() < prev.getOffset() + prev.getLength()) {
+ throw new UnsupportedOperationException("Overlapping ranges are not
supported");
+ }
+ }
+ return Arrays.asList(sortedRanges);
+ }
+
+ public static FileRange[] sortRanges(List<? extends FileRange> input) {
Review Comment:
nit: javadoc; just use L211's text
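
For reference, the sort-then-validate step in this hunk can be sketched in isolation. This is a minimal sketch, not the PR's code: `Range` is a hypothetical stand-in for `FileRange`, and the method names only mirror the ones in the diff. In the hunk as quoted, `prev` is never reassigned inside the loop, so the sketch compares each range with its immediate predecessor instead.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class RangeValidationSketch {

  /** Hypothetical stand-in for FileRange: just an offset and a length. */
  static final class Range {
    final long offset;
    final int length;

    Range(long offset, int length) {
      this.offset = offset;
      this.length = length;
    }
  }

  /** Return a copy of the input sorted by offset; the input list is not modified. */
  static List<Range> sortRanges(List<Range> input) {
    List<Range> sorted = new ArrayList<>(input);
    sorted.sort(Comparator.comparingLong((Range r) -> r.offset));
    return sorted;
  }

  /**
   * Sort the ranges and reject any range that starts before the
   * previous one ends, where end = offset + length.
   */
  static List<Range> validateNonOverlappingAndReturnSorted(List<Range> input) {
    List<Range> sorted = sortRanges(input);
    for (int i = 1; i < sorted.size(); i++) {
      Range prev = sorted.get(i - 1); // always the immediate predecessor
      Range cur = sorted.get(i);
      if (cur.offset < prev.offset + prev.length) {
        throw new UnsupportedOperationException(
            "Overlapping ranges are not supported");
      }
    }
    return sorted;
  }

  public static void main(String[] args) {
    // Same offsets and lengths as testSomeRandomNonOverlappingRanges in the diff.
    List<Range> sorted = validateNonOverlappingAndReturnSorted(Arrays.asList(
        new Range(500, 100), new Range(1000, 200),
        new Range(50, 10), new Range(10, 5)));
    for (Range r : sorted) {
      System.out.println(r.offset + "+" + r.length);
    }
  }
}
```

Comparing against `sorted.get(i - 1)` matters for inputs such as (0, 100), (200, 100), (250, 100), where only the last two ranges overlap.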
##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java:
##########
@@ -167,36 +195,85 @@ public void testSomeRangesMergedSomeUnmerged() throws
Exception {
try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
in.readVectored(fileRanges, allocate);
validateVectoredReadResult(fileRanges, DATASET);
+ returnBuffersToPoolPostRead(fileRanges, pool);
+ }
+ }
+
+ @Test
+ public void testOverlappingRanges() throws Exception {
+ FileSystem fs = getFileSystem();
+ List<FileRange> fileRanges = getSampleOverlappingRanges();
+ try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
+ in.readVectored(fileRanges, allocate);
+ validateVectoredReadResult(fileRanges, DATASET);
+ returnBuffersToPoolPostRead(fileRanges, pool);
}
}
@Test
public void testSameRanges() throws Exception {
+ // Same ranges are special case of overlapping only.
FileSystem fs = getFileSystem();
+ List<FileRange> fileRanges = getSampleSameRanges();
+ try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
+ in.readVectored(fileRanges, allocate);
+ validateVectoredReadResult(fileRanges, DATASET);
+ returnBuffersToPoolPostRead(fileRanges, pool);
+ }
+ }
+
+ protected List<FileRange> getSampleSameRanges() {
List<FileRange> fileRanges = new ArrayList<>();
fileRanges.add(new FileRangeImpl(8*1024, 1000));
fileRanges.add(new FileRangeImpl(8*1024, 1000));
fileRanges.add(new FileRangeImpl(8*1024, 1000));
+ return fileRanges;
+ }
+
+ protected List<FileRange> getSampleOverlappingRanges() {
+ List<FileRange> fileRanges = new ArrayList<>();
+ fileRanges.add(FileRange.createFileRange(100, 500));
+ fileRanges.add(FileRange.createFileRange(400, 500));
+ return fileRanges;
+ }
+ protected void validateUnsupportedOperation(FileSystem fs,
+ List<? extends FileRange>
fileRanges)
+ throws Exception {
CompletableFuture<FSDataInputStream> builder =
fs.openFile(path(VECTORED_READ_FILE_NAME))
.build();
try (FSDataInputStream in = builder.get()) {
+ LambdaTestUtils.intercept(UnsupportedOperationException.class,
+ () -> in.readVectored(fileRanges, allocate));
+ }
+ }
+
+ @Test
+ public void testSomeRandomNonOverlappingRanges() throws Exception {
+ FileSystem fs = getFileSystem();
+ List<FileRange> fileRanges = new ArrayList<>();
+ fileRanges.add(new FileRangeImpl(500, 100));
+ fileRanges.add(new FileRangeImpl(1000, 200));
+ fileRanges.add(new FileRangeImpl(50, 10));
+ fileRanges.add(new FileRangeImpl(10, 5));
+ try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
Review Comment:
try using openFile() in some of the tests too, including with a file read
policy of vectored
##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java:
##########
@@ -53,10 +58,12 @@ public abstract class AbstractContractVectoredReadTest
extends AbstractFSContrac
private static final Logger LOG =
LoggerFactory.getLogger(AbstractContractVectoredReadTest.class);
public static final int DATASET_LEN = 64 * 1024;
- private static final byte[] DATASET = ContractTestUtils.dataset(DATASET_LEN,
'a', 32);
+ protected static final byte[] DATASET =
ContractTestUtils.dataset(DATASET_LEN, 'a', 32);
protected static final String VECTORED_READ_FILE_NAME = "vectored_file.txt";
- private final IntFunction<ByteBuffer> allocate;
+ protected final IntFunction<ByteBuffer> allocate;
+
+ private WeakReferencedElasticByteBufferPool pool = new
WeakReferencedElasticByteBufferPool();
Review Comment:
final?
##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestVectoredReadUtils.java:
##########
@@ -338,14 +337,19 @@ public void testReadVectored() throws Exception {
}).when(stream).readFully(ArgumentMatchers.anyLong(),
ArgumentMatchers.any(ByteBuffer.class));
// should not merge the ranges
- VectoredReadUtils.readVectored(stream, input, ByteBuffer::allocate, 100,
100);
+ VectoredReadUtils.readVectored(stream, input, ByteBuffer::allocate);
Mockito.verify(stream, Mockito.times(3))
.readFully(ArgumentMatchers.anyLong(),
ArgumentMatchers.any(ByteBuffer.class));
for(int b=0; b < input.size(); ++b) {
validateBuffer("buffer " + b, input.get(b).getData().get(), 0);
}
}
+ /**
+ * TODO: Honestly this test doesn't makes sense much now as it is similar to
above.
+ * Took time to fix this though. If you guys approve, I will remove.
+ * @throws Exception
+ */
Review Comment:
cut it. keeps maintenance costs down, and mockito is expensive to maintain
##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java:
##########
@@ -167,36 +195,85 @@ public void testSomeRangesMergedSomeUnmerged() throws
Exception {
try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
in.readVectored(fileRanges, allocate);
validateVectoredReadResult(fileRanges, DATASET);
+ returnBuffersToPoolPostRead(fileRanges, pool);
+ }
+ }
+
+ @Test
+ public void testOverlappingRanges() throws Exception {
+ FileSystem fs = getFileSystem();
+ List<FileRange> fileRanges = getSampleOverlappingRanges();
+ try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
+ in.readVectored(fileRanges, allocate);
+ validateVectoredReadResult(fileRanges, DATASET);
+ returnBuffersToPoolPostRead(fileRanges, pool);
}
}
@Test
public void testSameRanges() throws Exception {
+ // Same ranges are special case of overlapping only.
FileSystem fs = getFileSystem();
+ List<FileRange> fileRanges = getSampleSameRanges();
+ try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
+ in.readVectored(fileRanges, allocate);
+ validateVectoredReadResult(fileRanges, DATASET);
+ returnBuffersToPoolPostRead(fileRanges, pool);
+ }
+ }
+
+ protected List<FileRange> getSampleSameRanges() {
List<FileRange> fileRanges = new ArrayList<>();
fileRanges.add(new FileRangeImpl(8*1024, 1000));
fileRanges.add(new FileRangeImpl(8*1024, 1000));
fileRanges.add(new FileRangeImpl(8*1024, 1000));
+ return fileRanges;
+ }
+
+ protected List<FileRange> getSampleOverlappingRanges() {
+ List<FileRange> fileRanges = new ArrayList<>();
+ fileRanges.add(FileRange.createFileRange(100, 500));
+ fileRanges.add(FileRange.createFileRange(400, 500));
+ return fileRanges;
+ }
+ protected void validateUnsupportedOperation(FileSystem fs,
Review Comment:
nit, add a gap, maybe even javadocs given it is protected
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java:
##########
@@ -940,90 +949,133 @@ public void readVectored(List<? extends FileRange>
ranges,
LOG.debug("Starting vectored read on path {} for ranges {} ", pathStr,
ranges);
checkNotClosed();
+ if (stopVectoredIOOperations.getAndSet(false)) {
+ LOG.debug("Reinstating vectored read operation for path {} ", pathStr);
+ }
+ List<? extends FileRange> sortedRanges =
validateNonOverlappingAndReturnSortedRanges(ranges);
for (FileRange range : ranges) {
validateRangeRequest(range);
CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
range.setData(result);
}
- if (isOrderedDisjoint(ranges, 1, minSeekForVectorReads())) {
+ if (isOrderedDisjoint(sortedRanges, 1, minSeekForVectorReads())) {
LOG.debug("Not merging the ranges as they are disjoint");
- for(FileRange range: ranges) {
+ for(FileRange range: sortedRanges) {
Review Comment:
nit: add a space
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java:
##########
@@ -1120,14 +1176,14 @@ public void readByteArray(S3ObjectInputStream
objectContent,
}
/**
- * Read data from S3 using a http request.
- * This also handles if file has been changed while http call
- * is getting executed. If file has been changed RemoteFileChangedException
- * is thrown.
+ * Read data from S3 using a http request with retries.
+ * This also handles if file has been changed while http
+ * call is getting executed. If file has been changed
Review Comment:
nit: "if the file"
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java:
##########
@@ -26,14 +27,16 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;
-import org.apache.hadoop.fs.FileRangeImpl;
+import org.apache.hadoop.fs.impl.FileRangeImpl;
Review Comment:
switch to the FileRange.createFileRange method
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FileRangeImpl.java:
##########
@@ -15,11 +15,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.fs;
+package org.apache.hadoop.fs.impl;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
+import org.apache.hadoop.fs.FileRange;
+
Review Comment:
1. declare as @Private just to make clear it should not be directly
created...and say this in the javadoc
2. should offset and length be final? or are the setters needed?
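
One possible shape for this, as a sketch only: a static factory on the interface, with the implementation hidden and its offset and length final. All names here (`Range`, `RangeImpl`, `createRange`) are hypothetical stand-ins, and whether the real `FileRangeImpl` can drop its setters is exactly the open question above.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;

public class RangeFactorySketch {

  /** Hypothetical public-facing interface; callers only ever see this type. */
  interface Range {
    long getOffset();

    int getLength();

    CompletableFuture<ByteBuffer> getData();

    void setData(CompletableFuture<ByteBuffer> data);

    /** The single entry point for creating ranges. */
    static Range createRange(long offset, int length) {
      return new RangeImpl(offset, length);
    }
  }

  /** Implementation detail: not meant to be instantiated directly by callers. */
  private static final class RangeImpl implements Range {
    private final long offset;  // final, assuming no setter is needed
    private final int length;   // final, assuming no setter is needed
    private CompletableFuture<ByteBuffer> data;

    RangeImpl(long offset, int length) {
      this.offset = offset;
      this.length = length;
    }

    @Override
    public long getOffset() {
      return offset;
    }

    @Override
    public int getLength() {
      return length;
    }

    @Override
    public CompletableFuture<ByteBuffer> getData() {
      return data;
    }

    @Override
    public void setData(CompletableFuture<ByteBuffer> data) {
      this.data = data;
    }
  }

  public static void main(String[] args) {
    Range range = Range.createRange(100, 500);
    System.out.println(range.getOffset() + "+" + range.getLength());
  }
}
```

With the factory in place, tests never need to import the implementation class, which is what the `createFileRange` comments elsewhere in this review ask for.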
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java:
##########
@@ -459,10 +459,10 @@ public void test_040_PositionedReadHugeFile() throws
Throwable {
public void test_045_vectoredIOHugeFile() throws Throwable {
assumeHugeFileExists();
List<FileRange> rangeList = new ArrayList<>();
- rangeList.add(new FileRangeImpl(5856368, 1167716));
- rangeList.add(new FileRangeImpl(3520861, 1167700));
- rangeList.add(new FileRangeImpl(8191913, 1167775));
- rangeList.add(new FileRangeImpl(1520861, 1167700));
+ rangeList.add(new FileRangeImpl(5856368, 116770));
Review Comment:
use FileRange.createFileRange
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java:
##########
@@ -1120,14 +1176,14 @@ public void readByteArray(S3ObjectInputStream
objectContent,
}
/**
- * Read data from S3 using a http request.
- * This also handles if file has been changed while http call
- * is getting executed. If file has been changed RemoteFileChangedException
- * is thrown.
+ * Read data from S3 using a http request with retries.
+ * This also handles if file has been changed while http
Review Comment:
nit "while the http"
##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestVectoredReadUtils.java:
##########
@@ -141,8 +140,8 @@ public void testSortAndMerge() {
new FileRangeImpl(1000, 100)
);
assertFalse("Ranges are non disjoint",
VectoredReadUtils.isOrderedDisjoint(input, 100, 800));
- List<CombinedFileRange> outputList = VectoredReadUtils.sortAndMergeRanges(
- input, 100, 1001, 2500);
+ List<CombinedFileRange> outputList = VectoredReadUtils.mergeSortedRanges(
+ Arrays.asList(sortRanges(input)), 100, 1001, 2500);
assertEquals("merged range size", 1, outputList.size());
Review Comment:
asserts on list size are better done in AssertJ, as it will include the list
in the exception
##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java:
##########
@@ -272,26 +350,29 @@ public void testVectoredReadAfterNormalRead() throws
Exception {
.isEqualTo(200);
in.readVectored(fileRanges, allocate);
validateVectoredReadResult(fileRanges, DATASET);
+ returnBuffersToPoolPostRead(fileRanges, pool);
}
}
@Test
public void testMultipleVectoredReads() throws Exception {
FileSystem fs = getFileSystem();
- List<FileRange> fileRanges1 = createSomeOverlappingRanges();
- List<FileRange> fileRanges2 = createSomeOverlappingRanges();
+ List<FileRange> fileRanges1 = createSomeRandomRanges();
+ List<FileRange> fileRanges2 = createSomeRandomRanges();
try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
in.readVectored(fileRanges1, allocate);
in.readVectored(fileRanges2, allocate);
validateVectoredReadResult(fileRanges2, DATASET);
validateVectoredReadResult(fileRanges1, DATASET);
+ returnBuffersToPoolPostRead(fileRanges1, pool);
+ returnBuffersToPoolPostRead(fileRanges2, pool);
}
}
- protected List<FileRange> createSomeOverlappingRanges() {
+ protected List<FileRange> createSomeRandomRanges() {
Review Comment:
not really random, "non overlapping" better
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java:
##########
@@ -940,90 +949,133 @@ public void readVectored(List<? extends FileRange>
ranges,
LOG.debug("Starting vectored read on path {} for ranges {} ", pathStr,
ranges);
checkNotClosed();
+ if (stopVectoredIOOperations.getAndSet(false)) {
+ LOG.debug("Reinstating vectored read operation for path {} ", pathStr);
+ }
+ List<? extends FileRange> sortedRanges =
validateNonOverlappingAndReturnSortedRanges(ranges);
for (FileRange range : ranges) {
validateRangeRequest(range);
CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
range.setData(result);
}
- if (isOrderedDisjoint(ranges, 1, minSeekForVectorReads())) {
+ if (isOrderedDisjoint(sortedRanges, 1, minSeekForVectorReads())) {
LOG.debug("Not merging the ranges as they are disjoint");
- for(FileRange range: ranges) {
+ for(FileRange range: sortedRanges) {
ByteBuffer buffer = allocate.apply(range.getLength());
unboundedThreadPool.submit(() -> readSingleRange(range, buffer));
}
} else {
LOG.debug("Trying to merge the ranges as they are not disjoint");
- List<CombinedFileRange> combinedFileRanges = sortAndMergeRanges(ranges,
+ List<CombinedFileRange> combinedFileRanges =
mergeSortedRanges(sortedRanges,
1, minSeekForVectorReads(),
maxReadSizeForVectorReads());
LOG.debug("Number of original ranges size {} , Number of combined ranges
{} ",
ranges.size(), combinedFileRanges.size());
for(CombinedFileRange combinedFileRange: combinedFileRanges) {
- CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
- ByteBuffer buffer = allocate.apply(combinedFileRange.getLength());
- combinedFileRange.setData(result);
unboundedThreadPool.submit(
- () -> readCombinedRangeAndUpdateChildren(combinedFileRange,
buffer));
+ () -> readCombinedRangeAndUpdateChildren(combinedFileRange,
allocate));
}
}
LOG.debug("Finished submitting vectored read to threadpool" +
" on path {} for ranges {} ", pathStr, ranges);
}
/**
- * Read data in the combinedFileRange and update data in buffers
- * of all underlying ranges.
- * @param combinedFileRange combined range.
- * @param buffer combined buffer.
+ * Read the data from S3 for the bigger combined file range and update all
the
+ * underlying ranges.
+ * @param combinedFileRange big combined file range.
+ * @param allocate method to create byte buffers to hold result data.
*/
private void readCombinedRangeAndUpdateChildren(CombinedFileRange
combinedFileRange,
- ByteBuffer buffer) {
- // Not putting read single range call inside try block as
- // exception if any occurred during this call will be raised
- // during awaitFuture call while getting the combined buffer.
- readSingleRange(combinedFileRange, buffer);
+ IntFunction<ByteBuffer>
allocate) {
+ LOG.debug("Start reading combined range {} from path {} ",
combinedFileRange, pathStr);
+ S3Object objectRange = null;
+ S3ObjectInputStream objectContent = null;
try {
- // In case of single range we return the original byte buffer else
- // we return slice byte buffers for each child ranges.
- ByteBuffer combinedBuffer =
FutureIOSupport.awaitFuture(combinedFileRange.getData());
- if (combinedFileRange.getUnderlying().size() == 1) {
-
combinedFileRange.getUnderlying().get(0).getData().complete(combinedBuffer);
- } else {
- for (FileRange child : combinedFileRange.getUnderlying()) {
- updateOriginalRange(child, combinedBuffer, combinedFileRange);
- }
+ checkIfVectoredIOStopped();
+ final String operationName = "readCombinedFileRange";
+ objectRange = getS3Object(operationName,
Review Comment:
this reference needs to be kept until all the reads have finished, as if its
finalizer is called it closes the inner streams. Are you doing this?
It does look like it...just add a comment above the variable explaining why
it MUST be kept
##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java:
##########
@@ -167,36 +195,85 @@ public void testSomeRangesMergedSomeUnmerged() throws
Exception {
try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
in.readVectored(fileRanges, allocate);
validateVectoredReadResult(fileRanges, DATASET);
+ returnBuffersToPoolPostRead(fileRanges, pool);
+ }
+ }
+
+ @Test
+ public void testOverlappingRanges() throws Exception {
+ FileSystem fs = getFileSystem();
+ List<FileRange> fileRanges = getSampleOverlappingRanges();
+ try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
+ in.readVectored(fileRanges, allocate);
+ validateVectoredReadResult(fileRanges, DATASET);
+ returnBuffersToPoolPostRead(fileRanges, pool);
}
}
@Test
public void testSameRanges() throws Exception {
+ // Same ranges are special case of overlapping only.
FileSystem fs = getFileSystem();
+ List<FileRange> fileRanges = getSampleSameRanges();
+ try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
+ in.readVectored(fileRanges, allocate);
+ validateVectoredReadResult(fileRanges, DATASET);
+ returnBuffersToPoolPostRead(fileRanges, pool);
+ }
+ }
+
+ protected List<FileRange> getSampleSameRanges() {
List<FileRange> fileRanges = new ArrayList<>();
fileRanges.add(new FileRangeImpl(8*1024, 1000));
Review Comment:
add some spaces there, or at least an int variable to something like 8_000
##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestChecksumFileSystem.java:
##########
@@ -19,9 +19,8 @@
package org.apache.hadoop.fs;
import java.util.Arrays;
+import java.util.concurrent.CompletableFuture;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.permission.FsPermission;
import static org.apache.hadoop.fs.FileSystemTestHelper.*;
Review Comment:
there's a mixup of static and non static imports. This is probably worth
fixing while you are cleaning up the unused imports, as it's more unusual
##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java:
##########
@@ -1136,6 +1138,17 @@ public static void
validateVectoredReadResult(List<FileRange> fileRanges,
}
}
+ public static void returnBuffersToPoolPostRead(List<FileRange> fileRanges,
Review Comment:
javadoc please
##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java:
##########
@@ -32,4 +50,38 @@ public TestLocalFSContractVectoredRead(String bufferType) {
protected AbstractFSContract createContract(Configuration conf) {
return new LocalFSContract(conf);
}
+
+ @Test
+ public void testChecksumValidationDuringVectoredRead() throws Exception {
+ Path testPath = path("big_range_checksum");
+ LocalFileSystem localFs = (LocalFileSystem) getFileSystem();
+ byte[] DATASET_CORRECT = ContractTestUtils.dataset(DATASET_LEN, 'a', 32);
+ try (FSDataOutputStream out = localFs.create(testPath, true)){
+ out.write(DATASET_CORRECT);
+ }
+ Path checksumPath = localFs.getChecksumFile(testPath);
Review Comment:
do you think this would be safer through the java file API, or do other tests
do the same trick here?
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java:
##########
@@ -940,90 +949,133 @@ public void readVectored(List<? extends FileRange>
ranges,
LOG.debug("Starting vectored read on path {} for ranges {} ", pathStr,
ranges);
checkNotClosed();
+ if (stopVectoredIOOperations.getAndSet(false)) {
+ LOG.debug("Reinstating vectored read operation for path {} ", pathStr);
+ }
+ List<? extends FileRange> sortedRanges =
validateNonOverlappingAndReturnSortedRanges(ranges);
for (FileRange range : ranges) {
validateRangeRequest(range);
CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
range.setData(result);
}
- if (isOrderedDisjoint(ranges, 1, minSeekForVectorReads())) {
+ if (isOrderedDisjoint(sortedRanges, 1, minSeekForVectorReads())) {
LOG.debug("Not merging the ranges as they are disjoint");
- for(FileRange range: ranges) {
+ for(FileRange range: sortedRanges) {
ByteBuffer buffer = allocate.apply(range.getLength());
unboundedThreadPool.submit(() -> readSingleRange(range, buffer));
}
} else {
LOG.debug("Trying to merge the ranges as they are not disjoint");
- List<CombinedFileRange> combinedFileRanges = sortAndMergeRanges(ranges,
+ List<CombinedFileRange> combinedFileRanges =
mergeSortedRanges(sortedRanges,
1, minSeekForVectorReads(),
maxReadSizeForVectorReads());
LOG.debug("Number of original ranges size {} , Number of combined ranges
{} ",
ranges.size(), combinedFileRanges.size());
for(CombinedFileRange combinedFileRange: combinedFileRanges) {
- CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
- ByteBuffer buffer = allocate.apply(combinedFileRange.getLength());
- combinedFileRange.setData(result);
unboundedThreadPool.submit(
- () -> readCombinedRangeAndUpdateChildren(combinedFileRange,
buffer));
+ () -> readCombinedRangeAndUpdateChildren(combinedFileRange,
allocate));
}
}
LOG.debug("Finished submitting vectored read to threadpool" +
" on path {} for ranges {} ", pathStr, ranges);
}
/**
- * Read data in the combinedFileRange and update data in buffers
- * of all underlying ranges.
- * @param combinedFileRange combined range.
- * @param buffer combined buffer.
+ * Read the data from S3 for the bigger combined file range and update all
the
+ * underlying ranges.
+ * @param combinedFileRange big combined file range.
+ * @param allocate method to create byte buffers to hold result data.
*/
private void readCombinedRangeAndUpdateChildren(CombinedFileRange
combinedFileRange,
- ByteBuffer buffer) {
- // Not putting read single range call inside try block as
- // exception if any occurred during this call will be raised
- // during awaitFuture call while getting the combined buffer.
- readSingleRange(combinedFileRange, buffer);
+ IntFunction<ByteBuffer>
allocate) {
+ LOG.debug("Start reading combined range {} from path {} ",
combinedFileRange, pathStr);
+ S3Object objectRange = null;
+ S3ObjectInputStream objectContent = null;
try {
- // In case of single range we return the original byte buffer else
- // we return slice byte buffers for each child ranges.
- ByteBuffer combinedBuffer =
FutureIOSupport.awaitFuture(combinedFileRange.getData());
- if (combinedFileRange.getUnderlying().size() == 1) {
-
combinedFileRange.getUnderlying().get(0).getData().complete(combinedBuffer);
- } else {
- for (FileRange child : combinedFileRange.getUnderlying()) {
- updateOriginalRange(child, combinedBuffer, combinedFileRange);
- }
+ checkIfVectoredIOStopped();
+ final String operationName = "readCombinedFileRange";
+ objectRange = getS3Object(operationName,
+ combinedFileRange.getOffset(),
+ combinedFileRange.getLength());
+ objectContent = objectRange.getObjectContent();
+ if (objectContent == null) {
+ throw new PathIOException(uri,
+ "Null IO stream received during " + operationName);
}
+ populateChildBuffers(combinedFileRange, objectContent, allocate);
} catch (Exception ex) {
- LOG.warn("Exception occurred while reading combined range from file {}",
pathStr, ex);
+ LOG.warn("Exception while reading a range {} from path {} ",
combinedFileRange, pathStr, ex);
for(FileRange child : combinedFileRange.getUnderlying()) {
child.getData().completeExceptionally(ex);
}
+ } finally {
+ IOUtils.cleanupWithLogger(LOG, objectRange, objectContent);
+ }
+ LOG.debug("Finished reading range {} from path {} ", combinedFileRange,
pathStr);
+ }
+
+ /**
+ * Populate underlying buffers of the child ranges.
+ * @param combinedFileRange big combined file range.
+ * @param objectContent data from s3.
+ * @param allocate method to allocate child byte buffers.
+ * @throws IOException any IOE.
+ */
+ private void populateChildBuffers(CombinedFileRange combinedFileRange,
+ S3ObjectInputStream objectContent,
+ IntFunction<ByteBuffer> allocate) throws
IOException {
+ // If the combined file range just contains a single child
+ // range, we only have to fill that one child buffer else
+ // we drain the intermediate data between consecutive ranges
+ // and fill the buffers one by one.
+ if (combinedFileRange.getUnderlying().size() == 1) {
+ FileRange child = combinedFileRange.getUnderlying().get(0);
+ ByteBuffer buffer = allocate.apply(child.getLength());
+ populateBuffer(child.getLength(), buffer, objectContent);
+ child.getData().complete(buffer);
+ } else {
+ FileRange prev = null;
+ for (FileRange child : combinedFileRange.getUnderlying()) {
+ if (prev != null && prev.getOffset() + prev.getLength() <
child.getOffset()) {
+ long drainQuantity = child.getOffset() - prev.getOffset() -
prev.getLength();
+ drainUnnecessaryData(objectContent, drainQuantity);
+ }
+ ByteBuffer buffer = allocate.apply(child.getLength());
+ populateBuffer(child.getLength(), buffer, objectContent);
+ child.getData().complete(buffer);
+ prev = child;
+ }
}
}
/**
- * Update data in child range from combined range.
- * @param child child range.
- * @param combinedBuffer combined buffer.
- * @param combinedFileRange combined range.
+ * Drain unnecessary data in between ranges.
+ * @param objectContent s3 data stream.
+ * @param drainQuantity how many bytes to drain.
+ * @throws IOException any IOE.
*/
- private void updateOriginalRange(FileRange child,
- ByteBuffer combinedBuffer,
- CombinedFileRange combinedFileRange) {
- LOG.trace("Start Filling original range [{}, {}) from combined range [{},
{}) ",
- child.getOffset(), child.getLength(),
- combinedFileRange.getOffset(), combinedFileRange.getLength());
- ByteBuffer childBuffer = sliceTo(combinedBuffer,
combinedFileRange.getOffset(), child);
- child.getData().complete(childBuffer);
- LOG.trace("End Filling original range [{}, {}) from combined range [{},
{}) ",
- child.getOffset(), child.getLength(),
- combinedFileRange.getOffset(), combinedFileRange.getLength());
+ private void drainUnnecessaryData(S3ObjectInputStream objectContent, long
drainQuantity)
Review Comment:
are you collecting any stats on this or other vector io reads?
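
For readers following the hunk, the drain-and-fill idea can be sketched against a plain `InputStream`. This is a simplified illustration under assumptions, not the S3A code: the helper names are hypothetical, a `ByteArrayInputStream` stands in for the S3 object stream, and the sketch tracks the current stream position instead of a `prev` range.

```java
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntFunction;

public class DrainAndFillSketch {

  /** Build n bytes where byte i has the value (byte) i. */
  static byte[] sampleData(int n) {
    byte[] data = new byte[n];
    for (int i = 0; i < n; i++) {
      data[i] = (byte) i;
    }
    return data;
  }

  /** Read and discard exactly count bytes from the stream. */
  static void drain(InputStream in, long count) throws IOException {
    byte[] scratch = new byte[8192];
    long remaining = count;
    while (remaining > 0) {
      int n = in.read(scratch, 0, (int) Math.min(scratch.length, remaining));
      if (n < 0) {
        throw new EOFException("EOF while draining");
      }
      remaining -= n;
    }
  }

  /** Read exactly length bytes into a buffer obtained from the allocator. */
  static ByteBuffer readFully(InputStream in, int length,
      IntFunction<ByteBuffer> allocate) throws IOException {
    byte[] tmp = new byte[length];
    int done = 0;
    while (done < length) {
      int n = in.read(tmp, done, length - done);
      if (n < 0) {
        throw new EOFException("EOF while reading range");
      }
      done += n;
    }
    ByteBuffer buffer = allocate.apply(length);
    buffer.put(tmp);
    buffer.flip();
    return buffer;
  }

  /**
   * Fill one buffer per child range from a stream positioned at combinedStart.
   * Assumes the children are sorted by offset and do not overlap.
   */
  static List<ByteBuffer> fillChildren(InputStream in, long combinedStart,
      long[] offsets, int[] lengths, IntFunction<ByteBuffer> allocate)
      throws IOException {
    List<ByteBuffer> result = new ArrayList<>();
    long position = combinedStart;
    for (int i = 0; i < offsets.length; i++) {
      drain(in, offsets[i] - position);       // skip the gap, keep nothing
      result.add(readFully(in, lengths[i], allocate));
      position = offsets[i] + lengths[i];
    }
    return result;
  }

  public static void main(String[] args) throws IOException {
    // Ranges from the issue description: 0-500, 700-1000 and 1200-1500.
    List<ByteBuffer> buffers = fillChildren(
        new ByteArrayInputStream(sampleData(1500)), 0,
        new long[] {0, 700, 1200}, new int[] {500, 300, 300},
        ByteBuffer::allocate);
    int total = 0;
    for (ByteBuffer b : buffers) {
      total += b.capacity();
    }
    System.out.println("bytes held by child buffers: " + total); // 1100
  }
}
```

Because each child gets its own buffer, releasing one child's buffer does not depend on any other child, and the 400 bytes of gap data are never retained. A counter inside `drain` would be a natural place to collect the statistic asked about here.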
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java:
##########
@@ -1152,6 +1212,12 @@ private S3Object getS3Object(String operationName, long
position,
return objectRange;
}
+ private void checkIfVectoredIOStopped() throws InterruptedIOException {
Review Comment:
add a javadoc
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java:
##########
@@ -99,4 +102,58 @@ public void testMinSeekAndMaxSizeDefaultValues() throws
Exception {
}
}
}
+
+ @Test
+ public void testStopVectoredIoOperationsCloseStream() throws Exception {
+ FileSystem fs = getFileSystem();
+ List<FileRange> fileRanges = createSomeRandomRanges();
+ try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))){
+ in.readVectored(fileRanges, allocate);
+ in.close();
+ LambdaTestUtils.intercept(InterruptedIOException.class,
+ () -> validateVectoredReadResult(fileRanges, DATASET));
+ }
+ // reopening the stream should succeed.
+ try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))){
+ in.readVectored(fileRanges, allocate);
+ validateVectoredReadResult(fileRanges, DATASET);
+ }
+ }
+
+ @Test
+ public void testStopVectoredIoOperationsUnbuffer() throws Exception {
+ FileSystem fs = getFileSystem();
+ List<FileRange> fileRanges = createSomeRandomRanges();
+ try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))){
Review Comment:
try using openFile(). might be good to even call getFileStatus and pass that
in too
Issue Time Tracking
-------------------
Worklog Id: (was: 781204)
Time Spent: 40m (was: 0.5h)
> Handle memory fragmentation in S3 Vectored IO implementation.
> -------------------------------------------------------------
>
> Key: HADOOP-18106
> URL: https://issues.apache.org/jira/browse/HADOOP-18106
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: fs/s3
> Reporter: Mukund Thakur
> Assignee: Mukund Thakur
> Priority: Major
> Labels: pull-request-available
> Time Spent: 40m
> Remaining Estimate: 0h
>
> As we have implemented merging of ranges in the S3AInputStream implementation
> of the vectored IO API, it can lead to memory fragmentation. Let me explain by
> example.
>
> Suppose a client requests 3 ranges:
> 0-500, 700-1000 and 1200-1500.
> Because of merging, all of the above ranges will be merged into one, and we
> will allocate one big byte buffer covering 0-1500 but return sliced byte
> buffers for the desired ranges.
> Once the client is done reading all the ranges, it will only be able to
> free the memory for the requested ranges; the memory of the gaps (here 500-700
> and 1000-1200) will never be released.
>
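
The effect described in the issue can be reproduced with plain `java.nio`. The sketch below is only an illustration with heap buffers and a hypothetical `sliceOf` helper, not the S3A code path: every slice is a view over the same 1500-byte backing array, so the gap bytes stay reachable for as long as any slice does.

```java
import java.nio.ByteBuffer;

public class SliceRetentionSketch {

  /** Return a view of [start, start + length) of the combined buffer. */
  static ByteBuffer sliceOf(ByteBuffer combined, int start, int length) {
    ByteBuffer view = combined.duplicate(); // independent position and limit
    view.position(start);
    view.limit(start + length);
    return view.slice();
  }

  public static void main(String[] args) {
    // One combined buffer for the merged range 0-1500.
    ByteBuffer combined = ByteBuffer.allocate(1500);

    // Slices for the requested ranges 0-500, 700-1000 and 1200-1500.
    ByteBuffer first = sliceOf(combined, 0, 500);
    ByteBuffer second = sliceOf(combined, 700, 300);
    ByteBuffer third = sliceOf(combined, 1200, 300);

    int requested = first.capacity() + second.capacity() + third.capacity();
    System.out.println("requested bytes: " + requested);
    // Each slice still references the whole backing array.
    System.out.println("backing array of one slice: " + second.array().length);
    System.out.println("shared with parent: "
        + (second.array() == combined.array()));
  }
}
```

Here the client asked for 1100 bytes, yet each slice pins all 1500.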
--
This message was sent by Atlassian Jira
(v8.20.7#820007)