mridulm commented on code in PR #2663:
URL: https://github.com/apache/celeborn/pull/2663#discussion_r1704271395


##########
client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java:
##########
@@ -174,14 +174,12 @@ public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOEx
       } else {
         write0(records);
       }
+      close();
     } catch (InterruptedException e) {
+      abort();
       TaskInterruptedHelper.throwTaskKillException();
-    } finally {
-      try {
-        close();
-      } catch (InterruptedException e) {
-        TaskInterruptedHelper.throwTaskKillException();
-      }
+    } catch (Exception e) {
+      abort();
     }

Review Comment:
   Instead, here and below, why not use a pattern like:
   ```
   var needAbort = true
   try {
     <code>
     needAbort = false
   } 
   // other catch blocks, which might set needAbort = false
   finally {
     if (needAbort) {
       abort()
     }
   }
   
   ```
   
   (If `abort` needs to call `throwTaskKillException`, this pattern will need to be adjusted to handle that case as well.)
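
A minimal Java sketch of the flag-based pattern suggested above, for illustration only. `NeedAbortSketch`, its `events` list, and the `failInBody` parameter are hypothetical stand-ins and not part of `HashBasedShuffleWriter`; the sketch only shows that `abort()` runs when the body exits early and is skipped when the body completes.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a shuffle writer; not the Celeborn class.
class NeedAbortSketch {
  // Records which steps ran, so the control flow can be inspected.
  final List<String> events = new ArrayList<>();

  void write(boolean failInBody) throws IOException {
    boolean needAbort = true;
    try {
      events.add("write");
      if (failInBody) {
        throw new IOException("simulated write failure");
      }
      events.add("close");
      // Reached only when everything above succeeded.
      needAbort = false;
    } finally {
      // Runs on every exit path; the original exception keeps propagating
      // as long as abort() itself does not throw.
      if (needAbort) {
        abort();
      }
    }
  }

  void abort() {
    events.add("abort");
  }

  public static void main(String[] args) throws IOException {
    NeedAbortSketch ok = new NeedAbortSketch();
    ok.write(false);
    System.out.println(ok.events);

    NeedAbortSketch failed = new NeedAbortSketch();
    try {
      failed.write(true);
    } catch (IOException e) {
      System.out.println(failed.events + " after: " + e.getMessage());
    }
  }
}
```

With this shape, the cleanup decision lives in one place instead of being repeated in each `catch` block.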



##########
client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java:
##########
@@ -319,6 +317,15 @@ private void flushSendBuffer(int partitionId, byte[] buffer, int size)
     writeMetrics.incWriteTime(System.nanoTime() - start);
   }
 
+  private void abort() throws IOException {
+    try {
+      dataPusher.waitOnTermination();
+    } catch (InterruptedException e) {
+      TaskInterruptedHelper.throwTaskKillException();

Review Comment:
   If `abort` is called because of an exception (an `IOException`, for example), throwing the task-kill exception here will suppress that original exception.
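
To make the concern concrete, here is a small Java sketch under stated assumptions. `AbortSuppressionSketch` and both `write...` methods are hypothetical, and the `IllegalStateException` merely stands in for whatever `throwTaskKillException` would throw. The first method loses the `IOException`; the second shows one possible way to keep it, using `Throwable.addSuppressed`, which is not necessarily the approach the PR should take.

```java
import java.io.IOException;

// Hypothetical illustration; none of these names come from Celeborn.
class AbortSuppressionSketch {
  // Stand-in for an abort() whose cleanup path throws its own exception.
  static void abort() {
    throw new IllegalStateException("task killed");
  }

  // abort() throws from inside the catch block, so the caller
  // never sees the IOException that triggered the cleanup.
  static void writeLosingCause() throws IOException {
    try {
      throw new IOException("push failed");
    } catch (IOException e) {
      abort();
      throw e; // never reached here, because abort() throws first
    }
  }

  // One option: attach the cleanup failure to the original exception
  // and rethrow the original.
  static void writeKeepingCause() throws IOException {
    try {
      throw new IOException("push failed");
    } catch (IOException e) {
      try {
        abort();
      } catch (RuntimeException cleanupFailure) {
        e.addSuppressed(cleanupFailure);
      }
      throw e;
    }
  }

  public static void main(String[] args) {
    try {
      writeLosingCause();
    } catch (Exception e) {
      System.out.println("losing: " + e.getClass().getSimpleName());
    }
    try {
      writeKeepingCause();
    } catch (Exception e) {
      System.out.println("keeping: " + e.getClass().getSimpleName()
          + ", suppressed=" + e.getSuppressed().length);
    }
  }
}
```

Whether the task-kill signal or the original failure should win is a design decision for the writer; the sketch only shows that both can be kept visible.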
   


