This is an automated email from the ASF dual-hosted git repository.
ulyssesyou pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
new a001482b43d2 [SPARK-46480][CORE][SQL][3.5] Fix NPE when table cache
task attempt
a001482b43d2 is described below
commit a001482b43d24b4761049687b87bceba0e21c8fd
Author: ulysses-you <[email protected]>
AuthorDate: Fri Dec 22 17:28:59 2023 +0800
[SPARK-46480][CORE][SQL][3.5] Fix NPE when table cache task attempt
This PR backports https://github.com/apache/spark/pull/44445 to branch-3.5.
### What changes were proposed in this pull request?
This PR adds a check: we only mark the cached partition as materialized if
the task has neither failed nor been interrupted. It also adds a new method
`isFailed` to `TaskContext`.
### Why are the changes needed?
Before this PR, when caching a table, a task failure could cause an NPE in other tasks:
```
java.lang.NullPointerException
at java.nio.ByteBuffer.wrap(ByteBuffer.java:396)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificColumnarIterator.accessors1$(Unknown
Source)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificColumnarIterator.hasNext(Unknown
Source)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:155)
at
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
```
### Does this PR introduce _any_ user-facing change?
Yes, this is a bug fix.
### How was this patch tested?
Added a test.
### Was this patch authored or co-authored using generative AI tooling?
no
Closes #44457 from ulysses-you/fix-cache-3.5.
Authored-by: ulysses-you <[email protected]>
Signed-off-by: youxiduo <[email protected]>
---
core/src/main/scala/org/apache/spark/BarrierTaskContext.scala | 2 ++
core/src/main/scala/org/apache/spark/TaskContext.scala | 5 +++++
core/src/main/scala/org/apache/spark/TaskContextImpl.scala | 2 ++
.../scala/org/apache/spark/scheduler/TaskContextSuite.scala | 10 ++++++++++
project/MimaExcludes.scala | 4 +++-
.../apache/spark/sql/execution/columnar/InMemoryRelation.scala | 8 +++++---
6 files changed, 27 insertions(+), 4 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
index ecc0c891ea16..94ba3fe64a85 100644
--- a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
@@ -193,6 +193,8 @@ class BarrierTaskContext private[spark] (
override def isCompleted(): Boolean = taskContext.isCompleted()
+ override def isFailed(): Boolean = taskContext.isFailed()
+
override def isInterrupted(): Boolean = taskContext.isInterrupted()
override def addTaskCompletionListener(listener: TaskCompletionListener):
this.type = {
diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala
b/core/src/main/scala/org/apache/spark/TaskContext.scala
index 450c00928c9e..af7aa4979dc1 100644
--- a/core/src/main/scala/org/apache/spark/TaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -94,6 +94,11 @@ abstract class TaskContext extends Serializable {
*/
def isCompleted(): Boolean
+ /**
+ * Returns true if the task has failed.
+ */
+ def isFailed(): Boolean
+
/**
* Returns true if the task has been killed.
*/
diff --git a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
index 526627c28607..46273a1b6d68 100644
--- a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
@@ -275,6 +275,8 @@ private[spark] class TaskContextImpl(
@GuardedBy("this")
override def isCompleted(): Boolean = synchronized(completed)
+ override def isFailed(): Boolean = synchronized(failureCauseOpt.isDefined)
+
override def isInterrupted(): Boolean = reasonIfKilled.isDefined
override def getLocalProperty(key: String): String =
localProperties.getProperty(key)
diff --git
a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
index 54a42c1a6618..a5c2cbf52aaf 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
@@ -669,6 +669,16 @@ class TaskContextSuite extends SparkFunSuite with
BeforeAndAfter with LocalSpark
assert(invocationOrder === Seq("C", "B", "A", "D"))
}
+ test("SPARK-46480: Add isFailed in TaskContext") {
+ val context = TaskContext.empty()
+ var isFailed = false
+ context.addTaskCompletionListener[Unit] { context =>
+ isFailed = context.isFailed()
+ }
+ context.markTaskFailed(new RuntimeException())
+ context.markTaskCompleted(None)
+ assert(isFailed)
+ }
}
private object TaskContextSuite {
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 9805ad7f09d6..376ddfde1b93 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -72,7 +72,9 @@ object MimaExcludes {
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.FlatMapGroupsWithStateFunction"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.api.java.function.MapGroupsWithStateFunction"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.SaveMode"),
-
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.GroupState")
+
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.GroupState"),
+ // [SPARK-46480][CORE][SQL] Fix NPE when table cache task attempt
+
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.TaskContext.isFailed")
)
// Default exclude rules
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index 45d006b58e87..65f7835b42cf 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -279,9 +279,11 @@ case class CachedRDDBuilder(
cachedPlan.conf)
}
val cached = cb.mapPartitionsInternal { it =>
- TaskContext.get().addTaskCompletionListener[Unit](_ => {
- materializedPartitions.add(1L)
- })
+ TaskContext.get().addTaskCompletionListener[Unit] { context =>
+ if (!context.isFailed() && !context.isInterrupted()) {
+ materializedPartitions.add(1L)
+ }
+ }
new Iterator[CachedBatch] {
override def hasNext: Boolean = it.hasNext
override def next(): CachedBatch = {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]