steveloughran commented on code in PR #4921:
URL: https://github.com/apache/hadoop/pull/4921#discussion_r981079747


##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java:
##########
@@ -393,39 +392,40 @@ public void testVectoredIOEndToEnd() throws Exception {
 
     try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
       in.readVectored(fileRanges, value -> pool.getBuffer(true, value));
-      // user can perform other computations while waiting for IO.
       for (FileRange res : fileRanges) {
         dataProcessor.submit(() -> {
           try {
-            readBufferValidateDataAndReturnToPool(pool, res, countDown);
+            readBufferValidateDataAndReturnToPool(res, countDown);
           } catch (Exception e) {
-            LOG.error("Error while process result for {} ", res, e);
+            String error = String.format("Error while processing result for %s", res);
+            LOG.error(error, e);
+            ContractTestUtils.fail(error, e);
           }
         });
       }
-      if (!countDown.await(100, TimeUnit.SECONDS)) {
-        throw new AssertionError("Error while processing vectored io results");
+      // user can perform other computations while waiting for IO.
+      if (!countDown.await(VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
+        ContractTestUtils.fail("Timeout/Error while processing vectored io results");
       }
     } finally {
-      pool.release();
       HadoopExecutors.shutdown(dataProcessor, LOG, 100, TimeUnit.SECONDS);
     }
   }
 
-  private void readBufferValidateDataAndReturnToPool(ByteBufferPool pool,
-                                                     FileRange res,
+  private void readBufferValidateDataAndReturnToPool(FileRange res,
                                                     CountDownLatch countDownLatch)
           throws IOException, TimeoutException {
     CompletableFuture<ByteBuffer> data = res.getData();
-    ByteBuffer buffer = FutureIO.awaitFuture(data,
-            VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS,
-            TimeUnit.SECONDS);
     // Read the data and perform custom operation. Here we are just
     // validating it with original data.
-    assertDatasetEquals((int) res.getOffset(), "vecRead",
-            buffer, res.getLength(), DATASET);
-    // return buffer to pool.
-    pool.putBuffer(buffer);
+    FutureIO.awaitFuture(data.thenAccept(buffer -> {
+      assertDatasetEquals((int) res.getOffset(),
+              "vecRead", buffer, res.getLength(), DATASET);
+      // return buffer to the pool once read.
+      pool.putBuffer(buffer);
+    }), VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS, TimeUnit.SECONDS);

Review Comment:
   nit: put the timeout + unit on a new line



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java:
##########
@@ -393,39 +392,40 @@ public void testVectoredIOEndToEnd() throws Exception {
 
     try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
       in.readVectored(fileRanges, value -> pool.getBuffer(true, value));
-      // user can perform other computations while waiting for IO.
       for (FileRange res : fileRanges) {
         dataProcessor.submit(() -> {
           try {
-            readBufferValidateDataAndReturnToPool(pool, res, countDown);
+            readBufferValidateDataAndReturnToPool(res, countDown);
           } catch (Exception e) {
-            LOG.error("Error while process result for {} ", res, e);
+            String error = String.format("Error while processing result for %s", res);
+            LOG.error(error, e);
+            ContractTestUtils.fail(error, e);
           }
         });
       }
-      if (!countDown.await(100, TimeUnit.SECONDS)) {
-        throw new AssertionError("Error while processing vectored io results");
+      // user can perform other computations while waiting for IO.
+      if (!countDown.await(VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
+        ContractTestUtils.fail("Timeout/Error while processing vectored io results");
       }
     } finally {
-      pool.release();
       HadoopExecutors.shutdown(dataProcessor, LOG, 100, TimeUnit.SECONDS);

Review Comment:
   use VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS here instead of the hard-coded 100.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to