[ 
https://issues.apache.org/jira/browse/SPARK-19476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15942326#comment-15942326
 ] 

Gal Topper edited comment on SPARK-19476 at 3/26/17 6:08 PM:
-------------------------------------------------------------

In our case, we use Akka Streams to make many parallel, async I/O calls. To 
speed this up, we allow a large number of in-flight requests. It would not be 
feasible to have as many executors as we do in-flight requests, nor should the 
two be coupled IMO.

One solution that comes to mind is to assign 
TaskContext.get().taskMetrics() to a private val when the iterator is 
constructed (see 
[here|https://github.com/apache/spark/blob/362ee93296a0de6342b4339e941e6a11f445c5b2/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala#L107])
 and to use that val later, when the iterator is exhausted (see 
[here|https://github.com/apache/spark/blob/362ee93296a0de6342b4339e941e6a11f445c5b2/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala#L419]).
 Generally speaking, iterators are not expected to be thread-safe, but neither 
are they expected to be tethered to any one specific thread, and I think the 
suggested change would comply with this standard contract.

I could make a small PR if the idea makes sense.
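
To illustrate the idea in isolation, here is a minimal sketch in plain Scala. It is not Spark code: FakeTaskContext, FakeMetrics, LazyLookupIterator, CapturedIterator and CaptureDemo are hypothetical names invented for this example, and the thread-local lookup only imitates the behaviour described in the issue. The first iterator resolves the thread-local on every call to next(), so it fails on a foreign thread. The second resolves it once, on the constructing thread, and keeps the reference in a private val.

```scala
import scala.util.Try

// Hypothetical stand-in for per-task metrics; not a Spark class.
final class FakeMetrics { var rowsSeen: Long = 0L }

// Hypothetical stand-in for a thread-local task context; not a Spark class.
object FakeTaskContext {
  // A plain ThreadLocal returns null on any thread that never called set().
  private val local = new ThreadLocal[FakeMetrics]
  def set(m: FakeMetrics): Unit = local.set(m)
  def get(): FakeMetrics = local.get()
}

// Looks the metrics up on whichever thread happens to call next().
final class LazyLookupIterator(underlying: Iterator[Int]) extends Iterator[Int] {
  def hasNext: Boolean = underlying.hasNext
  def next(): Int = {
    FakeTaskContext.get().rowsSeen += 1 // NullPointerException off the original thread
    underlying.next()
  }
}

// Resolves the metrics once, on the constructing thread, and reuses the reference.
final class CapturedIterator(underlying: Iterator[Int]) extends Iterator[Int] {
  private val metrics: FakeMetrics = FakeTaskContext.get()
  def hasNext: Boolean = underlying.hasNext
  def next(): Int = {
    metrics.rowsSeen += 1
    underlying.next()
  }
}

object CaptureDemo {
  // Runs `body` on a freshly started thread and waits for it to finish.
  private def onOtherThread[A](body: => A): Try[A] = {
    var result: Try[A] = null
    val t = new Thread(new Runnable {
      def run(): Unit = { result = Try(body) }
    })
    t.start()
    t.join()
    result
  }

  // Returns (did the lazy-lookup iterator fail?, rows counted via the captured reference).
  def run(): (Boolean, Long) = {
    val metrics = new FakeMetrics
    FakeTaskContext.set(metrics) // done on the "task" thread
    val lazyIt = new LazyLookupIterator(Iterator(1, 2, 3))
    val capturedIt = new CapturedIterator(Iterator(1, 2, 3))
    val lazyFailed = onOtherThread(lazyIt.toVector).isFailure
    onOtherThread(capturedIt.toVector)
    (lazyFailed, metrics.rowsSeen)
  }
}
```

Calling CaptureDemo.run() drains both iterators on a second thread. Only the captured variant completes, and its updates land on the metrics object that was registered on the original thread.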

EDIT: Never mind, it's not that simple, because there is at least 
[one|https://github.com/apache/spark/blob/dd9049e0492cc70b629518fee9b3d1632374c612/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala#L313]
 more thread-locality assumption that breaks when I try this.


> Running threads in Spark DataFrame foreachPartition() causes 
> NullPointerException
> ---------------------------------------------------------------------------------
>
>                 Key: SPARK-19476
>                 URL: https://issues.apache.org/jira/browse/SPARK-19476
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.0, 1.6.1, 1.6.2, 1.6.3, 2.0.0, 2.0.1, 2.0.2, 2.1.0
>            Reporter: Gal Topper
>
> First reported on [Stack 
> Overflow|http://stackoverflow.com/questions/41674069/running-threads-in-spark-dataframe-foreachpartition].
> I use multiple threads inside foreachPartition(), which works great for me 
> except for when the underlying iterator is TungstenAggregationIterator. Here 
> is a minimal code snippet to reproduce:
> {code:title=Reproduce.scala|borderStyle=solid}
>     import scala.concurrent.ExecutionContext.Implicits.global
>     import scala.concurrent.duration.Duration
>     import scala.concurrent.{Await, Future}
>     import org.apache.spark.SparkContext
>     import org.apache.spark.sql.SQLContext
>     object Reproduce extends App {
>       val sc = new SparkContext("local", "reproduce")
>       val sqlContext = new SQLContext(sc)
>       import sqlContext.implicits._
>       val df = sc.parallelize(Seq(1)).toDF("number").groupBy("number").count()
>       df.foreachPartition { iterator =>
>         val f = Future(iterator.toVector)
>         Await.result(f, Duration.Inf)
>       }
>     }
> {code}
> When I run this, I get:
> {noformat}
>     java.lang.NullPointerException
>         at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.next(TungstenAggregationIterator.scala:751)
>         at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.next(TungstenAggregationIterator.scala:84)
>         at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> {noformat}
> I believe I actually understand why this happens - 
> TungstenAggregationIterator uses a ThreadLocal variable that returns null 
> when called from a thread other than the original thread that got the 
> iterator from Spark. From examining the code, this does not appear to differ 
> between recent Spark versions.
> However, this limitation is specific to TungstenAggregationIterator, and not 
> documented, as far as I'm aware.
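
Given the explanation above, one possible workaround is to consume the iterator on the thread that received it and hand only the materialized rows to other threads. The sketch below is plain Scala under that assumption. DrainFirst and processInParallel are hypothetical names, not part of Spark, and the approach assumes that a whole partition fits in memory, which may not hold for large partitions.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

// Hypothetical helper; not part of Spark.
object DrainFirst {
  // Materializes `rows` on the calling thread, then runs `work` on each row
  // in parallel on the supplied ExecutionContext and waits for all results.
  def processInParallel[A, B](rows: Iterator[A])(work: A => B)(
      implicit ec: ExecutionContext): Vector[B] = {
    // The iterator is fully consumed here, before any other thread is involved.
    val buffered: Vector[A] = rows.toVector
    val futures: Vector[Future[B]] = buffered.map(row => Future(work(row)))
    Await.result(Future.sequence(futures), Duration.Inf)
  }
}
```

Inside foreachPartition this would be called as DrainFirst.processInParallel(iterator)(row => ...), so the partition's iterator never leaves the thread that Spark handed it to. The trade-off is that the partition is buffered in full instead of being streamed.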


