[
https://issues.apache.org/jira/browse/SPARK-19476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Sean Owen updated SPARK-19476:
------------------------------
Priority: Minor (was: Major)
Issue Type: Improvement (was: Bug)
You certainly access the iterator from Spark, right? That's what I mean. You
might just copy its contents into a data structure with no relation to the
iterator implementation. That, of course, may not be feasible.
I think this is true across the whole API: you're not intended to create
your own threads. It could well work in many cases, but I don't think it's
guaranteed to. Sure, maybe it's worth a note.
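A minimal sketch of the copy-first approach mentioned above, in plain Scala so it runs without Spark. The object DrainThenFork and the helper drainThenProcess are hypothetical names used only for illustration; they are not part of Spark. The idea is that every call to the iterator's next() happens on the calling thread, and only the resulting Vector crosses over to the Future's thread. It assumes the partition fits in memory, which, as noted, may not be feasible.

```scala
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}

object DrainThenFork {
  // Hypothetical helper (not a Spark API): drain the iterator on the calling
  // thread, then hand the materialized copy to another thread for processing.
  def drainThenProcess[T, R](iterator: Iterator[T])(work: Vector[T] => R): R = {
    // next() is only ever invoked here, on the thread that received the iterator.
    val rows = iterator.toVector
    // The Future sees a plain Vector with no tie to the iterator implementation.
    val result = Future(work(rows))
    Await.result(result, Duration.Inf)
  }
}
```

Inside the reproduction from the issue, this would presumably be used as df.foreachPartition { iterator => DrainThenFork.drainThenProcess(iterator)(rows => rows.foreach(println)) }. That usage is an untested assumption and not a guarantee that threads are supported there.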
> Running threads in Spark DataFrame foreachPartition() causes
> NullPointerException
> ---------------------------------------------------------------------------------
>
> Key: SPARK-19476
> URL: https://issues.apache.org/jira/browse/SPARK-19476
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 1.6.0, 1.6.1, 1.6.2, 1.6.3, 2.0.0, 2.0.1, 2.0.2, 2.1.0
> Reporter: Gal Topper
> Priority: Minor
>
> First reported on [Stack
> Overflow|http://stackoverflow.com/questions/41674069/running-threads-in-spark-dataframe-foreachpartition].
> I use multiple threads inside foreachPartition(), which works well for me
> except when the underlying iterator is a TungstenAggregationIterator. Here
> is a minimal code snippet to reproduce:
> {code:title=Reproduce.scala|borderStyle=solid}
> import scala.concurrent.ExecutionContext.Implicits.global
> import scala.concurrent.duration.Duration
> import scala.concurrent.{Await, Future}
> import org.apache.spark.SparkContext
> import org.apache.spark.sql.SQLContext
> object Reproduce extends App {
>   val sc = new SparkContext("local", "reproduce")
>   val sqlContext = new SQLContext(sc)
>   import sqlContext.implicits._
>   // The groupBy/count aggregation is what yields a TungstenAggregationIterator.
>   val df = sc.parallelize(Seq(1)).toDF("number").groupBy("number").count()
>   df.foreachPartition { iterator =>
>     // The iterator is consumed on the Future's thread, not the task thread.
>     val f = Future(iterator.toVector)
>     Await.result(f, Duration.Inf)
>   }
> }
> {code}
> When I run this, I get:
> {noformat}
> java.lang.NullPointerException
> at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.next(TungstenAggregationIterator.scala:751)
> at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.next(TungstenAggregationIterator.scala:84)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> {noformat}
> I believe I understand why this happens:
> TungstenAggregationIterator uses a ThreadLocal variable that returns null
> when called from a thread other than the original thread that got the
> iterator from Spark. From examining the code, this does not appear to differ
> between recent Spark versions.
> However, this limitation is specific to TungstenAggregationIterator and is
> not documented, as far as I'm aware.
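The ThreadLocal behaviour described in the issue can be illustrated in isolation. The sketch below uses a hypothetical ThreadLocal named taskState as a stand-in; it is not the actual field used by TungstenAggregationIterator, and an explicit Thread stands in for the Future's pool thread so that the outcome does not depend on the execution context. A value set on one thread is visible there, while a thread that never called set() reads the default, which is null.

```scala
object ThreadLocalSketch {
  // Hypothetical per-thread slot (an assumption for illustration, not Spark code).
  // With no initialValue override, get() returns null on threads that never called set().
  private val taskState = new ThreadLocal[String]

  /** Sets the value on the calling thread and reads it back on that same thread. */
  def readFromSameThread(value: String): String = {
    taskState.set(value)
    taskState.get()
  }

  /** Sets the value on the calling thread, then reads it from a brand-new thread. */
  def readFromOtherThread(value: String): String = {
    taskState.set(value)
    var seen: String = "unset"
    val t = new Thread(new Runnable {
      override def run(): Unit = { seen = taskState.get() }
    })
    t.start()
    t.join() // join() makes the other thread's write to `seen` visible here
    seen
  }
}
```

Calling ThreadLocalSketch.readFromSameThread("task-0") returns "task-0", whereas ThreadLocalSketch.readFromOtherThread("task-0") returns null. Dereferencing such a null would be consistent with the NullPointerException in the stack trace above.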