andygrove commented on code in PR #3142:
URL: https://github.com/apache/datafusion-comet/pull/3142#discussion_r2692078362
##########
spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/CometUnsafeShuffleWriter.java:
##########
@@ -571,17 +573,29 @@ public Option<MapStatus> stop(boolean success) {
         }
       }
     } finally {
+      // Cleanup is centralized here to avoid double-cleanup issues.
+      // This finally block always runs, whether stop() is called after successful write()
+      // or after a failed write(). The sorter will be null if write() completed successfully.
       if (sorter != null) {
-        // If sorter is non-null, then this implies that we called stop() in response to an error,
-        // so we need to clean up memory and spill files created by the sorter
-        sorter.cleanupResources();
+        try {
+          sorter.cleanupResources();
+        } catch (Exception e) {
+          // Log but don't throw to avoid masking any exception from the try block above
+          logger.error("Failed to cleanup shuffle writer resources in stop()", e);
+        } finally {
+          sorter = null;
+          allocator = null;
+        }
+      } else if (allocator != null) {
+        // If sorter is null but allocator isn't, still clean up allocator reference
+        allocator = null;
       }
     }
   }
   @Override
   public long[] getPartitionLengths() {
-    return new long[0];
+    return partitionLengths != null ? partitionLengths : new long[0];
Review Comment:
I wonder if there is any impact of actually implementing this method?
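
For reference, a minimal sketch of what actually implementing this could look like, assuming the writer records per-partition byte counts at the point where the final output is committed. The field and helper names below (`partitionLengths`, `mergeSpillsIntoFinalOutput`) are hypothetical and not necessarily what CometUnsafeShuffleWriter uses:

```java
import java.io.IOException;

// Hypothetical sketch only: the names below are illustrative, not necessarily the
// ones used by CometUnsafeShuffleWriter.
abstract class PartitionLengthsSketch {

  // Assumed to be populated once the map output has been committed successfully.
  private long[] partitionLengths;

  // Assumed write path: merging spills yields one byte count per reduce partition,
  // which is recorded so it can later be reported back to Spark.
  final void closeAndWriteOutput() throws IOException {
    this.partitionLengths = mergeSpillsIntoFinalOutput();
  }

  // With the lengths recorded above, this can report real per-partition sizes
  // instead of always returning an empty array.
  public long[] getPartitionLengths() {
    return partitionLengths != null ? partitionLengths : new long[0];
  }

  // Hypothetical helper standing in for the actual spill-merge logic.
  abstract long[] mergeSpillsIntoFinalOutput() throws IOException;
}
```

With the diff above, the fallback to `new long[0]` would only apply when stop() is reached before write() has committed any output.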