mridulm commented on code in PR #2663:
URL: https://github.com/apache/celeborn/pull/2663#discussion_r1720584123
##########
client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java:
##########
@@ -319,6 +324,21 @@ private void flushSendBuffer(int partitionId, byte[] buffer, int size)
writeMetrics.incWriteTime(System.nanoTime() - start);
}
+ private void abort(IOException lastIOException) throws IOException {
Review Comment:
QQ: Looking at this more, `abort` is called only when some exception was thrown, right?
If yes, we can simply make this:
```java
public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOException {
  boolean needAbort = true;
  try {
    // <current logic>
    close();
    needAbort = false;
  } catch (InterruptedException e) {
    TaskInterruptedHelper.throwTaskKillException();
  } finally {
    if (needAbort) {
      abort(false);
    }
  }
}
<snip>
private void abort(boolean throwTaskKilledOnInterruption) {
  try {
    dataPusher.waitOnTermination();
    sendBufferPool.returnPushTaskQueue(dataPusher.getIdleQueue());
  } catch (InterruptedException e) {
    if (throwTaskKilledOnInterruption) {
      TaskInterruptedHelper.throwTaskKillException();
    }
  }
}
```
I am keeping `throwTaskKilledOnInterruption` explicitly here to illustrate, but it should always be `false`, since there will always be an exception raised when `needAbort == true` - right?
(Apologies for not catching this simplification earlier.)
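For illustration only, the control flow of the `needAbort` flag pattern suggested above can be reduced to a minimal standalone sketch; the class name, the `fail` parameter, and the `abort()` body here are invented stand-ins for the Celeborn writer's real cleanup logic:

```java
// Minimal sketch of the "needAbort flag" pattern: abort() runs only when an
// exception escapes the try block, never on the success path.
public class NeedAbortDemo {
    static boolean aborted = false;

    // Stand-in for the real cleanup (waiting on the data pusher and
    // returning the push task queue to the pool).
    static void abort() {
        aborted = true;
    }

    static void write(boolean fail) {
        boolean needAbort = true;
        try {
            if (fail) {
                throw new RuntimeException("simulated write failure");
            }
            // close() would go here on the success path
            needAbort = false;
        } finally {
            if (needAbort) {
                // Reached only when the try block exited via an exception.
                abort();
            }
        }
    }

    public static void main(String[] args) {
        write(false);
        System.out.println("after success, aborted=" + aborted);
        try {
            write(true);
        } catch (RuntimeException e) {
            // expected: the original exception still propagates to the caller
        }
        System.out.println("after failure, aborted=" + aborted);
    }
}
```

Because the flag is cleared as the last statement of the try block, any exception thrown earlier leaves it `true`, so the finally block can distinguish the two exits without catching and rethrowing.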
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]