[ https://issues.apache.org/jira/browse/SPARK-19476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16258487#comment-16258487 ]
Sean Owen commented on SPARK-19476:
-----------------------------------

Ok. These limitations are from your app though (no batching, high overhead per slot or something). This isn't a general problem to document.

> Running threads in Spark DataFrame foreachPartition() causes NullPointerException
> ---------------------------------------------------------------------------------
>
>                 Key: SPARK-19476
>                 URL: https://issues.apache.org/jira/browse/SPARK-19476
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 1.6.0, 1.6.1, 1.6.2, 1.6.3, 2.0.0, 2.0.1, 2.0.2, 2.1.0
>            Reporter: Gal Topper
>            Priority: Minor
>
> First reported on [Stack overflow|http://stackoverflow.com/questions/41674069/running-threads-in-spark-dataframe-foreachpartition].
> I use multiple threads inside foreachPartition(), which works great for me except for when the underlying iterator is TungstenAggregationIterator. Here is a minimal code snippet to reproduce:
> {code:title=Reproduce.scala|borderStyle=solid}
>     import scala.concurrent.ExecutionContext.Implicits.global
>     import scala.concurrent.duration.Duration
>     import scala.concurrent.{Await, Future}
>     import org.apache.spark.SparkContext
>     import org.apache.spark.sql.SQLContext
>     object Reproduce extends App {
>       val sc = new SparkContext("local", "reproduce")
>       val sqlContext = new SQLContext(sc)
>       import sqlContext.implicits._
>       val df = sc.parallelize(Seq(1)).toDF("number").groupBy("number").count()
>       df.foreachPartition { iterator =>
>         val f = Future(iterator.toVector)
>         Await.result(f, Duration.Inf)
>       }
>     }
> {code}
> When I run this, I get:
> {noformat}
> java.lang.NullPointerException
>     at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.next(TungstenAggregationIterator.scala:751)
>     at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.next(TungstenAggregationIterator.scala:84)
>     at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> {noformat}
> I believe I actually understand why this happens - TungstenAggregationIterator uses a ThreadLocal variable that returns null when called from a thread other than the original thread that got the iterator from Spark. From examining the code, this does not appear to differ between recent Spark versions.
> However, this limitation is specific to TungstenAggregationIterator, and not documented, as far as I'm aware.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
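
The ThreadLocal explanation in the report can be modelled without Spark. The following is a minimal sketch under stated assumptions, not Spark's actual code: `taskState`, `ThreadBoundIterator`, and `onOtherThread` are hypothetical names invented for illustration. It shows an iterator whose `next()` reads a thread-local value failing when consumed from another thread, and the same data being handed off safely once it has been materialized on the owning thread.

```scala
import java.util.concurrent.Executors
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.util.Try

object ThreadLocalIteratorSketch {
  // Hypothetical stand-in for per-task state; get() returns null on any
  // thread that never called set().
  private val taskState = new ThreadLocal[String]

  // Hypothetical iterator whose next() dereferences the thread-local state.
  final class ThreadBoundIterator(values: Seq[Int]) extends Iterator[String] {
    private val underlying = values.iterator
    def hasNext: Boolean = underlying.hasNext
    def next(): String = {
      val state = taskState.get()
      // Throws NullPointerException when state is null (wrong thread).
      state.concat(":" + underlying.next())
    }
  }

  // Runs body on a dedicated single-thread pool, so it never executes on
  // the calling thread; any non-fatal exception is captured in the Try.
  private def onOtherThread[A](body: => A): Try[A] = {
    val pool = Executors.newSingleThreadExecutor()
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try Try(Await.result(Future(body), Duration.Inf))
    finally pool.shutdown()
  }

  // Mirrors the shape of the reported failure: the iterator is consumed
  // on a thread where the thread-local value was never set.
  def consumeOnOtherThread(): Try[Vector[String]] = {
    taskState.set("task-0")
    val it = new ThreadBoundIterator(Seq(1, 2, 3))
    onOtherThread(it.toVector)
  }

  // Possible workaround: drain the iterator on the owning thread first,
  // then give the plain collection to the other thread.
  def materializeThenHandOff(): Try[Vector[String]] = {
    taskState.set("task-0")
    val rows = new ThreadBoundIterator(Seq(1, 2, 3)).toVector
    onOtherThread(rows.map(_.toUpperCase))
  }
}
```

Applied to the reproduction in the report, the analogous change would be to call `iterator.toVector` directly inside `foreachPartition` and pass only the resulting `Vector` to the `Future`. This buffers the whole partition in memory, so it is a trade-off rather than a general fix.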