waitinfuture commented on code in PR #2064:
URL: 
https://github.com/apache/incubator-celeborn/pull/2064#discussion_r1388766217


##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala:
##########
@@ -349,12 +362,15 @@ class PushDataHandler(val workerSource: WorkerSource) 
extends BaseMessageHandler
       // will fast stop pushing data to the worker, we won't return congest 
status. But
       // in the long term, especially if this issue could frequently happen, 
we may need to return
       // congest&softSplit status together
+      writeLocalData(Seq(fileWriter), body, shuffleKey, None, None)

Review Comment:
   I think we still need to get the status of `writeLocalData` and call 
`onFailure` if not `Success`.



##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala:
##########
@@ -573,16 +575,20 @@ class PushDataHandler(val workerSource: WorkerSource) 
extends BaseMessageHandler
               // 1. Throw PUSH_DATA_WRITE_FAIL_REPLICA by replica peer worker
               // 2. Throw PUSH_DATA_TIMEOUT_REPLICA by TransportResponseHandler
               // 3. Throw IOException by channel, convert to 
PUSH_DATA_CONNECTION_EXCEPTION_REPLICA
-              if 
(e.getMessage.startsWith(StatusCode.PUSH_DATA_WRITE_FAIL_REPLICA.name())) {
-                
workerSource.incCounter(WorkerSource.REPLICATE_DATA_WRITE_FAIL_COUNT)
-                callbackWithTimer.onFailure(e)
-              } else if 
(e.getMessage.startsWith(StatusCode.PUSH_DATA_TIMEOUT_REPLICA.name())) {
-                
workerSource.incCounter(WorkerSource.REPLICATE_DATA_TIMEOUT_COUNT)
-                callbackWithTimer.onFailure(e)
-              } else {
-                
workerSource.incCounter(WorkerSource.REPLICATE_DATA_CONNECTION_EXCEPTION_COUNT)
-                callbackWithTimer.onFailure(
-                  new 
CelebornIOException(StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_REPLICA))
+              Try(Await.result(writePromise.future, Duration.Inf)) match {

Review Comment:
   ditto



##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala:
##########
@@ -1246,6 +1197,54 @@ class PushDataHandler(val workerSource: WorkerSource) 
extends BaseMessageHandler
     }
   }
 
+  private def writeLocalData(
+      fileWriters: Seq[FileWriter],
+      body: ByteBuf,
+      shuffleKey: String,
+      batchOffsets: Option[Array[Int]],
+      writePromise: Option[Promise[Unit]]): Unit = {

Review Comment:
   Since all cases will pass the `Promise`, I think we can remove the `Option`



##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala:
##########
@@ -818,32 +787,14 @@ class PushDataHandler(val workerSource: WorkerSource) 
extends BaseMessageHandler
       fileWriter.decrementPendingWrites()
       return;
     }
-
+    writeLocalData(Seq(fileWriter), body, shuffleKey, None, None)

Review Comment:
   ditto, we need to get status and call `onFailure` if not success



##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala:
##########
@@ -307,16 +315,20 @@ class PushDataHandler(val workerSource: WorkerSource) 
extends BaseMessageHandler
               // 1. Throw PUSH_DATA_WRITE_FAIL_REPLICA by replica peer worker
               // 2. Throw PUSH_DATA_TIMEOUT_REPLICA by TransportResponseHandler
               // 3. Throw IOException by channel, convert to 
PUSH_DATA_CONNECTION_EXCEPTION_REPLICA
-              if 
(e.getMessage.startsWith(StatusCode.PUSH_DATA_WRITE_FAIL_REPLICA.name())) {
-                
workerSource.incCounter(WorkerSource.REPLICATE_DATA_WRITE_FAIL_COUNT)
-                callbackWithTimer.onFailure(e)
-              } else if 
(e.getMessage.startsWith(StatusCode.PUSH_DATA_TIMEOUT_REPLICA.name())) {
-                
workerSource.incCounter(WorkerSource.REPLICATE_DATA_TIMEOUT_COUNT)
-                callbackWithTimer.onFailure(e)
-              } else {
-                
workerSource.incCounter(WorkerSource.REPLICATE_DATA_CONNECTION_EXCEPTION_COUNT)
-                callbackWithTimer.onFailure(
-                  new 
CelebornIOException(StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_REPLICA))
+              Try(Await.result(writePromise.future, Duration.Inf)) match {

Review Comment:
   I think we don't need to wait `writePromise` in this `onFailure`, and it's 
better to send back the replica exception instead of the primary exception. 
Client will parse the exception message to indicate the cause.



##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala:
##########
@@ -1246,6 +1197,54 @@ class PushDataHandler(val workerSource: WorkerSource) 
extends BaseMessageHandler
     }
   }
 
+  private def writeLocalData(
+      fileWriters: Seq[FileWriter],
+      body: ByteBuf,
+      shuffleKey: String,
+      batchOffsets: Option[Array[Int]],
+      writePromise: Option[Promise[Unit]]): Unit = {
+    def writeData(fileWriter: FileWriter, body: ByteBuf, shuffleKey: String): 
Unit = {
+      try {
+        fileWriter.write(body)
+      } catch {
+        case e: AlreadyClosedException =>
+          fileWriter.decrementPendingWrites()
+          val (mapId, attemptId) = getMapAttempt(body)
+          val endedAttempt =
+            if (shuffleMapperAttempts.containsKey(shuffleKey)) {
+              shuffleMapperAttempts.get(shuffleKey).get(mapId)
+            } else -1
+          // TODO just info log for ended attempt
+          logWarning(s"Append data failed for task(shuffle $shuffleKey, map 
$mapId, attempt" +
+            s" $attemptId), caused by AlreadyClosedException, endedAttempt 
$endedAttempt, error message: ${e.getMessage}")
+          writePromise.map(_.failure(e)).orElse(throw e)
+        case e: Exception =>
+          logError("Exception encountered when write.", e)
+          writePromise.map(_.failure(e)).orElse(throw e)
+      }
+    }
+    batchOffsets match {
+      case Some(batchOffsets) =>
+        batchOffsets.zip(fileWriters).foreach { case (offset, writer) =>
+          val length =

Review Comment:
   I think we need first check whether `writePromise` is complete, and skip 
handling if true for two reasons:
   1. To be aligned with original logic
   2. If two `fileWriter`s failed, the second one will throw Exception when it 
tries to call `_.failure`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to