This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 15ea5d366 [CELEBORN-1930][CIP-12] Support HARD_SPLIT in PushMergedData
should handle congestion control NPE issue
15ea5d366 is described below
commit 15ea5d366470b691b2bea16164c0605b510a45d5
Author: Xianming Lei <[email protected]>
AuthorDate: Wed Mar 26 23:44:04 2025 -0700
[CELEBORN-1930][CIP-12] Support HARD_SPLIT in PushMergedData should handle
congestion control NPE issue
### What changes were proposed in this pull request?
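When a hard split happens, some FileWriters may be null, which causes an NPE
when handling congestion control. This change checks user congestion against
the first non-null FileWriter instead of unconditionally using `fileWriters.head`.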
### Why are the changes needed?
When a hard split happens, some FileWriters may be null, which causes an NPE
when handling congestion control.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing UTs.
Closes #3176 from leixm/CELEBORN-1930.
Authored-by: Xianming Lei <[email protected]>
Signed-off-by: Wang, Fei <[email protected]>
---
.../service/deploy/worker/PushDataHandler.scala | 20 ++++++++++++++------
1 file changed, 14 insertions(+), 6 deletions(-)
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index 30a12535b..c8686fda1 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -675,9 +675,13 @@ class PushDataHandler(val workerSource: WorkerSource)
extends BaseMessageHandler
}
// Only primary data enable replication will push data to
replica
Option(CongestionController.instance()) match {
- case Some(congestionController) if fileWriters.nonEmpty =>
- if (congestionController.isUserCongested(
- fileWriters.head.getUserCongestionControlContext)) {
+ case Some(congestionController) =>
+ val userCongested =
+ fileWriters
+ .find(_ != null)
+ .map(_.getUserCongestionControlContext)
+ .exists(congestionController.isUserCongested)
+ if (userCongested) {
// Check whether primary congest the data though the
replicas doesn't congest
// it(the response is empty)
pushMergedDataCallback.onSuccess(
@@ -780,9 +784,13 @@ class PushDataHandler(val workerSource: WorkerSource)
extends BaseMessageHandler
index += 1
}
Option(CongestionController.instance()) match {
- case Some(congestionController) if fileWriters.nonEmpty =>
- if (congestionController.isUserCongested(
- fileWriters.head.getUserCongestionControlContext)) {
+ case Some(congestionController) =>
+ val userCongested =
+ fileWriters
+ .find(_ != null)
+ .map(_.getUserCongestionControlContext)
+ .exists(congestionController.isUserCongested)
+ if (userCongested) {
if (isPrimary) {
pushMergedDataCallback.onSuccess(StatusCode.PUSH_DATA_SUCCESS_PRIMARY_CONGESTED)
} else {