[ 
https://issues.apache.org/jira/browse/SPARK-7564?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Noam Barkai updated SPARK-7564:
-------------------------------
    Description: 
A query over a table that is fully cached in memory, loaded from columnar storage, 
shows surprisingly slow performance. The query is a simple SELECT over a 10Gb 
table that sits comfortably in memory (the Storage tab in the Spark UI confirms 
this). All operations are in memory; no shuffle takes place (again, seen via the 
Spark UI).
Profiling shows that almost all worker threads are in one of two states:

1) either trying to acquire an instance of a Kryo serializer from the pool in 
SparkSqlSerializer, like so:
java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:361)
com.twitter.chill.ResourcePool.borrow(ResourcePool.java:35)
org.apache.spark.sql.execution.SparkSqlSerializer$.acquireRelease(SparkSqlSerializer.scala:82)
...
org.apache.spark.sql.columnar.InMemoryColumnarTableScan$$anonfun$9$$anonfun$14$$anon$2.next(InMemoryColumnarTableScan.scala:279)



2) or trying to release one:
java.util.concurrent.ArrayBlockingQueue.offer(ArrayBlockingQueue.java:298)
com.twitter.chill.ResourcePool.release(ResourcePool.java:50)
org.apache.spark.sql.execution.SparkSqlSerializer$.acquireRelease(SparkSqlSerializer.scala:86)
...
org.apache.spark.sql.columnar.InMemoryColumnarTableScan$$anonfun$9$$anonfun$14$$anon$2.next(InMemoryColumnarTableScan.scala:279)

The issue appears when the cached data comes from columnar storage - I was able 
to reproduce it with both ORC and Parquet.
When the data is loaded from a parallel TSV text file, the issue does not occur.
It seems to be related to the de/serialization calls made via 
InMemoryColumnarTableScan. 

The code I'm using (running from the Spark shell):
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
hiveContext.sql("CACHE TABLE cached_tbl AS SELECT * FROM tbl1 ORDER BY col1").collect()
hiveContext.sql("select col1, col2, col3 from cached_tbl").collect()

It seems that the use of KryoResourcePool in SparkSqlSerializer causes 
contention on the underlying ArrayBlockingQueue. A possible fix might be to 
replace this data structure with something more "multi-thread friendly".
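As a rough illustration of the proposed direction, here is a minimal sketch of a serializer pool backed by a non-blocking ConcurrentLinkedQueue instead of an ArrayBlockingQueue. The class name LockFreePool and its shape are hypothetical, for illustration only - this is not Spark or chill API, just one way the contended poll/offer pair could be avoided:

```scala
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical sketch: unlike chill's ResourcePool, which blocks on an
// ArrayBlockingQueue, this pool uses a lock-free ConcurrentLinkedQueue.
// When the pool is momentarily empty, it creates a fresh instance rather
// than making the caller wait.
class LockFreePool[T](create: () => T) {
  private val queue = new ConcurrentLinkedQueue[T]()

  def borrow(): T = {
    val item = queue.poll() // non-blocking; returns null when the pool is empty
    if (item == null) create() else item
  }

  def release(item: T): Unit = queue.offer(item) // non-blocking, unbounded
}

object Demo {
  def main(args: Array[String]): Unit = {
    val pool = new LockFreePool[StringBuilder](() => new StringBuilder)
    val sb = pool.borrow() // pool empty, so a new instance is created
    sb.append("ok")
    pool.release(sb)
    println(pool.borrow().toString) // reuses the released instance: prints "ok"
  }
}
```

The trade-off is that the pool is unbounded and may allocate extra serializers under a burst, but borrow/release never park a thread, which is exactly the behavior the stack traces above suggest is needed.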

  was:
Running a query over a table that's completely cached in memory, I came across 
surprisingly slow performance. The query is a simple SELECT over a 10Gb table 
that sits comfortably in memory (the Storage tab in the Spark UI confirms this). 
It is held in 60 partitions to improve parallelism. All operations are in 
memory; no shuffle takes place (again, seen via the Spark UI).
Looking at periodic thread dumps of the workers, I see that almost all worker 
threads - 20 in each worker, as the table has 20 partitions per node - were in 
one of two states, both related to serialization:

1) either trying to acquire an instance of a Kryo serializer from the pool in 
SparkSqlSerializer, like so:
sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
...
java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:361)
com.twitter.chill.ResourcePool.borrow(ResourcePool.java:35)
org.apache.spark.sql.execution.SparkSqlSerializer$.acquireRelease(SparkSqlSerializer.scala:82)
...
org.apache.spark.sql.columnar.InMemoryColumnarTableScan$$anonfun$9$$anonfun$14$$anon$2.next(InMemoryColumnarTableScan.scala:279)



2) or trying to release one:
sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
...
java.util.concurrent.ArrayBlockingQueue.offer(ArrayBlockingQueue.java:298)
com.twitter.chill.ResourcePool.release(ResourcePool.java:50)
org.apache.spark.sql.execution.SparkSqlSerializer$.acquireRelease(SparkSqlSerializer.scala:86)
...
org.apache.spark.sql.columnar.InMemoryColumnarTableScan$$anonfun$9$$anonfun$14$$anon$2.next(InMemoryColumnarTableScan.scala:279)



I speculate that when contention on the underlying ArrayBlockingQueue holding 
the pool of Kryo serializers is high, it becomes a bottleneck - which it 
shouldn't.

The code I'm using (running from the Spark shell):
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
hiveContext.sql("SET spark.sql.shuffle.partitions=60").collect()
hiveContext.sql("CACHE TABLE cached_tbl AS SELECT * FROM tbl1 ORDER BY col1").collect() // the ORDER BY induces a shuffle, so the table is cached in more partitions than it occupies in the underlying HDFS storage
hiveContext.sql("select col1, col2, col3 from cached_tbl").saveAsTextFile("/some-path") // the save itself is quick; it's the select that takes long




> performance bottleneck in SparkSQL using columnar storage
> ---------------------------------------------------------
>
>                 Key: SPARK-7564
>                 URL: https://issues.apache.org/jira/browse/SPARK-7564
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.2.1, 1.3.1
>         Environment: 3 node cluster, each with 100g RAM and 40 cores
>            Reporter: Noam Barkai
>         Attachments: worker profiling showing the bottle-neck.png
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
