This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 205e1335b8 [CORE] Refactor task error logging into dedicated utility
class (#11718)
205e1335b8 is described below
commit 205e1335b8542a2fc8852c9824d0e76c055e5787
Author: Pratham Manja <[email protected]>
AuthorDate: Tue Mar 10 14:50:53 2026 +0530
[CORE] Refactor task error logging into dedicated utility class (#11718)
---
.../org/apache/gluten/task/TaskErrorLogger.scala | 43 ++++++++++++++++++++++
.../org/apache/spark/task/TaskResources.scala | 14 +++----
2 files changed, 49 insertions(+), 8 deletions(-)
diff --git a/gluten-core/src/main/scala/org/apache/gluten/task/TaskErrorLogger.scala b/gluten-core/src/main/scala/org/apache/gluten/task/TaskErrorLogger.scala
new file mode 100644
index 0000000000..e2cb2d7bf8
--- /dev/null
+++ b/gluten-core/src/main/scala/org/apache/gluten/task/TaskErrorLogger.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.task
+
+import org.apache.spark.{TaskContext, TaskKilledException}
+import org.apache.spark.internal.Logging
+
+/** Utility object for logging task errors in a consistent manner. */
+object TaskErrorLogger extends Logging {
+
+  /**
+   * Logs task failure errors with appropriate filtering.
+   *
+   * Tasks killed because another speculative attempt succeeded are expected and
+   * logged at debug level only; all other failures are logged as errors.
+   *
+   * @param context
+   *   The TaskContext of the failed task
+   * @param error
+   *   The error that caused the task failure
+   */
+  def logTaskFailure(context: TaskContext, error: Throwable): Unit = {
+    error match {
+      case e: TaskKilledException if e.reason == "another attempt succeeded" =>
+        // This is an expected scenario in speculative execution, no need to log as error
+        logDebug(s"Task ${context.taskAttemptId()} was killed because another attempt succeeded")
+      case _ =>
+        // Log genuine errors for debugging
+        logError(s"Task ${context.taskAttemptId()} failed with error: ", error)
+    }
+  }
+}
diff --git a/gluten-core/src/main/scala/org/apache/spark/task/TaskResources.scala b/gluten-core/src/main/scala/org/apache/spark/task/TaskResources.scala
index bf7c8cd422..2c386e52ff 100644
--- a/gluten-core/src/main/scala/org/apache/spark/task/TaskResources.scala
+++ b/gluten-core/src/main/scala/org/apache/spark/task/TaskResources.scala
@@ -18,9 +18,9 @@ package org.apache.spark.task
import org.apache.gluten.config.GlutenCoreConfig
import org.apache.gluten.memory.SimpleMemoryUsageRecorder
-import org.apache.gluten.task.TaskListener
+import org.apache.gluten.task.{TaskErrorLogger, TaskListener}
-import org.apache.spark.{TaskContext, TaskFailedReason, TaskKilledException, UnknownReason}
+import org.apache.spark.{TaskContext, TaskFailedReason, UnknownReason}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.{SparkTaskUtil, TaskCompletionListener, TaskFailureListener}
@@ -204,16 +204,14 @@ object TaskResources extends TaskListener with Logging {
}
val registry = new TaskResourceRegistry
RESOURCE_REGISTRIES.put(tc, registry)
+ // TODO: Propose upstream Spark changes for resilient error logging when
+ // CompletionListener crashes. Using TaskErrorLogger as workaround
     tc.addTaskFailureListener(
       // in case of crashing in task completion listener, errors may be swallowed
       new TaskFailureListener {
         override def onTaskFailure(context: TaskContext, error: Throwable): Unit = {
-          // TODO:
-          // The general duty of printing error message should not reside in memory module
-          error match {
-            case e: TaskKilledException if e.reason == "another attempt succeeded" =>
-            case _ => logError(s"Task ${context.taskAttemptId()} failed by error: ", error)
-          }
+          // Delegate error logging to TaskErrorLogger utility
+          TaskErrorLogger.logTaskFailure(context, error)
         }
       })
tc.addTaskCompletionListener(new TaskCompletionListener {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]