This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 205e1335b8 [CORE] Refactor task error logging into dedicated utility class (#11718)
205e1335b8 is described below

commit 205e1335b8542a2fc8852c9824d0e76c055e5787
Author: Pratham Manja <[email protected]>
AuthorDate: Tue Mar 10 14:50:53 2026 +0530

    [CORE] Refactor task error logging into dedicated utility class (#11718)
---
 .../org/apache/gluten/task/TaskErrorLogger.scala   | 43 ++++++++++++++++++++++
 .../org/apache/spark/task/TaskResources.scala      | 14 +++----
 2 files changed, 49 insertions(+), 8 deletions(-)

diff --git a/gluten-core/src/main/scala/org/apache/gluten/task/TaskErrorLogger.scala b/gluten-core/src/main/scala/org/apache/gluten/task/TaskErrorLogger.scala
new file mode 100644
index 0000000000..e2cb2d7bf8
--- /dev/null
+++ b/gluten-core/src/main/scala/org/apache/gluten/task/TaskErrorLogger.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.task
+
+import org.apache.spark.{TaskContext, TaskKilledException}
+import org.apache.spark.internal.Logging
+
+/**
+ * Utility object for logging task failures in a consistent manner.
+ *
+ * It is used from a task-failure listener so that the original failure is still reported even if a
+ * task-completion listener later crashes and swallows it.
+ */
+object TaskErrorLogger extends Logging {
+
+  /**
+   * Reason carried by a [[TaskKilledException]] when the task was killed because another attempt
+   * of it already succeeded (speculative execution). Such kills are expected, not failures.
+   */
+  private val SPECULATIVE_KILL_REASON = "another attempt succeeded"
+
+  /**
+   * Logs a task failure, demoting the expected kill of a losing speculative attempt.
+   *
+   * A [[TaskKilledException]] whose reason equals `SPECULATIVE_KILL_REASON` is logged at debug
+   * level. Every other error is logged at error level with the throwable attached.
+   *
+   * @param context
+   *   The TaskContext of the failed task; only its task attempt id is read
+   * @param error
+   *   The error that caused the task failure
+   */
+  def logTaskFailure(context: TaskContext, error: Throwable): Unit = {
+    error match {
+      case e: TaskKilledException if e.reason == SPECULATIVE_KILL_REASON =>
+        // Expected in speculative execution: this attempt lost the race, it did not fail.
+        logDebug(s"Task ${context.taskAttemptId()} was killed because another attempt succeeded")
+      case _ =>
+        // Genuine failure: pass the throwable so its stack trace is logged as well.
+        logError(s"Task ${context.taskAttemptId()} failed with error: ", error)
+    }
+  }
+}
diff --git a/gluten-core/src/main/scala/org/apache/spark/task/TaskResources.scala b/gluten-core/src/main/scala/org/apache/spark/task/TaskResources.scala
index bf7c8cd422..2c386e52ff 100644
--- a/gluten-core/src/main/scala/org/apache/spark/task/TaskResources.scala
+++ b/gluten-core/src/main/scala/org/apache/spark/task/TaskResources.scala
@@ -18,9 +18,9 @@ package org.apache.spark.task
 
 import org.apache.gluten.config.GlutenCoreConfig
 import org.apache.gluten.memory.SimpleMemoryUsageRecorder
-import org.apache.gluten.task.TaskListener
+import org.apache.gluten.task.{TaskErrorLogger, TaskListener}
 
-import org.apache.spark.{TaskContext, TaskFailedReason, TaskKilledException, 
UnknownReason}
+import org.apache.spark.{TaskContext, TaskFailedReason, UnknownReason}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.{SparkTaskUtil, TaskCompletionListener, 
TaskFailureListener}
@@ -204,16 +204,14 @@ object TaskResources extends TaskListener with Logging {
       }
       val registry = new TaskResourceRegistry
       RESOURCE_REGISTRIES.put(tc, registry)
+      // TODO: Propose upstream Spark changes for resilient error logging when
+      // CompletionListener crashes. Using TaskErrorLogger as workaround
       tc.addTaskFailureListener(
         // in case of crashing in task completion listener, errors may be 
swallowed
         new TaskFailureListener {
           override def onTaskFailure(context: TaskContext, error: Throwable): 
Unit = {
-            // TODO:
-            // The general duty of printing error message should not reside in 
memory module
-            error match {
-              case e: TaskKilledException if e.reason == "another attempt 
succeeded" =>
-              case _ => logError(s"Task ${context.taskAttemptId()} failed by 
error: ", error)
-            }
+            // Delegate error logging to TaskErrorLogger utility
+            TaskErrorLogger.logTaskFailure(context, error)
           }
         })
       tc.addTaskCompletionListener(new TaskCompletionListener {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to