This is an automated email from the ASF dual-hosted git repository.
changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 3572f804ac [GLUTEN-7623][CH] Fix running cache command with error when
executor add and removed. (#7625)
3572f804ac is described below
commit 3572f804ac57bcfde10d71dddc2b520ec9c6900d
Author: Shuai li <[email protected]>
AuthorDate: Tue Oct 22 13:13:30 2024 +0800
[GLUTEN-7623][CH] Fix running cache command with error when executor add
and removed. (#7625)
* fix 7623
* fix message
---
.../java/org/apache/gluten/execution/CacheResult.java | 5 +++++
.../sql/execution/commands/GlutenCacheBase.scala | 19 ++++++++++++-------
2 files changed, 17 insertions(+), 7 deletions(-)
diff --git a/backends-clickhouse/src/main/java/org/apache/gluten/execution/CacheResult.java b/backends-clickhouse/src/main/java/org/apache/gluten/execution/CacheResult.java
index b6d538039e..660ac5b3a4 100644
--- a/backends-clickhouse/src/main/java/org/apache/gluten/execution/CacheResult.java
+++ b/backends-clickhouse/src/main/java/org/apache/gluten/execution/CacheResult.java
@@ -52,6 +52,11 @@ public class CacheResult implements Serializable {
this.message = message;
}
+ public CacheResult(Status status, String message) {
+ this.status = status;
+ this.message = message;
+ }
+
public Status getStatus() {
return status;
}
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCacheBase.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCacheBase.scala
index c4e9f51bce..6a83877fb5 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCacheBase.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCacheBase.scala
@@ -72,9 +72,8 @@ object GlutenCacheBase {
case Status.ERROR =>
status = false
messages.append(
- s"executor : {}, failed with message: {};",
- job._1,
- result.getMessage)
+ s"executor : ${job._1}, failed with message: ${result.getMessage};"
+ )
complete = true
case Status.SUCCESS =>
complete = true
@@ -111,10 +110,16 @@ object GlutenCacheBase {
} else {
val fetchStatus: (String, String) => Future[CacheResult] =
(executorId: String, jobId: String) => {
- GlutenDriverEndpoint.executorDataMap
- .get(toExecutorId(executorId))
- .executorEndpointRef
- .ask[CacheResult](GlutenCacheLoadStatus(jobId))
+ val data = GlutenDriverEndpoint.executorDataMap.get(toExecutorId(executorId))
+ if (data == null) {
+ Future.successful(
+ new CacheResult(
+ CacheResult.Status.ERROR,
+ s"The executor($executorId) status has changed, please try again later"))
+ } else {
+ data.executorEndpointRef
+ .ask[CacheResult](GlutenCacheLoadStatus(jobId))
+ }
}
val res = waitAllJobFinish(resultList, fetchStatus)
Seq(Row(res._1, res._2))
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]