This is an automated email from the ASF dual-hosted git repository.
fchen pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.3 by this push:
new f66798e7d [CELEBORN-1120] ShuffleClientImpl should close
batchReviveRequestScheduler of ReviveManager
f66798e7d is described below
commit f66798e7d53dbc2f97fd2250ceadd1a47017d0c0
Author: SteNicholas <[email protected]>
AuthorDate: Fri Nov 10 11:44:47 2023 +0800
[CELEBORN-1120] ShuffleClientImpl should close batchReviveRequestScheduler
of ReviveManager
### What changes were proposed in this pull request?
`ShuffleClientImpl#shutdown` now closes the `batchReviveRequestScheduler` of
`ReviveManager` through a new `ReviveManager#close` method.
### Why are the changes needed?
After the shuffle client is closed, `ReviveManager` still schedules invocations
of `ShuffleClientImpl#reviveBatch`, which causes a `NullPointerException`.
Therefore, `ShuffleClientImpl` should close the `batchReviveRequestScheduler` of
`ReviveManager` to avoid the `NullPointerException`.
```
23/11/08 18:09:25,819 [batch-revive-scheduler] ERROR ShuffleClientImpl:
Exception raised while reviving for shuffle 0 partitionIds 1988, epochs 0,.
java.lang.NullPointerException
at
org.apache.celeborn.client.ShuffleClientImpl.reviveBatch(ShuffleClientImpl.java:705)
at
org.apache.celeborn.client.ReviveManager.lambda$new$1(ReviveManager.java:94)
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
23/11/08 18:09:25,844 [celeborn-retry-sender-6] ERROR ShuffleClientImpl:
Push data to xx.xx.xx.xx:9092 failed for shuffle 0 map 216 attempt 0 partition
1988 batch 2623, remain revive times 4.
org.apache.celeborn.common.exception.CelebornIOException:
PUSH_DATA_CONNECTION_EXCEPTION_PRIMARY then revive but REVIVE_FAILED, revive
status 12(REVIVE_FAILED), old location: PartitionLocation[
id-epoch:1988-0
host-rpcPort-pushPort-fetchPort-replicatePort:xx.xx.xx.xx-9091-9092-9093-9094
mode:PRIMARY
peer:(empty)
storage hint:StorageInfo{type=MEMORY, mountPoint='/tmp/storage',
finalResult=false, filePath=}
mapIdBitMap:null]
at
org.apache.celeborn.client.ShuffleClientImpl.submitRetryPushData(ShuffleClientImpl.java:261)
at
org.apache.celeborn.client.ShuffleClientImpl.access$600(ShuffleClientImpl.java:62)
at
org.apache.celeborn.client.ShuffleClientImpl$3.lambda$onFailure$1(ShuffleClientImpl.java:1045)
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
```
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
No.
Closes #2084 from SteNicholas/CELEBORN-1120.
Authored-by: SteNicholas <[email protected]>
Signed-off-by: Fu Chen <[email protected]>
(cherry picked from commit eb1be3fbf8078cc71e2208095931d9510bf00095)
Signed-off-by: Fu Chen <[email protected]>
---
.../org/apache/celeborn/client/ReviveManager.java | 11 ++++++--
.../apache/celeborn/client/ShuffleClientImpl.java | 33 +++++++++++-----------
2 files changed, 25 insertions(+), 19 deletions(-)
diff --git a/client/src/main/java/org/apache/celeborn/client/ReviveManager.java
b/client/src/main/java/org/apache/celeborn/client/ReviveManager.java
index 3bbf28ecc..1e2b04dcb 100644
--- a/client/src/main/java/org/apache/celeborn/client/ReviveManager.java
+++ b/client/src/main/java/org/apache/celeborn/client/ReviveManager.java
@@ -22,6 +22,8 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import scala.concurrent.duration.Duration;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,17 +37,16 @@ class ReviveManager {
private static final Logger logger =
LoggerFactory.getLogger(ReviveManager.class);
LinkedBlockingQueue<ReviveRequest> requestQueue = new
LinkedBlockingQueue<>();
- private final long interval;
private final int batchSize;
ShuffleClientImpl shuffleClient;
- private ScheduledExecutorService batchReviveRequestScheduler =
+ private final ScheduledExecutorService batchReviveRequestScheduler =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("batch-revive-scheduler");
public ReviveManager(ShuffleClientImpl shuffleClient, CelebornConf conf) {
this.shuffleClient = shuffleClient;
- this.interval = conf.clientPushReviveInterval();
this.batchSize = conf.clientPushReviveBatchSize();
+ long interval = conf.clientPushReviveInterval();
batchReviveRequestScheduler.scheduleWithFixedDelay(
() -> {
Map<Integer, Set<ReviveRequest>> shuffleMap = new HashMap<>();
@@ -125,4 +126,8 @@ class ReviveManager {
logger.error("Exception when put into requests!", e);
}
}
+
+ public void close() {
+ ThreadUtils.shutdown(batchReviveRequestScheduler, Duration.apply("800ms"));
+ }
}
diff --git
a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index c0c157a59..0e53a8425 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -73,8 +73,8 @@ public class ShuffleClientImpl extends ShuffleClient {
private final int registerShuffleMaxRetries;
private final long registerShuffleRetryWaitMs;
- private int maxReviveTimes;
- private boolean testRetryRevive;
+ private final int maxReviveTimes;
+ private final boolean testRetryRevive;
private final int pushBufferMaxSize;
protected final long pushDataTimeout;
@@ -113,7 +113,7 @@ public class ShuffleClientImpl extends ShuffleClient {
protected final String appUniqueId;
- private ThreadLocal<Compressor> compressorThreadLocal =
+ private final ThreadLocal<Compressor> compressorThreadLocal =
new ThreadLocal<Compressor>() {
@Override
protected Compressor initialValue() {
@@ -601,13 +601,13 @@ public class ShuffleClientImpl extends ShuffleClient {
}
/**
- * check if a newer PartitionLocation(with larger epoch) exists in local
cache
+ * Check if a newer PartitionLocation(with larger epoch) exists in local
cache.
*
- * @param shuffleMap
- * @param partitionId
- * @param epoch
- * @param wait whether to wait for some time for a newer PartitionLocation
- * @return
+ * @param shuffleMap The mapping between shuffle id and partition location.
+ * @param partitionId The id of partition.
+ * @param epoch The epoch of revive.
+ * @param wait Whether to wait for some time for a newer partition location.
+ * @return whether newer partition location exists in local cache.
*/
boolean newerPartitionLocationExists(
Map<Integer, PartitionLocation> shuffleMap, int partitionId, int epoch,
boolean wait) {
@@ -675,12 +675,10 @@ public class ShuffleClientImpl extends ShuffleClient {
attemptId,
partitionId);
return true;
- } else if (results == null
- || !results.containsKey(partitionId)
- || results.get(partitionId) != StatusCode.SUCCESS.getValue()) {
- return false;
} else {
- return true;
+ return results != null
+ && results.containsKey(partitionId)
+ && results.get(partitionId) == StatusCode.SUCCESS.getValue();
}
}
@@ -1595,7 +1593,7 @@ public class ShuffleClientImpl extends ShuffleClient {
throws IOException {
ReduceFileGroups fileGroups = loadFileGroup(shuffleId, partitionId);
- if (fileGroups.partitionGroups.size() == 0
+ if (fileGroups.partitionGroups.isEmpty()
|| !fileGroups.partitionGroups.containsKey(partitionId)) {
logger.warn("Shuffle data is empty for shuffle {} partition {}.",
shuffleId, partitionId);
return CelebornInputStream.empty();
@@ -1622,6 +1620,9 @@ public class ShuffleClientImpl extends ShuffleClient {
@Override
public void shutdown() {
+ if (null != reviveManager) {
+ reviveManager.close();
+ }
if (null != rpcEnv) {
rpcEnv.shutdown();
}
@@ -1666,7 +1667,7 @@ public class ShuffleClientImpl extends ShuffleClient {
logger.debug("Push data failed cause message: " + message);
StatusCode cause;
if (message == null) {
- logger.error("Push data throw unexpected exception: {}", message);
+ logger.error("Push data throw unexpected exception");
cause = StatusCode.PUSH_DATA_FAIL_NON_CRITICAL_CAUSE;
} else if
(message.startsWith(StatusCode.PUSH_DATA_WRITE_FAIL_REPLICA.name())) {
cause = StatusCode.PUSH_DATA_WRITE_FAIL_REPLICA;