SteNicholas commented on code in PR #2229:
URL: 
https://github.com/apache/incubator-celeborn/pull/2229#discussion_r1458255497


##########
client/src/main/java/org/apache/celeborn/client/write/DataPusher.java:
##########
@@ -137,6 +138,7 @@ public void run() {
           }
         };
     pushThread.setDaemon(true);
+    pushThread.setUncaughtExceptionHandler(new 
ThreadExceptionHandler("DataPusher-" + taskId));

Review Comment:
   Could `pushThread` use `ThreadUtils#newDaemonSingleThreadExecutor` instead 
of this way?



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java:
##########
@@ -761,47 +760,44 @@ class PartitionFilesCleaner {
       new LinkedBlockingQueue<>();
   private final Lock lock = new ReentrantLock();
   private final Condition notEmpty = lock.newCondition();
-  private final Thread cleaner;
+  private final ExecutorService cleaner =
+      
ThreadUtils.newDaemonSingleThreadExecutor("worker-partition-file-cleaner");
 
   PartitionFilesCleaner(PartitionFilesSorter partitionFilesSorter) {
-    cleaner =
-        new Thread(
-            () -> {
+    cleaner.submit(
+        () -> {
+          try {
+            while (!partitionFilesSorter.isShutdown()) {
+              lock.lockInterruptibly();
               try {
-                while (!partitionFilesSorter.isShutdown()) {
-                  lock.lockInterruptibly();
+                // CELEBORN-1210: use while instead of if in case of spurious 
wakeup.
+                while (fileSorters.isEmpty()) {
+                  notEmpty.await();
+                }
+                Iterator<PartitionFilesSorter.FileSorter> it = 
fileSorters.iterator();
+                while (it.hasNext()) {
+                  PartitionFilesSorter.FileSorter sorter = it.next();
                   try {
-                    // CELEBORN-1210: use while instead of if in case of 
spurious wakeup.
-                    while (fileSorters.isEmpty()) {
-                      notEmpty.await();
+                    if (((DiskFileInfo) 
sorter.getOriginFileInfo()).isStreamsEmpty()) {
+                      logger.debug(
+                          "Deleting the original files for shuffle key {}: {}",
+                          sorter.getShuffleKey(),
+                          ((DiskFileInfo) 
sorter.getOriginFileInfo()).getFilePath());
+                      sorter.deleteOriginFiles();
+                      it.remove();
                     }
-                    Iterator<PartitionFilesSorter.FileSorter> it = 
fileSorters.iterator();
-                    while (it.hasNext()) {
-                      PartitionFilesSorter.FileSorter sorter = it.next();
-                      try {
-                        if (((DiskFileInfo) 
sorter.getOriginFileInfo()).isStreamsEmpty()) {
-                          logger.debug(
-                              "Deleting the original files for shuffle key {}: 
{}",
-                              sorter.getShuffleKey(),
-                              ((DiskFileInfo) 
sorter.getOriginFileInfo()).getFilePath());
-                          sorter.deleteOriginFiles();
-                          it.remove();
-                        }
-                      } catch (IOException e) {
-                        logger.error("catch IOException when delete origin 
files", e);
-                      }
-                    }
-                  } finally {
-                    lock.unlock();
+                  } catch (IOException e) {
+                    logger.error("catch IOException when delete origin files", 
e);
                   }
                 }
-              } catch (InterruptedException e) {
-                logger.warn("partition file cleaner thread interrupted while 
wait new sorter.", e);
+              } finally {
+                lock.unlock();
               }
-            });
-    cleaner.setName("partition-files-cleaner");
-    cleaner.setDaemon(true);
-    cleaner.start();
+            }
+          } catch (InterruptedException e) {
+            logger.warn("partition file cleaner thread interrupted while wait 
new sorter.", e);

Review Comment:
   ```suggestion
               logger.warn("Partition file cleaner thread interrupted while 
waiting new sorter.", e);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to