SteNicholas commented on code in PR #3525:
URL: https://github.com/apache/celeborn/pull/3525#discussion_r2490282517
##########
worker/src/test/scala/org/apache/celeborn/service/deploy/memory/ReadBufferDispactherSuite.scala:
##########
@@ -75,4 +75,32 @@ class ReadBufferDispactherSuite extends CelebornFunSuite {
val threadId2 = readBufferDispatcher.dispatcherThread.get().getId
assert(threadId1 != threadId2)
}
+
+ test("[CELEBORN-2192] ReadBufferDispatcher should add timeout constraints to
fast fail in case of timeout") {
+ val memoryManager = mock(classOf[MemoryManager])
+ val readBufferDispatcher = new ReadBufferDispatcher(
+ memoryManager,
+ new CelebornConf().set(
+ CelebornConf.WORKER_READBUFFER_PROCESS_TIMEOUT.key,
+ CelebornConf.WORKER_READBUFFER_ALLOCATIONWAIT.defaultValueString),
+ null)
+ when(memoryManager.readBufferAvailable(anyInt())).thenAnswer(new
Answer[Boolean] {
+ override def answer(invocationOnMock: InvocationOnMock): Boolean = false
+ })
+ val completableFuture = new CompletableFuture[Void]()
+ val readBufferRequest = new ReadBufferRequest(
+ Integer.MAX_VALUE,
+ Integer.MAX_VALUE,
+ new ReadBufferListener {
+ override def notifyBuffers(
+ allocatedBuffers: util.List[ByteBuf],
+ throwable: Throwable): Unit = {
+ assert(throwable != null)
Review Comment:
@RexXiong, invoking `completableFuture#complete` makes it possible to
determine whether `notifyBuffers` is executed. If `notifyBuffers` is not
executed, `completableFuture#get` would time out.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]