This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 2eb4c23eb [CELEBORN-1771] Bring forward PUSH_DATA_CREATE_CONNECTION_FAIL_REPLICA
2eb4c23eb is described below

commit 2eb4c23eb8c87788daea66caa8e331e024d160b5
Author: zhengtao <[email protected]>
AuthorDate: Tue Dec 24 11:09:19 2024 +0800

    [CELEBORN-1771] Bring forward PUSH_DATA_CREATE_CONNECTION_FAIL_REPLICA
    
    ### What changes were proposed in this pull request?
    1. Move the peerWorker availability check out of the `replicateThreadPool` task.
    2. Move `retain` after the peerWorker availability check, so the push body no longer has to be released when peerWorker is unavailable.
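    3. Call `fileWriter.decrementPendingWrites()` when peerWorker is unavailable, because the handler now returns early and never reaches the `decrementPendingWrites` call in `writeLocalData`. For PushMergedData this is done for every non-null file writer whose partition is not hard-split.
    4. Extract the failure handling (counter increment, error log, `onFailure` callback) into `handlePushDataConnectionFail` and `handlePushMergedDataConnectionFail`.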
    
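    ### Why are the changes needed?
    Checking peer availability before submitting the replication task lets the primary report `PUSH_DATA_CREATE_CONNECTION_FAIL_REPLICA` immediately, instead of first queuing a task on the replicate thread pool, and avoids a needless `retain`/`release` of the push body.
    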
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Existing UT & cluster testing.
    
    Closes #2989 from zaynt4606/clb1771.
    
    Authored-by: zhengtao <[email protected]>
    Signed-off-by: Shuang <[email protected]>
---
 .../service/deploy/worker/PushDataHandler.scala    | 80 ++++++++++++++--------
 1 file changed, 53 insertions(+), 27 deletions(-)

diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index 15e217c88..f460e768e 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -273,26 +273,27 @@ class PushDataHandler(val workerSource: WorkerSource) 
extends BaseMessageHandler
     val writePromise = Promise[Array[StatusCode]]()
     // for primary, send data to replica
     if (doReplicate) {
+      val peer = location.getPeer
+      val peerWorker = new WorkerInfo(
+        peer.getHost,
+        peer.getRpcPort,
+        peer.getPushPort,
+        peer.getFetchPort,
+        peer.getReplicatePort)
+      if (unavailablePeers.containsKey(peerWorker)) {
+        fileWriter.decrementPendingWrites()
+        handlePushDataConnectionFail(callbackWithTimer, location)
+        return
+      }
+
       pushData.body().retain()
       replicateThreadPool.submit(new Runnable {
         override def run(): Unit = {
-          val peer = location.getPeer
-          val peerWorker = new WorkerInfo(
-            peer.getHost,
-            peer.getRpcPort,
-            peer.getPushPort,
-            peer.getFetchPort,
-            peer.getReplicatePort)
           if (unavailablePeers.containsKey(peerWorker)) {
             pushData.body().release()
-            
workerSource.incCounter(WorkerSource.REPLICATE_DATA_CREATE_CONNECTION_FAIL_COUNT)
-            logError(
-              s"PushData replication failed caused by unavailable peer for 
partitionLocation: $location")
-            callbackWithTimer.onFailure(
-              new 
CelebornIOException(StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_REPLICA))
+            handlePushDataConnectionFail(callbackWithTimer, location)
             return
           }
-
           // Handle the response from replica
           val wrappedCallback = new RpcResponseCallback() {
             override def onSuccess(response: ByteBuffer): Unit = {
@@ -424,6 +425,26 @@ class PushDataHandler(val workerSource: WorkerSource) 
extends BaseMessageHandler
     }
   }
 
+  def handlePushDataConnectionFail(
+      callbackWithTimer: RpcResponseCallback,
+      location: PartitionLocation): Unit = {
+    
workerSource.incCounter(WorkerSource.REPLICATE_DATA_CREATE_CONNECTION_FAIL_COUNT)
+    logError(
+      s"PushData replication failed caused by unavailable peer for 
partitionLocation: $location")
+    callbackWithTimer.onFailure(
+      new 
CelebornIOException(StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_REPLICA))
+  }
+
+  def handlePushMergedDataConnectionFail(
+      pushMergedDataCallback: PushMergedDataCallback,
+      location: PartitionLocation): Unit = {
+    
workerSource.incCounter(WorkerSource.REPLICATE_DATA_CREATE_CONNECTION_FAIL_COUNT)
+    logError(
+      s"PushMergedData replication failed caused by unavailable peer for 
partitionLocation: $location")
+    pushMergedDataCallback.onFailure(
+      new 
CelebornIOException(StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_REPLICA))
+  }
+
   def handlePushMergedData(
       pushMergedData: PushMergedData,
       callback: RpcResponseCallback): Unit = {
@@ -582,27 +603,32 @@ class PushDataHandler(val workerSource: WorkerSource) 
extends BaseMessageHandler
     val writePromise = Promise[Array[StatusCode]]()
     // for primary, send data to replica
     if (doReplicate) {
+      val location = partitionIdToLocations.head._2
+      val peer = location.getPeer
+      val peerWorker = new WorkerInfo(
+        peer.getHost,
+        peer.getRpcPort,
+        peer.getPushPort,
+        peer.getFetchPort,
+        peer.getReplicatePort)
+      if (unavailablePeers.containsKey(peerWorker)) {
+        for (fileWriterIndex <- 0 until totalFileWriters) {
+          val fileWriter = fileWriters(fileWriterIndex)
+          if (fileWriter != null && 
!pushMergedDataCallback.isHardSplitPartition(fileWriterIndex)) {
+            fileWriter.decrementPendingWrites()
+          }
+        }
+        handlePushMergedDataConnectionFail(pushMergedDataCallback, location)
+        return
+      }
       pushMergedData.body().retain()
       replicateThreadPool.submit(new Runnable {
         override def run(): Unit = {
-          val location = partitionIdToLocations.head._2
-          val peer = location.getPeer
-          val peerWorker = new WorkerInfo(
-            peer.getHost,
-            peer.getRpcPort,
-            peer.getPushPort,
-            peer.getFetchPort,
-            peer.getReplicatePort)
           if (unavailablePeers.containsKey(peerWorker)) {
             pushMergedData.body().release()
-            
workerSource.incCounter(WorkerSource.REPLICATE_DATA_CREATE_CONNECTION_FAIL_COUNT)
-            logError(
-              s"PushMergedData replication failed caused by unavailable peer 
for partitionLocation: $location")
-            pushMergedDataCallback.onFailure(
-              new 
CelebornIOException(StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_REPLICA))
+            handlePushMergedDataConnectionFail(pushMergedDataCallback, 
location)
             return
           }
-
           // Handle the response from replica
           val wrappedCallback = new RpcResponseCallback() {
             override def onSuccess(response: ByteBuffer): Unit = {

Reply via email to