steveloughran commented on code in PR #4921:
URL: https://github.com/apache/hadoop/pull/4921#discussion_r977566267
##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java:
##########
@@ -364,6 +373,63 @@ public void testMultipleVectoredReads() throws Exception {
}
}
+ /**
+ * This test creates list of ranges and then submit a readVectored
+ * operation and then uses a separate thread pool to process the
+ * results asynchronously.
+ */
+ @Test
+ public void testVectoredIOEndToEnd() throws Exception {
+ FileSystem fs = getFileSystem();
+ List<FileRange> fileRanges = new ArrayList<>();
+ fileRanges.add(FileRange.createFileRange(8 * 1024, 100));
+ fileRanges.add(FileRange.createFileRange(14 * 1024, 100));
+ fileRanges.add(FileRange.createFileRange(10 * 1024, 100));
+ fileRanges.add(FileRange.createFileRange(2 * 1024 - 101, 100));
+ fileRanges.add(FileRange.createFileRange(40 * 1024, 1024));
+
+ ExecutorService dataProcessor = Executors.newFixedThreadPool(5);
+ CountDownLatch countDown = new CountDownLatch(fileRanges.size());
+
+ try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
+ in.readVectored(fileRanges, value -> pool.getBuffer(true, value));
+ // user can perform other computations while waiting for IO.
+ for (FileRange res : fileRanges) {
+ dataProcessor.submit(() -> {
+ try {
+ readBufferValidateDataAndReturnToPool(pool, res, countDown);
+ } catch (Exception e) {
+ LOG.error("Error while process result for {} ", res, e);
+ }
+ });
+ }
+ if (!countDown.await(100, TimeUnit.SECONDS)) {
Review Comment:
timeout should be a static constant and more visible
##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java:
##########
@@ -364,6 +373,63 @@ public void testMultipleVectoredReads() throws Exception {
}
}
+ /**
+ * This test creates list of ranges and then submit a readVectored
+ * operation and then uses a separate thread pool to process the
+ * results asynchronously.
+ */
+ @Test
+ public void testVectoredIOEndToEnd() throws Exception {
+ FileSystem fs = getFileSystem();
+ List<FileRange> fileRanges = new ArrayList<>();
+ fileRanges.add(FileRange.createFileRange(8 * 1024, 100));
+ fileRanges.add(FileRange.createFileRange(14 * 1024, 100));
+ fileRanges.add(FileRange.createFileRange(10 * 1024, 100));
+ fileRanges.add(FileRange.createFileRange(2 * 1024 - 101, 100));
+ fileRanges.add(FileRange.createFileRange(40 * 1024, 1024));
+
+ ExecutorService dataProcessor = Executors.newFixedThreadPool(5);
+ CountDownLatch countDown = new CountDownLatch(fileRanges.size());
+
+ try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
+ in.readVectored(fileRanges, value -> pool.getBuffer(true, value));
+ // user can perform other computations while waiting for IO.
+ for (FileRange res : fileRanges) {
+ dataProcessor.submit(() -> {
+ try {
+ readBufferValidateDataAndReturnToPool(pool, res, countDown);
+ } catch (Exception e) {
+ LOG.error("Error while process result for {} ", res, e);
Review Comment:
should be saved to a field/variable, with junit thread rethrowing if the
value is non null
##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java:
##########
@@ -364,6 +373,63 @@ public void testMultipleVectoredReads() throws Exception {
}
}
+ /**
+ * This test creates list of ranges and then submit a readVectored
+ * operation and then uses a separate thread pool to process the
+ * results asynchronously.
+ */
+ @Test
+ public void testVectoredIOEndToEnd() throws Exception {
+ FileSystem fs = getFileSystem();
+ List<FileRange> fileRanges = new ArrayList<>();
+ fileRanges.add(FileRange.createFileRange(8 * 1024, 100));
+ fileRanges.add(FileRange.createFileRange(14 * 1024, 100));
+ fileRanges.add(FileRange.createFileRange(10 * 1024, 100));
+ fileRanges.add(FileRange.createFileRange(2 * 1024 - 101, 100));
+ fileRanges.add(FileRange.createFileRange(40 * 1024, 1024));
+
+ ExecutorService dataProcessor = Executors.newFixedThreadPool(5);
+ CountDownLatch countDown = new CountDownLatch(fileRanges.size());
+
+ try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
+ in.readVectored(fileRanges, value -> pool.getBuffer(true, value));
+ // user can perform other computations while waiting for IO.
+ for (FileRange res : fileRanges) {
+ dataProcessor.submit(() -> {
+ try {
+ readBufferValidateDataAndReturnToPool(pool, res, countDown);
+ } catch (Exception e) {
+ LOG.error("Error while process result for {} ", res, e);
+ }
+ });
+ }
+ if (!countDown.await(100, TimeUnit.SECONDS)) {
+ throw new AssertionError("Error while processing vectored io results");
+ }
+ } finally {
+ pool.release();
+ HadoopExecutors.shutdown(dataProcessor, LOG, 100, TimeUnit.SECONDS);
+ }
+ }
+
+ private void readBufferValidateDataAndReturnToPool(ByteBufferPool pool,
+ FileRange res,
+ CountDownLatch
countDownLatch)
+ throws IOException, TimeoutException {
+ CompletableFuture<ByteBuffer> data = res.getData();
+ ByteBuffer buffer = FutureIO.awaitFuture(data,
Review Comment:
I think we all -you, me, everyone else- needs to spend some time working
with CompletableFuture and chaining them.
In this code
```
data.thenAccept(buffer -> {
// all the validation
});
```
and await() for that.
It's a mess because java's checked exceptions cripple their
lambda-expression methods when IO operations are invoked. But if we trying to
live in their world at least we will get more insight into how we could
actually improve our own code to work better there. Though it may of course be
too late by now.
##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java:
##########
@@ -364,6 +373,63 @@ public void testMultipleVectoredReads() throws Exception {
}
}
+ /**
+ * This test creates list of ranges and then submit a readVectored
+ * operation and then uses a separate thread pool to process the
+ * results asynchronously.
+ */
+ @Test
+ public void testVectoredIOEndToEnd() throws Exception {
+ FileSystem fs = getFileSystem();
+ List<FileRange> fileRanges = new ArrayList<>();
+ fileRanges.add(FileRange.createFileRange(8 * 1024, 100));
+ fileRanges.add(FileRange.createFileRange(14 * 1024, 100));
+ fileRanges.add(FileRange.createFileRange(10 * 1024, 100));
+ fileRanges.add(FileRange.createFileRange(2 * 1024 - 101, 100));
+ fileRanges.add(FileRange.createFileRange(40 * 1024, 1024));
+
+ ExecutorService dataProcessor = Executors.newFixedThreadPool(5);
+ CountDownLatch countDown = new CountDownLatch(fileRanges.size());
+
+ try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
+ in.readVectored(fileRanges, value -> pool.getBuffer(true, value));
+ // user can perform other computations while waiting for IO.
+ for (FileRange res : fileRanges) {
+ dataProcessor.submit(() -> {
+ try {
+ readBufferValidateDataAndReturnToPool(pool, res, countDown);
+ } catch (Exception e) {
+ LOG.error("Error while process result for {} ", res, e);
+ }
+ });
+ }
+ if (!countDown.await(100, TimeUnit.SECONDS)) {
+ throw new AssertionError("Error while processing vectored io results");
+ }
+ } finally {
+ pool.release();
Review Comment:
how about adding an assert on L408 that the pool has its buffers returned?
##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java:
##########
@@ -364,6 +373,63 @@ public void testMultipleVectoredReads() throws Exception {
}
}
+ /**
+ * This test creates list of ranges and then submit a readVectored
+ * operation and then uses a separate thread pool to process the
+ * results asynchronously.
+ */
+ @Test
+ public void testVectoredIOEndToEnd() throws Exception {
+ FileSystem fs = getFileSystem();
+ List<FileRange> fileRanges = new ArrayList<>();
+ fileRanges.add(FileRange.createFileRange(8 * 1024, 100));
+ fileRanges.add(FileRange.createFileRange(14 * 1024, 100));
+ fileRanges.add(FileRange.createFileRange(10 * 1024, 100));
+ fileRanges.add(FileRange.createFileRange(2 * 1024 - 101, 100));
+ fileRanges.add(FileRange.createFileRange(40 * 1024, 1024));
+
+ ExecutorService dataProcessor = Executors.newFixedThreadPool(5);
+ CountDownLatch countDown = new CountDownLatch(fileRanges.size());
+
+ try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
+ in.readVectored(fileRanges, value -> pool.getBuffer(true, value));
+ // user can perform other computations while waiting for IO.
+ for (FileRange res : fileRanges) {
+ dataProcessor.submit(() -> {
+ try {
+ readBufferValidateDataAndReturnToPool(pool, res, countDown);
+ } catch (Exception e) {
+ LOG.error("Error while process result for {} ", res, e);
+ }
+ });
+ }
+ if (!countDown.await(100, TimeUnit.SECONDS)) {
+ throw new AssertionError("Error while processing vectored io results");
Review Comment:
declare timeout
##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java:
##########
@@ -364,6 +373,63 @@ public void testMultipleVectoredReads() throws Exception {
}
}
+ /**
+ * This test creates list of ranges and then submit a readVectored
+ * operation and then uses a separate thread pool to process the
+ * results asynchronously.
+ */
+ @Test
+ public void testVectoredIOEndToEnd() throws Exception {
+ FileSystem fs = getFileSystem();
+ List<FileRange> fileRanges = new ArrayList<>();
+ fileRanges.add(FileRange.createFileRange(8 * 1024, 100));
+ fileRanges.add(FileRange.createFileRange(14 * 1024, 100));
+ fileRanges.add(FileRange.createFileRange(10 * 1024, 100));
+ fileRanges.add(FileRange.createFileRange(2 * 1024 - 101, 100));
+ fileRanges.add(FileRange.createFileRange(40 * 1024, 1024));
+
+ ExecutorService dataProcessor = Executors.newFixedThreadPool(5);
+ CountDownLatch countDown = new CountDownLatch(fileRanges.size());
+
+ try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
+ in.readVectored(fileRanges, value -> pool.getBuffer(true, value));
+ // user can perform other computations while waiting for IO.
+ for (FileRange res : fileRanges) {
+ dataProcessor.submit(() -> {
+ try {
+ readBufferValidateDataAndReturnToPool(pool, res, countDown);
+ } catch (Exception e) {
+ LOG.error("Error while process result for {} ", res, e);
+ }
+ });
+ }
+ if (!countDown.await(100, TimeUnit.SECONDS)) {
+ throw new AssertionError("Error while processing vectored io results");
+ }
+ } finally {
+ pool.release();
+ HadoopExecutors.shutdown(dataProcessor, LOG, 100, TimeUnit.SECONDS);
Review Comment:
use same constant as proposed for L100
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]