This is an automated email from the ASF dual-hosted git repository.

changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 3572f804ac [GLUTEN-7623][CH] Fix running cache command with error when 
executor add and removed. (#7625)
3572f804ac is described below

commit 3572f804ac57bcfde10d71dddc2b520ec9c6900d
Author: Shuai li <[email protected]>
AuthorDate: Tue Oct 22 13:13:30 2024 +0800

    [GLUTEN-7623][CH] Fix running cache command with error when executor add 
and removed. (#7625)
    
    * fix 7623
    
    * fix message
---
 .../java/org/apache/gluten/execution/CacheResult.java |  5 +++++
 .../sql/execution/commands/GlutenCacheBase.scala      | 19 ++++++++++++-------
 2 files changed, 17 insertions(+), 7 deletions(-)

diff --git 
a/backends-clickhouse/src/main/java/org/apache/gluten/execution/CacheResult.java
 
b/backends-clickhouse/src/main/java/org/apache/gluten/execution/CacheResult.java
index b6d538039e..660ac5b3a4 100644
--- 
a/backends-clickhouse/src/main/java/org/apache/gluten/execution/CacheResult.java
+++ 
b/backends-clickhouse/src/main/java/org/apache/gluten/execution/CacheResult.java
@@ -52,6 +52,11 @@ public class CacheResult implements Serializable {
     this.message = message;
   }
 
+  public CacheResult(Status status, String message) {
+    this.status = status;
+    this.message = message;
+  }
+
   public Status getStatus() {
     return status;
   }
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCacheBase.scala
 
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCacheBase.scala
index c4e9f51bce..6a83877fb5 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCacheBase.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCacheBase.scala
@@ -72,9 +72,8 @@ object GlutenCacheBase {
               case Status.ERROR =>
                 status = false
                 messages.append(
-                  s"executor : {}, failed with message: {};",
-                  job._1,
-                  result.getMessage)
+                  s"executor : ${job._1}, failed with message: 
${result.getMessage};"
+                )
                 complete = true
               case Status.SUCCESS =>
                 complete = true
@@ -111,10 +110,16 @@ object GlutenCacheBase {
     } else {
       val fetchStatus: (String, String) => Future[CacheResult] =
         (executorId: String, jobId: String) => {
-          GlutenDriverEndpoint.executorDataMap
-            .get(toExecutorId(executorId))
-            .executorEndpointRef
-            .ask[CacheResult](GlutenCacheLoadStatus(jobId))
+          val data = 
GlutenDriverEndpoint.executorDataMap.get(toExecutorId(executorId))
+          if (data == null) {
+            Future.successful(
+              new CacheResult(
+                CacheResult.Status.ERROR,
+                s"The executor($executorId) status has changed, please try 
again later"))
+          } else {
+            data.executorEndpointRef
+              .ask[CacheResult](GlutenCacheLoadStatus(jobId))
+          }
         }
       val res = waitAllJobFinish(resultList, fetchStatus)
       Seq(Row(res._1, res._2))


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to