RexXiong commented on code in PR #3525:
URL: https://github.com/apache/celeborn/pull/3525#discussion_r2488960587
##########
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala:
##########
@@ -4429,6 +4430,14 @@ object CelebornConf extends Logging {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("50ms")
+ val WORKER_READBUFFER_PROCESS_TIMEOUT: ConfigEntry[Long] =
+ buildConf("celeborn.worker.readBuffer.processTimeout")
+ .categories("worker")
+ .version("0.7.0")
Review Comment:
Maybe we can cherry pick this pr to 0.6, WDYT?
##########
worker/src/test/scala/org/apache/celeborn/service/deploy/memory/ReadBufferDispactherSuite.scala:
##########
@@ -75,4 +75,32 @@ class ReadBufferDispactherSuite extends CelebornFunSuite {
val threadId2 = readBufferDispatcher.dispatcherThread.get().getId
assert(threadId1 != threadId2)
}
+
+ test("[CELEBORN-2192] ReadBufferDispatcher should add timeout constraints to
fast fail in case of timeout") {
+ val memoryManager = mock(classOf[MemoryManager])
+ val readBufferDispatcher = new ReadBufferDispatcher(
+ memoryManager,
+ new CelebornConf().set(
+ CelebornConf.WORKER_READBUFFER_PROCESS_TIMEOUT.key,
+ CelebornConf.WORKER_READBUFFER_ALLOCATIONWAIT.defaultValueString),
+ null)
+ when(memoryManager.readBufferAvailable(anyInt())).thenAnswer(new
Answer[Boolean] {
+ override def answer(invocationOnMock: InvocationOnMock): Boolean = false
+ })
+ val completableFuture = new CompletableFuture[Void]()
+ val readBufferRequest = new ReadBufferRequest(
+ Integer.MAX_VALUE,
+ Integer.MAX_VALUE,
+ new ReadBufferListener {
+ override def notifyBuffers(
+ allocatedBuffers: util.List[ByteBuf],
+ throwable: Throwable): Unit = {
+ assert(throwable != null)
Review Comment:
It seems that we cannot currently determine whether notifyBuffers has been
executed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]