This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch branch-0.5
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.5 by this push:
     new 3ebd8f52b [CELEBORN-1580] ReadBufferDispacther should notify exception 
to listener
3ebd8f52b is described below

commit 3ebd8f52b386338535554686e14f886ae46aec47
Author: codenohup <[email protected]>
AuthorDate: Thu Aug 29 01:55:17 2024 +0800

    [CELEBORN-1580] ReadBufferDispacther should notify exception to listener
    
    ### What changes were proposed in this pull request?
    When the ReadBufferDispatcher encounters an exception, it should notify the 
listener of that exception. The listener is responsible for informing the Celeborn 
client of the error and initiating some fault tolerance strategies.
    
    ### Why are the changes needed?
    If the ReadBufferDispatcher doesn't notify the listener of the exception, 
it may result in the listener (MapPartitionDataReader) being stuck in 
a prolonged wait state, ultimately leading to the job hanging.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Add a unit test case.
    
    Closes #2707 from codenohup/fix-oom.
    
    Authored-by: codenohup <[email protected]>
    Signed-off-by: SteNicholas <[email protected]>
    (cherry picked from commit c23392953793de537b6b3fb5d63437547807ae8d)
    Signed-off-by: SteNicholas <[email protected]>
---
 .../deploy/worker/memory/MemoryManager.java        |  3 +-
 .../deploy/worker/memory/ReadBufferDispatcher.java |  3 +
 .../deploy/worker/storage/MapPartitionData.java    | 19 +++---
 .../deploy/memory/ReadBufferDispactherSuite.scala  | 67 ++++++++++++++++++++++
 4 files changed, 82 insertions(+), 10 deletions(-)

diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
index c8b6af9af..f2bf9be07 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
@@ -462,7 +462,8 @@ public class MemoryManager {
     readBufferCounter.addAndGet(delta);
   }
 
-  protected boolean readBufferAvailable(int requiredBytes) {
+  @VisibleForTesting
+  public boolean readBufferAvailable(int requiredBytes) {
     return readBufferCounter.get() + requiredBytes < readBufferThreshold;
   }
 
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ReadBufferDispatcher.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ReadBufferDispatcher.java
index 17bbaa3b8..b433d3c2e 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ReadBufferDispatcher.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ReadBufferDispatcher.java
@@ -113,6 +113,9 @@ public class ReadBufferDispatcher extends Thread {
         if (buffers != null) {
           buffers.forEach(this::recycle);
         }
+
+        // notify listener has exception
+        request.getBufferListener().notifyBuffers(null, e);
       }
     }
   }
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java
index eba0c2faf..e0c60c0b9 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java
@@ -136,22 +136,26 @@ class MapPartitionData implements 
MemoryManager.ReadBufferTargetChangeListener {
 
   public void tryRequestBufferOrRead() {
     if (bufferQueueInitialized.compareAndSet(false, true)) {
-      bufferQueue.tryApplyNewBuffers(
-          readers.size(),
-          mapFileMeta.getBufferSize(),
-          (allocatedBuffers, throwable) -> onBuffer(allocatedBuffers));
+      bufferQueue.tryApplyNewBuffers(readers.size(), 
mapFileMeta.getBufferSize(), this::onBuffer);
     } else {
       triggerRead();
     }
   }
 
   // Read logic is executed on another thread.
-  public void onBuffer(List<ByteBuf> buffers) {
+  public void onBuffer(List<ByteBuf> buffers, Throwable throwable) {
     if (isReleased) {
       buffers.forEach(memoryManager::recycleReadBuffer);
       return;
     }
 
+    if (throwable != null) {
+      for (MapPartitionDataReader reader : readers.values()) {
+        reader.recycleOnError(throwable);
+      }
+      return;
+    }
+
     bufferQueue.add(buffers);
 
     if (bufferQueue.size()
@@ -174,10 +178,7 @@ class MapPartitionData implements 
MemoryManager.ReadBufferTargetChangeListener {
       triggerRead();
     }
 
-    bufferQueue.tryApplyNewBuffers(
-        readers.size(),
-        mapFileMeta.getBufferSize(),
-        (allocatedBuffers, throwable) -> onBuffer(allocatedBuffers));
+    bufferQueue.tryApplyNewBuffers(readers.size(), 
mapFileMeta.getBufferSize(), this::onBuffer);
   }
 
   public synchronized void readBuffers() {
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/ReadBufferDispactherSuite.scala
 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/ReadBufferDispactherSuite.scala
new file mode 100644
index 000000000..031bb7c55
--- /dev/null
+++ 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/ReadBufferDispactherSuite.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.memory
+
+import java.util
+import java.util.concurrent.{CompletableFuture, TimeUnit}
+
+import io.netty.buffer.ByteBuf
+import org.mockito.ArgumentMatchers.anyInt
+import org.mockito.Mockito.{mock, when}
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+
+import org.apache.celeborn.CelebornFunSuite
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.service.deploy.worker.memory.{MemoryManager, 
ReadBufferListener, ReadBufferRequest}
+import org.apache.celeborn.service.deploy.worker.memory.ReadBufferDispatcher
+
+class ReadBufferDispactherSuite extends CelebornFunSuite {
+
+  test("[CELEBORN-1580] Test ReadBufferDispacther notify exception to 
listener") {
+    val mockedMemoryManager = mock(classOf[MemoryManager])
+    when(mockedMemoryManager.readBufferAvailable(anyInt())).thenAnswer(
+      new Answer[Int] {
+        override def answer(invocation: InvocationOnMock): Int = {
+          throw new RuntimeException("throw exception for test")
+        }
+      })
+
+    val conf = new CelebornConf()
+    val readBufferDispatcher = new ReadBufferDispatcher(mockedMemoryManager, 
conf)
+    val requestFuture = new CompletableFuture[Void]()
+
+    val request = new ReadBufferRequest(
+      Integer.MAX_VALUE,
+      Integer.MAX_VALUE,
+      new ReadBufferListener {
+        override def notifyBuffers(
+            allocatedBuffers: util.List[ByteBuf],
+            throwable: Throwable): Unit = {
+          assert(throwable != null)
+          assert(throwable.isInstanceOf[RuntimeException])
+          assert(throwable.getMessage.equals("throw exception for test"))
+          requestFuture.complete(null);
+        }
+      })
+
+    readBufferDispatcher.addBufferRequest(request)
+    requestFuture.get(5, TimeUnit.SECONDS)
+  }
+
+}

Reply via email to