SteNicholas commented on code in PR #2229:
URL:
https://github.com/apache/incubator-celeborn/pull/2229#discussion_r1458255497
##########
client/src/main/java/org/apache/celeborn/client/write/DataPusher.java:
##########
@@ -137,6 +138,7 @@ public void run() {
}
};
pushThread.setDaemon(true);
+ pushThread.setUncaughtExceptionHandler(new
ThreadExceptionHandler("DataPusher-" + taskId));
Review Comment:
Could `pushThread` use `ThreadUtils#newDaemonSingleThreadExecutor` instead
of this way?
##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java:
##########
@@ -761,47 +760,44 @@ class PartitionFilesCleaner {
new LinkedBlockingQueue<>();
private final Lock lock = new ReentrantLock();
private final Condition notEmpty = lock.newCondition();
- private final Thread cleaner;
+ private final ExecutorService cleaner =
+
ThreadUtils.newDaemonSingleThreadExecutor("worker-partition-file-cleaner");
PartitionFilesCleaner(PartitionFilesSorter partitionFilesSorter) {
- cleaner =
- new Thread(
- () -> {
+ cleaner.submit(
+ () -> {
+ try {
+ while (!partitionFilesSorter.isShutdown()) {
+ lock.lockInterruptibly();
try {
- while (!partitionFilesSorter.isShutdown()) {
- lock.lockInterruptibly();
+ // CELEBORN-1210: use while instead of if in case of spurious
wakeup.
+ while (fileSorters.isEmpty()) {
+ notEmpty.await();
+ }
+ Iterator<PartitionFilesSorter.FileSorter> it =
fileSorters.iterator();
+ while (it.hasNext()) {
+ PartitionFilesSorter.FileSorter sorter = it.next();
try {
- // CELEBORN-1210: use while instead of if in case of
spurious wakeup.
- while (fileSorters.isEmpty()) {
- notEmpty.await();
+ if (((DiskFileInfo)
sorter.getOriginFileInfo()).isStreamsEmpty()) {
+ logger.debug(
+ "Deleting the original files for shuffle key {}: {}",
+ sorter.getShuffleKey(),
+ ((DiskFileInfo)
sorter.getOriginFileInfo()).getFilePath());
+ sorter.deleteOriginFiles();
+ it.remove();
}
- Iterator<PartitionFilesSorter.FileSorter> it =
fileSorters.iterator();
- while (it.hasNext()) {
- PartitionFilesSorter.FileSorter sorter = it.next();
- try {
- if (((DiskFileInfo)
sorter.getOriginFileInfo()).isStreamsEmpty()) {
- logger.debug(
- "Deleting the original files for shuffle key {}:
{}",
- sorter.getShuffleKey(),
- ((DiskFileInfo)
sorter.getOriginFileInfo()).getFilePath());
- sorter.deleteOriginFiles();
- it.remove();
- }
- } catch (IOException e) {
- logger.error("catch IOException when delete origin
files", e);
- }
- }
- } finally {
- lock.unlock();
+ } catch (IOException e) {
+ logger.error("catch IOException when delete origin files",
e);
}
}
- } catch (InterruptedException e) {
- logger.warn("partition file cleaner thread interrupted while
wait new sorter.", e);
+ } finally {
+ lock.unlock();
}
- });
- cleaner.setName("partition-files-cleaner");
- cleaner.setDaemon(true);
- cleaner.start();
+ }
+ } catch (InterruptedException e) {
+ logger.warn("partition file cleaner thread interrupted while wait
new sorter.", e);
Review Comment:
```suggestion
logger.warn("Partition file cleaner thread interrupted while
waiting new sorter.", e);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]