This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 0d5809ff [CELEBORN-192][IMPROVEMENT] Change FAILED status to
REQUEST_FAILED since it's all used when RPC request failed. (#1139)
0d5809ff is described below
commit 0d5809ff0c8be8ab1c91539fa6f1b5c51cd7552d
Author: Angerszhuuuu <[email protected]>
AuthorDate: Fri Jan 6 16:53:04 2023 +0800
[CELEBORN-192][IMPROVEMENT] Change FAILED status to REQUEST_FAILED since
it's all used when RPC request failed. (#1139)
---
.../org/apache/celeborn/client/ShuffleClientImpl.java | 2 +-
.../org/apache/celeborn/client/LifecycleManager.scala | 16 ++++++++--------
.../apache/celeborn/client/commit/CommitHandler.scala | 4 ++--
.../celeborn/common/protocol/message/StatusCode.java | 2 +-
.../scala/org/apache/celeborn/common/util/Utils.scala | 2 +-
.../apache/celeborn/service/deploy/master/Master.scala | 2 +-
6 files changed, 14 insertions(+), 14 deletions(-)
diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index 04c951ff..3735dbe6 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -344,7 +344,7 @@ public class ShuffleClientImpl extends ShuffleClient {
} else {
logger.error(
"LifecycleManager request slots return {}, retry again, remain retry times {}",
- StatusCode.FAILED,
+ StatusCode.REQUEST_FAILED,
numRetries - 1);
}
} catch (Exception e) {
diff --git a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index 9908bd5c..c22d3cb6 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -409,9 +409,9 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
val res = requestSlotsWithRetry(applicationId, shuffleId, ids)
res.status match {
- case StatusCode.FAILED =>
+ case StatusCode.REQUEST_FAILED =>
logError(s"OfferSlots RPC request failed for $shuffleId!")
- reply(RegisterShuffleResponse(StatusCode.FAILED, Array.empty))
+ reply(RegisterShuffleResponse(StatusCode.REQUEST_FAILED, Array.empty))
return
case StatusCode.SLOT_NOT_AVAILABLE =>
logError(s"OfferSlots for $shuffleId failed!")
@@ -1119,7 +1119,7 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
} catch {
case e: Exception =>
logError(s"AskSync RegisterShuffle for $shuffleKey failed.", e)
- RequestSlotsResponse(StatusCode.FAILED, new WorkerResource())
+ RequestSlotsResponse(StatusCode.REQUEST_FAILED, new WorkerResource())
}
}
@@ -1134,7 +1134,7 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
val msg = s"Exception when askSync ReserveSlots for $shuffleKey " +
s"on worker $endpoint."
logError(msg, e)
- ReserveSlotsResponse(StatusCode.FAILED, msg + s" ${e.getMessage}")
+ ReserveSlotsResponse(StatusCode.REQUEST_FAILED, msg + s" ${e.getMessage}")
}
}
@@ -1144,7 +1144,7 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
} catch {
case e: Exception =>
logError(s"AskSync Destroy for ${message.shuffleKey} failed.", e)
- DestroyResponse(StatusCode.FAILED, message.masterLocations, message.slaveLocations)
+ DestroyResponse(StatusCode.REQUEST_FAILED, message.masterLocations, message.slaveLocations)
}
}
@@ -1156,7 +1156,7 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
} catch {
case e: Exception =>
logError(s"AskSync ReleaseSlots for ${message.shuffleId} failed.", e)
- ReleaseSlotsResponse(StatusCode.FAILED)
+ ReleaseSlotsResponse(StatusCode.REQUEST_FAILED)
}
}
@@ -1170,7 +1170,7 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
} catch {
case e: Exception =>
logError(s"AskSync UnregisterShuffle for ${message.getShuffleId} failed.", e)
- UnregisterShuffleResponse(StatusCode.FAILED)
+ UnregisterShuffleResponse(StatusCode.REQUEST_FAILED)
}
}
@@ -1182,7 +1182,7 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
} catch {
case e: Exception =>
logError(s"AskSync GetBlacklist failed.", e)
- GetBlacklistResponse(StatusCode.FAILED, List.empty.asJava, List.empty.asJava)
+ GetBlacklistResponse(StatusCode.REQUEST_FAILED, List.empty.asJava, List.empty.asJava)
}
}
diff --git a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
index 9c21c364..e9ace39c 100644
--- a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
@@ -290,7 +290,7 @@ abstract class CommitHandler(
res.status match {
case StatusCode.SUCCESS => // do nothing
- case StatusCode.PARTIAL_SUCCESS | StatusCode.SHUFFLE_NOT_REGISTERED | StatusCode.FAILED =>
+ case StatusCode.PARTIAL_SUCCESS | StatusCode.SHUFFLE_NOT_REGISTERED | StatusCode.REQUEST_FAILED =>
logDebug(s"Request $commitFiles return ${res.status} for " +
s"${Utils.makeShuffleKey(applicationId, shuffleId)}")
commitFilesFailedWorkers.put(worker, (res.status, System.currentTimeMillis()))
@@ -431,7 +431,7 @@ abstract class CommitHandler(
}
CommitFilesResponse(
- StatusCode.FAILED,
+ StatusCode.REQUEST_FAILED,
List.empty.asJava,
List.empty.asJava,
message.masterIds,
diff --git a/common/src/main/java/org/apache/celeborn/common/protocol/message/StatusCode.java b/common/src/main/java/org/apache/celeborn/common/protocol/message/StatusCode.java
index 0fb46765..75068a87 100644
--- a/common/src/main/java/org/apache/celeborn/common/protocol/message/StatusCode.java
+++ b/common/src/main/java/org/apache/celeborn/common/protocol/message/StatusCode.java
@@ -21,7 +21,7 @@ public enum StatusCode {
// 1/0 Status
SUCCESS(0),
PARTIAL_SUCCESS(1),
- FAILED(2),
+ REQUEST_FAILED(2),
// Specific Status
SHUFFLE_ALREADY_REGISTERED(3),
diff --git a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
index a266d873..875b75fd 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
@@ -837,7 +837,7 @@ object Utils extends Logging {
case 1 =>
StatusCode.PARTIAL_SUCCESS
case 2 =>
- StatusCode.FAILED
+ StatusCode.REQUEST_FAILED
case 3 =>
StatusCode.SHUFFLE_ALREADY_REGISTERED
case 4 =>
diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 62a7f113..9add6354 100644
--- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -718,7 +718,7 @@ private[celeborn] class Master(
new util.HashMap[String, DiskInfo](),
new ConcurrentHashMap[UserIdentifier, ResourceConsumption](),
null))
- GetWorkerInfosResponse(StatusCode.FAILED, result.asScala: _*)
+ GetWorkerInfosResponse(StatusCode.REQUEST_FAILED, result.asScala: _*)
}
}