This is an automated email from the ASF dual-hosted git repository.
yao pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new 83a365edf16 [SPARK-38922][CORE] TaskLocation.apply throws NullPointerException
83a365edf16 is described below
commit 83a365edf163bdd30974756c6c58fdca2e16f7f3
Author: Kent Yao <[email protected]>
AuthorDate: Wed Apr 20 14:38:26 2022 +0800
[SPARK-38922][CORE] TaskLocation.apply throws NullPointerException
### What changes were proposed in this pull request?
`TaskLocation.apply` without a null check may throw a NullPointerException and fail job scheduling:
```
Caused by: java.lang.NullPointerException
at scala.collection.immutable.StringLike$class.stripPrefix(StringLike.scala:155)
at scala.collection.immutable.StringOps.stripPrefix(StringOps.scala:29)
at org.apache.spark.scheduler.TaskLocation$.apply(TaskLocation.scala:71)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal
```
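As a reference, a minimal standalone sketch (not the Spark code itself; the prefix string below is only illustrative) of why a null location string triggers this trace: Scala's implicit `String`-to-`StringOps` conversion happily wraps `null`, and the NPE only surfaces once `stripPrefix` dereferences it.
```scala
object StripPrefixNpeSketch extends App {
  val loc: String = null            // a null preferred location, as produced upstream
  try {
    // The implicit StringOps wrapper accepts null; stripPrefix then dereferences
    // it, matching the StringOps.stripPrefix frame in the trace above.
    loc.stripPrefix("hdfs_cache_")
  } catch {
    case e: NullPointerException => println(s"caught: $e")
  }
}
```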
For instance, `org.apache.spark.rdd.HadoopRDD#convertSplitLocationInfo` might generate unexpected `Some(null)` elements, where `Some` should be replaced by `Option.apply` so that a null location becomes `None`.
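A quick sketch of the difference (plain Scala, hypothetical values): `Some(null)` yields a defined option that leaks the null downstream, while `Option.apply` normalizes null to `None` at the source.
```scala
object OptionNullSketch extends App {
  val raw: String = null          // e.g. a location string reported as null by Hadoop
  assert(Some(raw).isDefined)     // Some(null): looks non-empty, the null escapes later
  assert(Option(raw).isEmpty)     // Option(null) == None: the null is dropped here
  println("Some(null) is defined; Option(null) is None")
}
```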
### Why are the changes needed?
Fix the NPE that fails job scheduling.
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
new tests
Closes #36222 from yaooqinn/SPARK-38922.
Authored-by: Kent Yao <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
(cherry picked from commit 33e07f3cd926105c6d28986eb6218f237505549e)
Signed-off-by: Kent Yao <[email protected]>
---
.../scala/org/apache/spark/rdd/HadoopRDD.scala | 2 +-
.../org/apache/spark/scheduler/DAGScheduler.scala | 2 +-
.../org/apache/spark/rdd/HadoopRDDSuite.scala | 30 ++++++++++++++++++++++
3 files changed, 32 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index fcc2275585e..0d905b46953 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -454,7 +454,7 @@ private[spark] object HadoopRDD extends Logging {
       infos: Array[SplitLocationInfo]): Option[Seq[String]] = {
     Option(infos).map(_.flatMap { loc =>
       val locationStr = loc.getLocation
-      if (locationStr != "localhost") {
+      if (locationStr != null && locationStr != "localhost") {
         if (loc.isInMemory) {
           logDebug(s"Partition $locationStr is cached by Hadoop.")
           Some(HDFSCacheTaskLocation(locationStr).toString)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index ffaabba71e8..ea3a333b19e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -2736,7 +2736,7 @@ private[spark] class DAGScheduler(
     // If the RDD has some placement preferences (as is the case for input RDDs), get those
     val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
     if (rddPrefs.nonEmpty) {
-      return rddPrefs.map(TaskLocation(_))
+      return rddPrefs.filter(_ != null).map(TaskLocation(_))
     }
     // If the RDD has narrow dependencies, pick the first partition of the first narrow dependency
diff --git a/core/src/test/scala/org/apache/spark/rdd/HadoopRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/HadoopRDDSuite.scala
new file mode 100644
index 00000000000..b43d76c114c
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/rdd/HadoopRDDSuite.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import org.apache.hadoop.mapred.SplitLocationInfo
+
+import org.apache.spark.SparkFunSuite
+
+class HadoopRDDSuite extends SparkFunSuite {
+
+ test("SPARK-38922: HadoopRDD convertSplitLocationInfo contains Some(null)
cause NPE") {
+ val locs = Array(new SplitLocationInfo(null, false))
+ assert(HadoopRDD.convertSplitLocationInfo(locs).get.isEmpty)
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]