zhijiangW commented on a change in pull request #9993: 
[FLINK-14498][runtime]Introduce NetworkBufferPool#isAvailable() for interacting 
with LocalBufferPool.
URL: https://github.com/apache/flink/pull/9993#discussion_r340988167
 
 

 ##########
 File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
 ##########
 @@ -661,26 +660,32 @@ public void testBlockingRequestFromMultiLocalBufferPool() throws Exception {
                        }
                        final List<MemorySegment> exclusiveSegments = globalPool.requestMemorySegments();
                        assertTrue(globalPool.isAvailable().isDone());
-                       for (BufferPool localPool: localBufferPools) {
+                       for (final BufferPool localPool: localBufferPools) {
                                assertTrue(localPool.isAvailable().isDone());
                        }
 
                        // blocking request buffers form local buffer pools
                        final CountDownLatch latch = new CountDownLatch(numLocalBufferPool);
                        final BlockingQueue<BufferBuilder> segmentsRequested = new ArrayBlockingQueue<>(numBuffers);
+                       final AtomicReference<Throwable> cause = new AtomicReference<>();
                        for (final BufferPool localPool: localBufferPools) {
                                executorService.submit(() -> {
-                                       for (int num = localPoolMaxSize; num > 0; --num) {
-                                               ExceptionUtils.suppressExceptions(
-                                                       () -> segmentsRequested.add(localPool.requestBufferBuilderBlocking()));
+                                       try {
+                                               for (int num = localPoolMaxSize; num > 0; --num) {
+                                                       segmentsRequested.add(localPool.requestBufferBuilderBlocking());
+                                               }
+                                       } catch (Exception e) {
+                                               cause.set(e);
+                                       } finally {
+                                               latch.countDown();
                                        }
-                                       latch.countDown();
                                });
                        }
 
                        // wait until all available buffers are requested
                        while (globalPool.getNumberOfAvailableMemorySegments() > 0) {
                                Thread.sleep(100);
+                               assertNull(cause.get());
 
 Review comment:
   We should provide an exception message here so that a failure is easier to trace, e.g.
   `assertNull(cause.get().getMessage(), cause.get())`?
   
   In addition, this check only guarantees that no exception occurred during the non-blocking requests. Could an exception still occur during the blocking request phase? If so, we need to check `cause` again after the `latch.await();` below.
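
   As a rough illustration of the second point, here is a minimal, self-contained sketch rather than the actual test code. `WorkerFailureCheck`, `BlockingRequest`, `runAndCollectFailure`, and `describe` are hypothetical names, and the 10-second timeout is an arbitrary assumption. Each worker records its failure in an `AtomicReference`, counts the latch down in `finally`, and the caller only reads `cause` after the latch has been released. Note that `cause.get().getMessage()` would itself throw a `NullPointerException` when no failure was recorded, so the sketch builds the message null-safely.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class WorkerFailureCheck {

    /** Hypothetical stand-in for a blocking buffer request that may fail. */
    interface BlockingRequest {
        void run() throws Exception;
    }

    /** Runs the request on numWorkers threads and returns the first recorded failure, or null. */
    static Throwable runAndCollectFailure(int numWorkers, BlockingRequest request)
            throws InterruptedException {
        final ExecutorService executor = Executors.newFixedThreadPool(numWorkers);
        final CountDownLatch latch = new CountDownLatch(numWorkers);
        final AtomicReference<Throwable> cause = new AtomicReference<>();
        try {
            for (int i = 0; i < numWorkers; i++) {
                executor.submit(() -> {
                    try {
                        request.run();
                    } catch (Throwable t) {
                        // Keep only the first failure so later ones do not overwrite it.
                        cause.compareAndSet(null, t);
                    } finally {
                        // Always release the latch, even when the request failed.
                        latch.countDown();
                    }
                });
            }
            // Read the failure only after every worker has finished.
            if (!latch.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
            return cause.get();
        } finally {
            executor.shutdownNow();
        }
    }

    /** Builds a failure message without dereferencing a null cause. */
    static String describe(Throwable t) {
        return t == null ? "no failure" : "Unexpected failure in worker: " + t;
    }

    public static void main(String[] args) throws Exception {
        Throwable ok = runAndCollectFailure(2, () -> { });
        Throwable failed = runAndCollectFailure(2, () -> {
            throw new IllegalStateException("boom");
        });
        System.out.println(describe(ok));
        System.out.println(describe(failed));
    }
}
```

   In a JUnit 4 test, the returned value could then be passed to `assertNull(String message, Object object)` together with a message built this way.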


Reply via email to