zhijiangW commented on a change in pull request #9993: 
[FLINK-14498][runtime]Introduce NetworkBufferPool#isAvailable() for interacting 
with LocalBufferPool.
URL: https://github.com/apache/flink/pull/9993#discussion_r340988167
 
 

 ##########
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
 ##########
 @@ -661,26 +660,32 @@ public void 
testBlockingRequestFromMultiLocalBufferPool() throws Exception {
                        }
                        final List<MemorySegment> exclusiveSegments = 
globalPool.requestMemorySegments();
                        assertTrue(globalPool.isAvailable().isDone());
-                       for (BufferPool localPool: localBufferPools) {
+                       for (final BufferPool localPool: localBufferPools) {
                                assertTrue(localPool.isAvailable().isDone());
                        }
 
                        // blocking request buffers form local buffer pools
                        final CountDownLatch latch = new 
CountDownLatch(numLocalBufferPool);
                        final BlockingQueue<BufferBuilder> segmentsRequested = 
new ArrayBlockingQueue<>(numBuffers);
+                       final AtomicReference<Throwable> cause = new 
AtomicReference<>();
                        for (final BufferPool localPool: localBufferPools) {
                                executorService.submit(() -> {
-                                       for (int num = localPoolMaxSize; num > 
0; --num) {
-                                               
ExceptionUtils.suppressExceptions(
-                                                       () -> 
segmentsRequested.add(localPool.requestBufferBuilderBlocking()));
+                                       try {
+                                               for (int num = 
localPoolMaxSize; num > 0; --num) {
+                                                       
segmentsRequested.add(localPool.requestBufferBuilderBlocking());
+                                               }
+                                       } catch (Exception e) {
+                                               cause.set(e);
+                                       } finally {
+                                               latch.countDown();
                                        }
-                                       latch.countDown();
                                });
                        }
 
                        // wait until all available buffers are requested
                        while (globalPool.getNumberOfAvailableMemorySegments() 
> 0) {
                                Thread.sleep(100);
+                               assertNull(cause.get());
 
 Review comment:
   We should provide an exception message here so the failure is easier to trace:
   `assertNull(cause.get().getMessage(), cause.get())`? The same applies to the 
assert below.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to