This is an automated email from the ASF dual-hosted git repository.

fchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new eb1be3fbf [CELEBORN-1120] ShuffleClientImpl should close 
batchReviveRequestScheduler of ReviveManager
eb1be3fbf is described below

commit eb1be3fbf8078cc71e2208095931d9510bf00095
Author: SteNicholas <[email protected]>
AuthorDate: Fri Nov 10 11:44:47 2023 +0800

    [CELEBORN-1120] ShuffleClientImpl should close batchReviveRequestScheduler 
of ReviveManager
    
    ### What changes were proposed in this pull request?
    
    `ShuffleClientImpl#shutdown` now closes the `batchReviveRequestScheduler` of `ReviveManager`.
    
    ### Why are the changes needed?
    
    After the shuffle client is closed, `ReviveManager` still schedules invocations of 
`ShuffleClientImpl#reviveBatch`, which causes a `NullPointerException`. 
Therefore, `ShuffleClientImpl` should close the `batchReviveRequestScheduler` of 
`ReviveManager` to avoid this `NullPointerException`.
    
    ```
    23/11/08 18:09:25,819 [batch-revive-scheduler] ERROR ShuffleClientImpl: 
Exception raised while reviving for shuffle 0 partitionIds 1988, epochs 0,.
    java.lang.NullPointerException
            at 
org.apache.celeborn.client.ShuffleClientImpl.reviveBatch(ShuffleClientImpl.java:705)
            at 
org.apache.celeborn.client.ReviveManager.lambda$new$1(ReviveManager.java:94)
            at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
            at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
            at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
            at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
            at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:748)
    23/11/08 18:09:25,844 [celeborn-retry-sender-6] ERROR ShuffleClientImpl: 
Push data to xx.xx.xx.xx:9092 failed for shuffle 0 map 216 attempt 0 partition 
1988 batch 2623, remain revive times 4.
    org.apache.celeborn.common.exception.CelebornIOException: 
PUSH_DATA_CONNECTION_EXCEPTION_PRIMARY then revive but REVIVE_FAILED, revive 
status 12(REVIVE_FAILED), old location: PartitionLocation[
      id-epoch:1988-0
      
host-rpcPort-pushPort-fetchPort-replicatePort:xx.xx.xx.xx-9091-9092-9093-9094
      mode:PRIMARY
      peer:(empty)
      storage hint:StorageInfo{type=MEMORY, mountPoint='/tmp/storage', 
finalResult=false, filePath=}
      mapIdBitMap:null]
            at 
org.apache.celeborn.client.ShuffleClientImpl.submitRetryPushData(ShuffleClientImpl.java:261)
            at 
org.apache.celeborn.client.ShuffleClientImpl.access$600(ShuffleClientImpl.java:62)
            at 
org.apache.celeborn.client.ShuffleClientImpl$3.lambda$onFailure$1(ShuffleClientImpl.java:1045)
            at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
            at java.util.concurrent.FutureTask.run(FutureTask.java:266)
            at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:748)
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    No.
    
    Closes #2084 from SteNicholas/CELEBORN-1120.
    
    Authored-by: SteNicholas <[email protected]>
    Signed-off-by: Fu Chen <[email protected]>
---
 .../org/apache/celeborn/client/ReviveManager.java  | 11 ++++++--
 .../apache/celeborn/client/ShuffleClientImpl.java  | 33 +++++++++++-----------
 2 files changed, 25 insertions(+), 19 deletions(-)

diff --git a/client/src/main/java/org/apache/celeborn/client/ReviveManager.java 
b/client/src/main/java/org/apache/celeborn/client/ReviveManager.java
index 47d87878d..3c52e6b9f 100644
--- a/client/src/main/java/org/apache/celeborn/client/ReviveManager.java
+++ b/client/src/main/java/org/apache/celeborn/client/ReviveManager.java
@@ -22,6 +22,8 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import scala.concurrent.duration.Duration;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,17 +37,16 @@ class ReviveManager {
   private static final Logger logger = 
LoggerFactory.getLogger(ReviveManager.class);
 
   LinkedBlockingQueue<ReviveRequest> requestQueue = new 
LinkedBlockingQueue<>();
-  private final long interval;
   private final int batchSize;
   ShuffleClientImpl shuffleClient;
-  private ScheduledExecutorService batchReviveRequestScheduler =
+  private final ScheduledExecutorService batchReviveRequestScheduler =
       
ThreadUtils.newDaemonSingleThreadScheduledExecutor("batch-revive-scheduler");
 
   public ReviveManager(ShuffleClientImpl shuffleClient, CelebornConf conf) {
     this.shuffleClient = shuffleClient;
-    this.interval = conf.clientPushReviveInterval();
     this.batchSize = conf.clientPushReviveBatchSize();
 
+    long interval = conf.clientPushReviveInterval();
     batchReviveRequestScheduler.scheduleWithFixedDelay(
         () -> {
           Map<Integer, Set<ReviveRequest>> shuffleMap = new HashMap<>();
@@ -124,4 +125,8 @@ class ReviveManager {
       logger.error("Exception when put into requests!", e);
     }
   }
+
+  public void close() {
+    ThreadUtils.shutdown(batchReviveRequestScheduler, Duration.apply("800ms"));
+  }
 }
diff --git 
a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java 
b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index 39d382d10..a300ddf5f 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -73,8 +73,8 @@ public class ShuffleClientImpl extends ShuffleClient {
 
   private final int registerShuffleMaxRetries;
   private final long registerShuffleRetryWaitMs;
-  private int maxReviveTimes;
-  private boolean testRetryRevive;
+  private final int maxReviveTimes;
+  private final boolean testRetryRevive;
   private final int pushBufferMaxSize;
   protected final long pushDataTimeout;
 
@@ -113,7 +113,7 @@ public class ShuffleClientImpl extends ShuffleClient {
 
   protected final String appUniqueId;
 
-  private ThreadLocal<Compressor> compressorThreadLocal =
+  private final ThreadLocal<Compressor> compressorThreadLocal =
       new ThreadLocal<Compressor>() {
         @Override
         protected Compressor initialValue() {
@@ -601,13 +601,13 @@ public class ShuffleClientImpl extends ShuffleClient {
   }
 
   /**
-   * check if a newer PartitionLocation(with larger epoch) exists in local 
cache
+   * Check if a newer PartitionLocation(with larger epoch) exists in local 
cache.
    *
-   * @param shuffleMap
-   * @param partitionId
-   * @param epoch
-   * @param wait whether to wait for some time for a newer PartitionLocation
-   * @return
+   * @param shuffleMap The mapping between shuffle id and partition location.
+   * @param partitionId The id of partition.
+   * @param epoch The epoch of revive.
+   * @param wait Whether to wait for some time for a newer partition location.
+   * @return whether newer partition location exists in local cache.
    */
   boolean newerPartitionLocationExists(
       Map<Integer, PartitionLocation> shuffleMap, int partitionId, int epoch, 
boolean wait) {
@@ -675,12 +675,10 @@ public class ShuffleClientImpl extends ShuffleClient {
           attemptId,
           partitionId);
       return true;
-    } else if (results == null
-        || !results.containsKey(partitionId)
-        || results.get(partitionId) != StatusCode.SUCCESS.getValue()) {
-      return false;
     } else {
-      return true;
+      return results != null
+          && results.containsKey(partitionId)
+          && results.get(partitionId) == StatusCode.SUCCESS.getValue();
     }
   }
 
@@ -1595,7 +1593,7 @@ public class ShuffleClientImpl extends ShuffleClient {
       throws IOException {
     ReduceFileGroups fileGroups = loadFileGroup(shuffleId, partitionId);
 
-    if (fileGroups.partitionGroups.size() == 0
+    if (fileGroups.partitionGroups.isEmpty()
         || !fileGroups.partitionGroups.containsKey(partitionId)) {
       logger.warn("Shuffle data is empty for shuffle {} partition {}.", 
shuffleId, partitionId);
       return CelebornInputStream.empty();
@@ -1622,6 +1620,9 @@ public class ShuffleClientImpl extends ShuffleClient {
 
   @Override
   public void shutdown() {
+    if (null != reviveManager) {
+      reviveManager.close();
+    }
     if (null != rpcEnv) {
       rpcEnv.shutdown();
     }
@@ -1666,7 +1667,7 @@ public class ShuffleClientImpl extends ShuffleClient {
     logger.debug("Push data failed cause message: {}", message);
     StatusCode cause;
     if (message == null) {
-      logger.error("Push data throw unexpected exception: {}", message);
+      logger.error("Push data throw unexpected exception");
       cause = StatusCode.PUSH_DATA_FAIL_NON_CRITICAL_CAUSE;
     } else if 
(message.startsWith(StatusCode.PUSH_DATA_WRITE_FAIL_REPLICA.name())) {
       cause = StatusCode.PUSH_DATA_WRITE_FAIL_REPLICA;

Reply via email to