RexXiong commented on code in PR #3525:
URL: https://github.com/apache/celeborn/pull/3525#discussion_r2488960587


##########
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala:
##########
@@ -4429,6 +4430,14 @@ object CelebornConf extends Logging {
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("50ms")
 
+  val WORKER_READBUFFER_PROCESS_TIMEOUT: ConfigEntry[Long] =
+    buildConf("celeborn.worker.readBuffer.processTimeout")
+      .categories("worker")
+      .version("0.7.0")

Review Comment:
   Maybe we can cherry-pick this PR to 0.6, WDYT?



##########
worker/src/test/scala/org/apache/celeborn/service/deploy/memory/ReadBufferDispactherSuite.scala:
##########
@@ -75,4 +75,32 @@ class ReadBufferDispactherSuite extends CelebornFunSuite {
     val threadId2 = readBufferDispatcher.dispatcherThread.get().getId
     assert(threadId1 != threadId2)
   }
+
+  test("[CELEBORN-2192] ReadBufferDispatcher should add timeout constraints to fast fail in case of timeout") {
+    val memoryManager = mock(classOf[MemoryManager])
+    val readBufferDispatcher = new ReadBufferDispatcher(
+      memoryManager,
+      new CelebornConf().set(
+        CelebornConf.WORKER_READBUFFER_PROCESS_TIMEOUT.key,
+        CelebornConf.WORKER_READBUFFER_ALLOCATIONWAIT.defaultValueString),
+      null)
+    when(memoryManager.readBufferAvailable(anyInt())).thenAnswer(new Answer[Boolean] {
+      override def answer(invocationOnMock: InvocationOnMock): Boolean = false
+    })
+    val completableFuture = new CompletableFuture[Void]()
+    val readBufferRequest = new ReadBufferRequest(
+      Integer.MAX_VALUE,
+      Integer.MAX_VALUE,
+      new ReadBufferListener {
+        override def notifyBuffers(
+            allocatedBuffers: util.List[ByteBuf],
+            throwable: Throwable): Unit = {
+          assert(throwable != null)

Review Comment:
   It seems that we cannot currently determine whether notifyBuffers has been executed.
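
One possible way to address this, sketched without any Celeborn classes: let the callback complete a `CompletableFuture` with the throwable it received, then wait on that future from the test thread with a bounded timeout. If the callback never runs, `get` throws `TimeoutException` and the test fails instead of passing vacuously. `BufferListener`, `CallbackObserved`, `failAsync`, and `observedFailure` are hypothetical names used only for illustration; the real `ReadBufferListener` takes a `util.List[ByteBuf]`, and `failAsync` merely simulates a dispatcher that fails a request on another thread.

```scala
import java.util.concurrent.{CompletableFuture, TimeUnit, TimeoutException}

// Hypothetical stand-in for ReadBufferListener, simplified so the sketch
// compiles without Netty or Celeborn on the classpath.
trait BufferListener {
  def notifyBuffers(buffers: java.util.List[Array[Byte]], throwable: Throwable): Unit
}

object CallbackObserved {

  // Hypothetical helper: simulates a dispatcher that rejects the request
  // from a background thread, as a timed-out request would be.
  def failAsync(listener: BufferListener): Unit = {
    val worker = new Thread(new Runnable {
      override def run(): Unit =
        listener.notifyBuffers(null, new TimeoutException("simulated timeout"))
    })
    worker.setDaemon(true)
    worker.start()
  }

  // Returns the throwable handed to the callback, or throws
  // java.util.concurrent.TimeoutException if the callback was never invoked.
  def observedFailure(waitMs: Long): Throwable = {
    val done = new CompletableFuture[Throwable]()
    failAsync(new BufferListener {
      override def notifyBuffers(
          buffers: java.util.List[Array[Byte]],
          throwable: Throwable): Unit = {
        // Record the result here; assert on the test thread instead.
        done.complete(throwable)
      }
    })
    done.get(waitMs, TimeUnit.MILLISECONDS)
  }
}
```

In the suite this would roughly amount to replacing the assertion inside `notifyBuffers` with a `complete(...)` call and asserting on the value returned by `get(...)` afterwards. An assertion that fails on the dispatcher thread would not necessarily fail the test, whereas one made on the test thread will.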



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

