Gal Topper created SPARK-19476:
----------------------------------
Summary: Running threads in Spark DataFrame foreachPartition()
causes NullPointerException
Key: SPARK-19476
URL: https://issues.apache.org/jira/browse/SPARK-19476
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 2.1.0, 2.0.2, 2.0.1, 2.0.0, 1.6.3, 1.6.2, 1.6.1, 1.6.0
Reporter: Gal Topper
First reported on [Stack Overflow|http://stackoverflow.com/questions/41674069/running-threads-in-spark-dataframe-foreachpartition].
I use multiple threads inside foreachPartition(), which works great for me
except when the underlying iterator is a TungstenAggregationIterator. Here is
a minimal code snippet to reproduce:
{code:title=Reproduce.scala|borderStyle=solid}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

object Reproduce extends App {
  val sc = new SparkContext("local", "reproduce")
  val sqlContext = new SQLContext(sc)
  import sqlContext.implicits._

  // The groupBy/count aggregation is what puts a TungstenAggregationIterator
  // underneath the partition iterator.
  val df = sc.parallelize(Seq(1)).toDF("number").groupBy("number").count()

  df.foreachPartition { iterator =>
    // Consume the iterator on a pool thread rather than the task's own thread.
    val f = Future(iterator.toVector)
    Await.result(f, Duration.Inf)
  }
}
{code}
When I run this, I get:
{noformat}
java.lang.NullPointerException
at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.next(TungstenAggregationIterator.scala:751)
at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.next(TungstenAggregationIterator.scala:84)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
{noformat}
I believe I actually understand why this happens - TungstenAggregationIterator
uses a ThreadLocal variable that returns null when called from a thread other
than the original thread that got the iterator from Spark. From examining the
code, this does not appear to differ between recent Spark versions.
However, this limitation is specific to TungstenAggregationIterator and, as far
as I'm aware, is not documented.
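
To illustrate the mechanism I suspect, here is a minimal sketch without Spark. It only shows general ThreadLocal behaviour and does not reproduce Spark's internals; the names ThreadLocalDemo, context and observe are hypothetical and exist only for this example. A value set on the calling thread is visible there, but a Future running on the global execution context's pool thread should see null instead.

```scala
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}

object ThreadLocalDemo {
  // Hypothetical stand-in for per-task state that is only set on the
  // thread that owns the task. Not an actual Spark field.
  val context = new ThreadLocal[String]

  // Returns (value seen on the calling thread, value seen inside a Future).
  def observe(): (String, String) = {
    context.set("task-context")
    val seenHere = context.get()
    // The Future body runs on a pool thread, which has its own (empty) slot.
    val seenInFuture = Await.result(Future(context.get()), Duration.Inf)
    (seenHere, seenInFuture)
  }

  def main(args: Array[String]): Unit = {
    val (seenHere, seenInFuture) = observe()
    println(s"calling thread: $seenHere")
    println(s"future thread:  $seenInFuture")
  }
}
```

If the same thing happens inside TungstenAggregationIterator, anything that dereferences such a value from the Future's thread would fail exactly as in the stack trace above.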