SteNicholas commented on code in PR #3525:
URL: https://github.com/apache/celeborn/pull/3525#discussion_r2490282517


##########
worker/src/test/scala/org/apache/celeborn/service/deploy/memory/ReadBufferDispactherSuite.scala:
##########
@@ -75,4 +75,32 @@ class ReadBufferDispactherSuite extends CelebornFunSuite {
     val threadId2 = readBufferDispatcher.dispatcherThread.get().getId
     assert(threadId1 != threadId2)
   }
+
+  test("[CELEBORN-2192] ReadBufferDispatcher should add timeout constraints to fast fail in case of timeout") {
+    val memoryManager = mock(classOf[MemoryManager])
+    val readBufferDispatcher = new ReadBufferDispatcher(
+      memoryManager,
+      new CelebornConf().set(
+        CelebornConf.WORKER_READBUFFER_PROCESS_TIMEOUT.key,
+        CelebornConf.WORKER_READBUFFER_ALLOCATIONWAIT.defaultValueString),
+      null)
+    when(memoryManager.readBufferAvailable(anyInt())).thenAnswer(new Answer[Boolean] {
+      override def answer(invocationOnMock: InvocationOnMock): Boolean = false
+    })
+    val completableFuture = new CompletableFuture[Void]()
+    val readBufferRequest = new ReadBufferRequest(
+      Integer.MAX_VALUE,
+      Integer.MAX_VALUE,
+      new ReadBufferListener {
+        override def notifyBuffers(
+            allocatedBuffers: util.List[ByteBuf],
+            throwable: Throwable): Unit = {
+          assert(throwable != null)

Review Comment:
   @RexXiong, invoking `completableFuture#complete` inside `notifyBuffers` lets the test determine whether `notifyBuffers` was executed. If `notifyBuffers` is never executed, `completableFuture#get` would time out.
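
   For illustration, a minimal self-contained sketch of that pattern, without Celeborn or Mockito. `BufferListener` and `CallbackProbe` are hypothetical stand-ins, not names from the PR, and the bounded `get(timeout, unit)` call is an assumption about how the test would wait:

```scala
import java.util.concurrent.{CompletableFuture, TimeUnit, TimeoutException}

// Hypothetical stand-in for a listener such as ReadBufferListener.
trait BufferListener {
  def notifyBuffers(buffers: java.util.List[String], throwable: Throwable): Unit
}

object CallbackProbe {
  // Returns true if the callback ran within timeoutMs, false if it never fired.
  def callbackRan(invoke: BufferListener => Unit, timeoutMs: Long): Boolean = {
    val done = new CompletableFuture[Void]()
    val listener = new BufferListener {
      override def notifyBuffers(
          buffers: java.util.List[String],
          throwable: Throwable): Unit = {
        assert(throwable != null)
        // Completing the future is the signal that the callback was reached.
        done.complete(null)
      }
    }
    invoke(listener)
    try {
      // A bounded wait: if the callback never runs, this throws TimeoutException.
      done.get(timeoutMs, TimeUnit.MILLISECONDS)
      true
    } catch {
      case _: TimeoutException => false
    }
  }
}
```

   Without the `complete` call, an assertion that lives only inside the callback passes vacuously when the callback is never invoked; the bounded `get` turns that case into a visible timeout.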



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
