This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 2eb4c23eb [CELEBORN-1771] Bring forward PUSH_DATA_CREATE_CONNECTION_FAIL_REPLICA
2eb4c23eb is described below
commit 2eb4c23eb8c87788daea66caa8e331e024d160b5
Author: zhengtao <[email protected]>
AuthorDate: Tue Dec 24 11:09:19 2024 +0800
[CELEBORN-1771] Bring forward PUSH_DATA_CREATE_CONNECTION_FAIL_REPLICA
### What changes were proposed in this pull request?
1. Move the peerWorker availability check out of the replicate thread pool.
2. Move `retain` after the peerWorker availability check, which means we don't
have to release the buffer if peerWorker is unavailable.
3. Add `fileWriter.decrementPendingWrites()` if peerWorker is unavailable,
since the handler now returns early and never reaches the `decrementPendingWrites` call in `writeLocalData`.
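### Why are the changes needed?
When the peer is already known to be unavailable, PUSH_DATA_CREATE_CONNECTION_FAIL_REPLICA is now reported immediately, before a task is submitted to the replicate thread pool.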
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing UT & cluster testing.
Closes #2989 from zaynt4606/clb1771.
Authored-by: zhengtao <[email protected]>
Signed-off-by: Shuang <[email protected]>
---
.../service/deploy/worker/PushDataHandler.scala | 80 ++++++++++++++--------
1 file changed, 53 insertions(+), 27 deletions(-)
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index 15e217c88..f460e768e 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -273,26 +273,27 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
val writePromise = Promise[Array[StatusCode]]()
// for primary, send data to replica
if (doReplicate) {
+ val peer = location.getPeer
+ val peerWorker = new WorkerInfo(
+ peer.getHost,
+ peer.getRpcPort,
+ peer.getPushPort,
+ peer.getFetchPort,
+ peer.getReplicatePort)
+ if (unavailablePeers.containsKey(peerWorker)) {
+ fileWriter.decrementPendingWrites()
+ handlePushDataConnectionFail(callbackWithTimer, location)
+ return
+ }
+
pushData.body().retain()
replicateThreadPool.submit(new Runnable {
override def run(): Unit = {
- val peer = location.getPeer
- val peerWorker = new WorkerInfo(
- peer.getHost,
- peer.getRpcPort,
- peer.getPushPort,
- peer.getFetchPort,
- peer.getReplicatePort)
if (unavailablePeers.containsKey(peerWorker)) {
pushData.body().release()
-
workerSource.incCounter(WorkerSource.REPLICATE_DATA_CREATE_CONNECTION_FAIL_COUNT)
- logError(
- s"PushData replication failed caused by unavailable peer for
partitionLocation: $location")
- callbackWithTimer.onFailure(
- new
CelebornIOException(StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_REPLICA))
+ handlePushDataConnectionFail(callbackWithTimer, location)
return
}
-
// Handle the response from replica
val wrappedCallback = new RpcResponseCallback() {
override def onSuccess(response: ByteBuffer): Unit = {
@@ -424,6 +425,26 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
}
}
+ def handlePushDataConnectionFail(
+ callbackWithTimer: RpcResponseCallback,
+ location: PartitionLocation): Unit = {
+
workerSource.incCounter(WorkerSource.REPLICATE_DATA_CREATE_CONNECTION_FAIL_COUNT)
+ logError(
+ s"PushData replication failed caused by unavailable peer for
partitionLocation: $location")
+ callbackWithTimer.onFailure(
+ new
CelebornIOException(StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_REPLICA))
+ }
+
+ def handlePushMergedDataConnectionFail(
+ pushMergedDataCallback: PushMergedDataCallback,
+ location: PartitionLocation): Unit = {
+
workerSource.incCounter(WorkerSource.REPLICATE_DATA_CREATE_CONNECTION_FAIL_COUNT)
+ logError(
+ s"PushMergedData replication failed caused by unavailable peer for
partitionLocation: $location")
+ pushMergedDataCallback.onFailure(
+ new
CelebornIOException(StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_REPLICA))
+ }
+
def handlePushMergedData(
pushMergedData: PushMergedData,
callback: RpcResponseCallback): Unit = {
@@ -582,27 +603,32 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
val writePromise = Promise[Array[StatusCode]]()
// for primary, send data to replica
if (doReplicate) {
+ val location = partitionIdToLocations.head._2
+ val peer = location.getPeer
+ val peerWorker = new WorkerInfo(
+ peer.getHost,
+ peer.getRpcPort,
+ peer.getPushPort,
+ peer.getFetchPort,
+ peer.getReplicatePort)
+ if (unavailablePeers.containsKey(peerWorker)) {
+ for (fileWriterIndex <- 0 until totalFileWriters) {
+ val fileWriter = fileWriters(fileWriterIndex)
+ if (fileWriter != null &&
!pushMergedDataCallback.isHardSplitPartition(fileWriterIndex)) {
+ fileWriter.decrementPendingWrites()
+ }
+ }
+ handlePushMergedDataConnectionFail(pushMergedDataCallback, location)
+ return
+ }
pushMergedData.body().retain()
replicateThreadPool.submit(new Runnable {
override def run(): Unit = {
- val location = partitionIdToLocations.head._2
- val peer = location.getPeer
- val peerWorker = new WorkerInfo(
- peer.getHost,
- peer.getRpcPort,
- peer.getPushPort,
- peer.getFetchPort,
- peer.getReplicatePort)
if (unavailablePeers.containsKey(peerWorker)) {
pushMergedData.body().release()
-
workerSource.incCounter(WorkerSource.REPLICATE_DATA_CREATE_CONNECTION_FAIL_COUNT)
- logError(
- s"PushMergedData replication failed caused by unavailable peer
for partitionLocation: $location")
- pushMergedDataCallback.onFailure(
- new
CelebornIOException(StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_REPLICA))
+ handlePushMergedDataConnectionFail(pushMergedDataCallback,
location)
return
}
-
// Handle the response from replica
val wrappedCallback = new RpcResponseCallback() {
override def onSuccess(response: ByteBuffer): Unit = {