liuneng1994 commented on code in PR #6842:
URL: https://github.com/apache/incubator-gluten/pull/6842#discussion_r1721158569
##########
backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala:
##########
@@ -201,87 +203,112 @@ case class GlutenCHCacheDataCommand(
executorIdsToParts.put(executorId,
extensionTableNode.getExtensionTableStr)
}
})
-
- // send rpc call
+ val futureList = ArrayBuffer[(String, Future[CacheJobInfo])]()
if (executorIdsToParts.contains(GlutenCHCacheDataCommand.ALL_EXECUTORS)) {
// send all parts to all executors
- val tableMessage =
executorIdsToParts.get(GlutenCHCacheDataCommand.ALL_EXECUTORS).get
- if (asynExecute) {
- GlutenDriverEndpoint.executorDataMap.forEach(
- (executorId, executor) => {
- executor.executorEndpointRef.send(
- GlutenMergeTreeCacheLoad(tableMessage,
selectedColumns.toSet.asJava))
- })
- Seq(Row(true, ""))
- } else {
- val futureList = ArrayBuffer[Future[CacheLoadResult]]()
- val resultList = ArrayBuffer[CacheLoadResult]()
- GlutenDriverEndpoint.executorDataMap.forEach(
- (executorId, executor) => {
- futureList.append(
- executor.executorEndpointRef.ask[CacheLoadResult](
+ val tableMessage =
executorIdsToParts(GlutenCHCacheDataCommand.ALL_EXECUTORS)
+ GlutenDriverEndpoint.executorDataMap.forEach(
+ (executorId, executor) => {
+ futureList.append(
+ (
+ executorId,
+ executor.executorEndpointRef.ask[CacheJobInfo](
GlutenMergeTreeCacheLoad(tableMessage,
selectedColumns.toSet.asJava)
- ))
- })
- futureList.foreach(
- f => {
- resultList.append(ThreadUtils.awaitResult(f, Duration.Inf))
- })
- if (resultList.exists(!_.success)) {
- Seq(Row(false,
resultList.filter(!_.success).map(_.reason).mkString(";")))
- } else {
- Seq(Row(true, ""))
- }
- }
+ )))
+ })
} else {
- if (asynExecute) {
- executorIdsToParts.foreach(
- value => {
- val executorData =
GlutenDriverEndpoint.executorDataMap.get(toExecutorId(value._1))
- if (executorData != null) {
- executorData.executorEndpointRef.send(
- GlutenMergeTreeCacheLoad(value._2,
selectedColumns.toSet.asJava))
- } else {
- throw new GlutenException(
- s"executor ${value._1} not found," +
- s" all executors are
${GlutenDriverEndpoint.executorDataMap.toString}")
- }
- })
- Seq(Row(true, ""))
- } else {
- val futureList = ArrayBuffer[Future[CacheLoadResult]]()
- val resultList = ArrayBuffer[CacheLoadResult]()
- executorIdsToParts.foreach(
- value => {
- val executorData =
GlutenDriverEndpoint.executorDataMap.get(toExecutorId(value._1))
- if (executorData != null) {
- futureList.append(
- executorData.executorEndpointRef.ask[CacheLoadResult](
- GlutenMergeTreeCacheLoad(value._2,
selectedColumns.toSet.asJava)
- ))
- } else {
- throw new GlutenException(
- s"executor ${value._1} not found," +
- s" all executors are
${GlutenDriverEndpoint.executorDataMap.toString}")
- }
- })
- futureList.foreach(
- f => {
- resultList.append(ThreadUtils.awaitResult(f, Duration.Inf))
- })
- if (resultList.exists(!_.success)) {
- Seq(Row(false,
resultList.filter(!_.success).map(_.reason).mkString(";")))
- } else {
- Seq(Row(true, ""))
- }
- }
+ executorIdsToParts.foreach(
+ value => {
+ checkExecutorId(value._1)
+ val executorData =
GlutenDriverEndpoint.executorDataMap.get(toExecutorId(value._1))
+ futureList.append(
+ (
+ value._1,
+ executorData.executorEndpointRef.ask[CacheJobInfo](
+ GlutenMergeTreeCacheLoad(value._2,
selectedColumns.toSet.asJava)
+ )))
+ })
+ }
+ val resultList = waitRpcResults(futureList)
+ if (asynExecute) {
+ val res = collectJobTriggerResult(resultList)
+ Seq(Row(res._1, res._2.mkString(";")))
+ } else {
+ val res = waitAllJobFinish(resultList)
+ Seq(Row(res._1, res._2))
}
}
+
}
object GlutenCHCacheDataCommand {
- val ALL_EXECUTORS = "allExecutors"
+ private val ALL_EXECUTORS = "allExecutors"
private def toExecutorId(executorId: String): String =
executorId.split("_").last
+
+ def waitAllJobFinish(jobs: ArrayBuffer[(String, CacheJobInfo)]): (Boolean,
String) = {
+ val res = collectJobTriggerResult(jobs)
+ var status = res._1
+ val messages = res._2
+ jobs.foreach(
+ job => {
+ if (status) {
+ var complete = false
+ while (!complete) {
+ Thread.sleep(5000)
Review Comment:
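预期异步加载时间是分钟级到小时级,轮询频率不宜过高
(The asynchronous cache load is expected to take minutes to hours, so this loop should not poll the job status too frequently.)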
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]