rmcyang commented on code in PR #2064:
URL: 
https://github.com/apache/incubator-celeborn/pull/2064#discussion_r1385664184


##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala:
##########
@@ -1246,6 +1215,28 @@ class PushDataHandler(val workerSource: WorkerSource) 
extends BaseMessageHandler
     }
   }
 
+  def writeDataWithExceptionHandling(
+      fileWriter: FileWriter,
+      body: ByteBuf,
+      shuffleKey: String): Future[Unit] = {
+    Future {
+      try {
+        fileWriter.write(body)
+      } catch {
+        case e: AlreadyClosedException =>
+          fileWriter.decrementPendingWrites()
+          val (mapId, attemptId) = getMapAttempt(body)
+          val endedAttempt =
+            if (shuffleMapperAttempts.containsKey(shuffleKey)) {
+              shuffleMapperAttempts.get(shuffleKey).get(mapId)
+            } else -1
+          logWarning(s"Append data failed for task(shuffle $shuffleKey, map 
$mapId, attempt" +

Review Comment:
   should this be error logging?



##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala:
##########
@@ -1246,6 +1215,28 @@ class PushDataHandler(val workerSource: WorkerSource) 
extends BaseMessageHandler
     }
   }
 
+  def writeDataWithExceptionHandling(
+      fileWriter: FileWriter,
+      body: ByteBuf,
+      shuffleKey: String): Future[Unit] = {
+    Future {
+      try {
+        fileWriter.write(body)
+      } catch {
+        case e: AlreadyClosedException =>

Review Comment:
   The original logic has another case to catch the general `Exception`, but 
here we only catch for `AlreadyClosedException`. Why the case for `Exception` 
is dropped here or am I missing something?



##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala:
##########
@@ -1246,6 +1215,28 @@ class PushDataHandler(val workerSource: WorkerSource) 
extends BaseMessageHandler
     }
   }
 
+  def writeDataWithExceptionHandling(
+      fileWriter: FileWriter,
+      body: ByteBuf,
+      shuffleKey: String): Future[Unit] = {
+    Future {
+      try {
+        fileWriter.write(body)
+      } catch {
+        case e: AlreadyClosedException =>
+          fileWriter.decrementPendingWrites()
+          val (mapId, attemptId) = getMapAttempt(body)
+          val endedAttempt =
+            if (shuffleMapperAttempts.containsKey(shuffleKey)) {
+              shuffleMapperAttempts.get(shuffleKey).get(mapId)
+            } else -1
+          logWarning(s"Append data failed for task(shuffle $shuffleKey, map 
$mapId, attempt" +

Review Comment:
   There could be two cases when invoke this method: handleMapPartitionPushData 
and handlePushMergedData. And the original code annotate which case it is here. 
Not sure how it is helpful but can we retain which case it is here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to