turboFei commented on code in PR #3008:
URL: https://github.com/apache/celeborn/pull/3008#discussion_r1913022633
##########
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala:
##########
@@ -4884,6 +4885,14 @@ object CelebornConf extends Logging {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("3s")
+ val RPC_TIMEOUT_RETRY_WAIT: ConfigEntry[Long] =
+ buildConf("celeborn.rpc.retryWait")
Review Comment:
```
val RPC_RETRY_WAIT
```
Also, please move this config into the `celeborn.rpc` section.
##########
common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala:
##########
@@ -104,6 +106,7 @@ object RpcEnv {
abstract class RpcEnv(config: RpcEnvConfig) {
private[celeborn] val defaultLookupTimeout = config.conf.rpcLookupTimeout
+ private[celeborn] val waitTimeBound = config.conf.rpcTimeoutRetryWaitMs.toInt
Review Comment:
`private[celeborn] val defaultRetryWait`
##########
common/src/main/scala/org/apache/celeborn/common/rpc/RpcEndpointRef.scala:
##########
@@ -88,4 +92,58 @@ abstract class RpcEndpointRef(conf: CelebornConf)
val future = ask[T](message, timeout)
timeout.awaitResult(future, address)
}
+
+ /**
+ * Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and get its result within a
+ * default timeout, retry if timeout, throw an exception if this still fails.
+ *
+ * Note: this is a blocking action which may cost a lot of time, so don't call it in a message
+ * loop of [[RpcEndpoint]].
+ *
+ * @param message the message to send
+ * @tparam T type of the reply message
+ * @return the reply message from the corresponding [[RpcEndpoint]]
+ */
+ def askSync[T: ClassTag](message: Any, retryCount: Int): T =
+ askSync(message, defaultAskTimeout, retryCount)
+
+ /**
+ * Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and get its result within a
+ * specified timeout, retry if timeout, throw an exception if this still fails.
+ *
+ * Note: this is a blocking action which may cost a lot of time, so don't call it in a message
+ * loop of [[RpcEndpoint]].
+ *
+ * @param message the message to send
+ * @param timeout the timeout duration
+ * @tparam T type of the reply message
+ * @return the reply message from the corresponding [[RpcEndpoint]]
+ */
+ def askSync[T: ClassTag](message: Any, timeout: RpcTimeout, retryCount: Int): T = {
Review Comment:
```
def askSync[T: ClassTag](message: Any, timeout: RpcTimeout, retryCount: Int, retryWait: Long)
```
##########
common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala:
##########
@@ -142,6 +145,41 @@ abstract class RpcEnv(config: RpcEnvConfig) {
setupEndpointRefByAddr(RpcEndpointAddress(address, endpointName))
}
+ /**
+ * Retrieve the [[RpcEndpointRef]] represented by `address` and `endpointName` with timeout retry.
+ * This is a blocking action.
+ */
+ def setupEndpointRef(
+ address: RpcAddress,
+ endpointName: String,
+ retryCount: Int): RpcEndpointRef = {
Review Comment:
```
def setupEndpointRef(
address: RpcAddress,
endpointName: String,
retryCount: Int,
retryWait: Long)
```
##########
common/src/main/scala/org/apache/celeborn/common/rpc/RpcEndpointRef.scala:
##########
@@ -30,6 +33,7 @@ abstract class RpcEndpointRef(conf: CelebornConf)
extends Serializable with Logging {
private[this] val defaultAskTimeout = conf.rpcAskTimeout
+ private[celeborn] val waitTimeBound = conf.rpcTimeoutRetryWaitMs.toInt
Review Comment:
`private[this] val defaultRetryWait`
##########
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala:
##########
@@ -4884,6 +4885,14 @@ object CelebornConf extends Logging {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("3s")
+ val RPC_TIMEOUT_RETRY_WAIT: ConfigEntry[Long] =
+ buildConf("celeborn.rpc.retryWait")
Review Comment:
I wonder whether you could also introduce a new config, `celeborn.client.rpc.retryWait`, for the client side.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]