This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new c23392953 [CELEBORN-1580] ReadBufferDispacther should notify exception
to listener
c23392953 is described below
commit c23392953793de537b6b3fb5d63437547807ae8d
Author: codenohup <[email protected]>
AuthorDate: Thu Aug 29 01:55:17 2024 +0800
[CELEBORN-1580] ReadBufferDispacther should notify exception to listener
### What changes were proposed in this pull request?
When the ReadBufferDispatcher encounters an exception, it should notify the
listener of that exception. The listener is responsible for informing the Celeborn
client of the error and initiating fault-tolerance strategies.
### Why are the changes needed?
If the ReadBufferDispatcher doesn't notify the listener of the exception,
the listener (MapPartitionDataReader) may be stuck in
a prolonged wait state, ultimately leading to the job hanging.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added a unit test case.
Closes #2707 from codenohup/fix-oom.
Authored-by: codenohup <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
---
.../deploy/worker/memory/MemoryManager.java | 3 +-
.../deploy/worker/memory/ReadBufferDispatcher.java | 3 +
.../deploy/worker/storage/MapPartitionData.java | 19 +++---
.../deploy/memory/ReadBufferDispactherSuite.scala | 67 ++++++++++++++++++++++
4 files changed, 82 insertions(+), 10 deletions(-)
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
index 372761654..d48888766 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
@@ -462,7 +462,8 @@ public class MemoryManager {
readBufferCounter.addAndGet(delta);
}
- protected boolean readBufferAvailable(int requiredBytes) {
+ @VisibleForTesting
+ public boolean readBufferAvailable(int requiredBytes) {
return readBufferCounter.get() + requiredBytes < readBufferThreshold;
}
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ReadBufferDispatcher.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ReadBufferDispatcher.java
index 17bbaa3b8..b433d3c2e 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ReadBufferDispatcher.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ReadBufferDispatcher.java
@@ -113,6 +113,9 @@ public class ReadBufferDispatcher extends Thread {
if (buffers != null) {
buffers.forEach(this::recycle);
}
+
+ // notify listener has exception
+ request.getBufferListener().notifyBuffers(null, e);
}
}
}
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java
index eba0c2faf..e0c60c0b9 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java
@@ -136,22 +136,26 @@ class MapPartitionData implements
MemoryManager.ReadBufferTargetChangeListener {
public void tryRequestBufferOrRead() {
if (bufferQueueInitialized.compareAndSet(false, true)) {
- bufferQueue.tryApplyNewBuffers(
- readers.size(),
- mapFileMeta.getBufferSize(),
- (allocatedBuffers, throwable) -> onBuffer(allocatedBuffers));
+ bufferQueue.tryApplyNewBuffers(readers.size(),
mapFileMeta.getBufferSize(), this::onBuffer);
} else {
triggerRead();
}
}
// Read logic is executed on another thread.
- public void onBuffer(List<ByteBuf> buffers) {
+ public void onBuffer(List<ByteBuf> buffers, Throwable throwable) {
if (isReleased) {
buffers.forEach(memoryManager::recycleReadBuffer);
return;
}
+ if (throwable != null) {
+ for (MapPartitionDataReader reader : readers.values()) {
+ reader.recycleOnError(throwable);
+ }
+ return;
+ }
+
bufferQueue.add(buffers);
if (bufferQueue.size()
@@ -174,10 +178,7 @@ class MapPartitionData implements
MemoryManager.ReadBufferTargetChangeListener {
triggerRead();
}
- bufferQueue.tryApplyNewBuffers(
- readers.size(),
- mapFileMeta.getBufferSize(),
- (allocatedBuffers, throwable) -> onBuffer(allocatedBuffers));
+ bufferQueue.tryApplyNewBuffers(readers.size(),
mapFileMeta.getBufferSize(), this::onBuffer);
}
public synchronized void readBuffers() {
diff --git
a/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/ReadBufferDispactherSuite.scala
b/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/ReadBufferDispactherSuite.scala
new file mode 100644
index 000000000..031bb7c55
--- /dev/null
+++
b/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/ReadBufferDispactherSuite.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.memory
+
+import java.util
+import java.util.concurrent.{CompletableFuture, TimeUnit}
+
+import io.netty.buffer.ByteBuf
+import org.mockito.ArgumentMatchers.anyInt
+import org.mockito.Mockito.{mock, when}
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+
+import org.apache.celeborn.CelebornFunSuite
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.service.deploy.worker.memory.{MemoryManager,
ReadBufferListener, ReadBufferRequest}
+import org.apache.celeborn.service.deploy.worker.memory.ReadBufferDispatcher
+
+class ReadBufferDispactherSuite extends CelebornFunSuite {
+
+ test("[CELEBORN-1580] Test ReadBufferDispacther notify exception to
listener") {
+ val mockedMemoryManager = mock(classOf[MemoryManager])
+ when(mockedMemoryManager.readBufferAvailable(anyInt())).thenAnswer(
+ new Answer[Int] {
+ override def answer(invocation: InvocationOnMock): Int = {
+ throw new RuntimeException("throw exception for test")
+ }
+ })
+
+ val conf = new CelebornConf()
+ val readBufferDispatcher = new ReadBufferDispatcher(mockedMemoryManager,
conf)
+ val requestFuture = new CompletableFuture[Void]()
+
+ val request = new ReadBufferRequest(
+ Integer.MAX_VALUE,
+ Integer.MAX_VALUE,
+ new ReadBufferListener {
+ override def notifyBuffers(
+ allocatedBuffers: util.List[ByteBuf],
+ throwable: Throwable): Unit = {
+ assert(throwable != null)
+ assert(throwable.isInstanceOf[RuntimeException])
+ assert(throwable.getMessage.equals("throw exception for test"))
+ requestFuture.complete(null);
+ }
+ })
+
+ readBufferDispatcher.addBufferRequest(request)
+ requestFuture.get(5, TimeUnit.SECONDS)
+ }
+
+}