[
https://issues.apache.org/jira/browse/SPARK-7564?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Noam Barkai updated SPARK-7564:
-------------------------------
Description:
A query over a table that's fully cached in memory, loaded from columnar
storage, shows surprisingly slow performance. The query is a simple SELECT over
a 10Gb table that sits comfortably in memory (the Storage tab in the Spark UI
confirms this). All operations are over memory; no shuffle takes place (again,
seen via the Spark UI).
Looking at the profiling, it seems almost all worker threads are in one of two
states:
1) either trying to acquire an instance of a Kryo serializer from the pool in
SparkSqlSerializer, like so:
java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:361)
com.twitter.chill.ResourcePool.borrow(ResourcePool.java:35)
org.apache.spark.sql.execution.SparkSqlSerializer$.acquireRelease(SparkSqlSerializer.scala:82)
...
org.apache.spark.sql.columnar.InMemoryColumnarTableScan$$anonfun$9$$anonfun$14$$anon$2.next(InMemoryColumnarTableScan.scala:279)
2) or trying to release one:
java.util.concurrent.ArrayBlockingQueue.offer(ArrayBlockingQueue.java:298)
com.twitter.chill.ResourcePool.release(ResourcePool.java:50)
org.apache.spark.sql.execution.SparkSqlSerializer$.acquireRelease(SparkSqlSerializer.scala:86)
...
org.apache.spark.sql.columnar.InMemoryColumnarTableScan$$anonfun$9$$anonfun$14$$anon$2.next(InMemoryColumnarTableScan.scala:279)
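Both stack traces bottom out in the same shared ArrayBlockingQueue, so every
borrow and release serializes on one lock. The pattern can be reproduced in
isolation; here is a minimal Java sketch (the thread and iteration counts are
illustrative, not taken from the report):

```java
import java.util.concurrent.ArrayBlockingQueue;

// Minimal illustration of the contention pattern: many threads repeatedly
// borrow (poll) and release (offer) through one shared ArrayBlockingQueue,
// all serializing on the queue's single internal lock.
public class PoolContention {
    // Returns the pool size after all workers finish; every borrowed
    // object should have been returned by then.
    static int run(int nThreads, int iterations) throws InterruptedException {
        ArrayBlockingQueue<Object> pool = new ArrayBlockingQueue<>(10);
        for (int i = 0; i < 10; i++) pool.offer(new Object());

        Runnable worker = () -> {
            for (int i = 0; i < iterations; i++) {
                Object o = pool.poll();       // borrow: contends on the queue's lock
                if (o != null) pool.offer(o); // release: contends on the same lock
            }
        };

        Thread[] threads = new Thread[nThreads];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(worker);
            threads[i].start();
        }
        for (Thread t : threads) t.join();
        return pool.size();
    }

    public static void main(String[] args) throws InterruptedException {
        long start = System.nanoTime();
        run(40, 100_000); // e.g. one thread per core, hammering one shared queue
        System.out.println("elapsed ms: " + (System.nanoTime() - start) / 1_000_000);
    }
}
```

Profiling a run like this shows the threads parked on the queue's lock, which
matches the two states seen in the worker thread dumps above.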
The issue appears when the cached data comes from columnar storage; I was able
to reproduce it with both ORC and Parquet. When the data is loaded from a
parallel TSV text file, the issue does not occur.
It seems related to the de/serialization calls made via
InMemoryColumnarTableScan.
The code I'm using (running from Spark shell):
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
hiveContext.sql("CACHE TABLE cached_tbl AS SELECT * FROM tbl1 ORDER BY col1").collect()
hiveContext.sql("select col1, col2, col3 from cached_tbl").collect()
It seems that the use of KryoResourcePool in SparkSqlSerializer causes
contention on the underlying ArrayBlockingQueue. A possible fix might be to
replace this data structure with something more "multi-thread friendly".
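The suggested fix could be sketched as a thread-local pool, which removes the
shared lock entirely. This ThreadLocalResourcePool is hypothetical (it is not
part of Spark or chill); the sketch is in Java since chill's ResourcePool is
Java:

```java
// Hypothetical replacement for a lock-based resource pool: each worker
// thread lazily creates and reuses its own instance, so borrow/release
// never touch a shared data structure.
abstract class ThreadLocalResourcePool<T> {
    private final ThreadLocal<T> local = ThreadLocal.withInitial(this::newInstance);

    // Subclasses create the pooled resource, e.g. a Kryo serializer.
    protected abstract T newInstance();

    public T borrow() {
        return local.get(); // per-thread instance: no contention
    }

    public void release(T item) {
        // no-op: the instance stays bound to its owning thread
    }
}
```

The trade-off is that the pool is no longer bounded: it holds one instance per
live thread rather than a fixed number, which is usually acceptable when the
executor's thread count is known.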
was:
Running a query over a table that's completely cached in memory, I came across
surprisingly slow performance. The query is a simple SELECT over a 10Gb table
that sits comfortably in memory (Storage tab in Spark UI affirms this). It is
held in 60 partitions, to improve parallelism. All operations are over memory,
no shuffle is taking place (again, seen via Spark UI).
Looking at periodic thread dumps of the workers, I saw that almost all worker
threads - 20 in each worker, as the table has 20 partitions per node - were in
one of two states, related to serialization:
1) either trying to acquire an instance of a Kryo serializer from the pool in
SparkSqlSerializer, like so:
sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
...
java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:361)
com.twitter.chill.ResourcePool.borrow(ResourcePool.java:35)
org.apache.spark.sql.execution.SparkSqlSerializer$.acquireRelease(SparkSqlSerializer.scala:82)
...
org.apache.spark.sql.columnar.InMemoryColumnarTableScan$$anonfun$9$$anonfun$14$$anon$2.next(InMemoryColumnarTableScan.scala:279)
2) or trying to release one:
sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
...
java.util.concurrent.ArrayBlockingQueue.offer(ArrayBlockingQueue.java:298)
com.twitter.chill.ResourcePool.release(ResourcePool.java:50)
org.apache.spark.sql.execution.SparkSqlSerializer$.acquireRelease(SparkSqlSerializer.scala:86)
...
org.apache.spark.sql.columnar.InMemoryColumnarTableScan$$anonfun$9$$anonfun$14$$anon$2.next(InMemoryColumnarTableScan.scala:279)
I speculate that under high contention the underlying ArrayBlockingQueue
holding the pool of Kryo serializers becomes a bottleneck - which, I would
argue, it shouldn't.
The code I'm using (running from Spark shell):
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
hiveContext.sql("SET spark.sql.shuffle.partitions=60").collect()
// the ORDER BY is added to induce a shuffle, so the table is cached in more
// partitions than it occupies in the underlying HDFS storage
hiveContext.sql("CACHE TABLE cached_tbl AS SELECT * FROM tbl1 ORDER BY col1").collect()
// the save itself is very quick; it's the select that takes long
hiveContext.sql("select col1, col2, col3 from cached_tbl").saveAsTextFile("/some-path")
> performance bottleneck in SparkSQL using columnar storage
> ---------------------------------------------------------
>
> Key: SPARK-7564
> URL: https://issues.apache.org/jira/browse/SPARK-7564
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 1.2.1, 1.3.1
> Environment: 3 node cluster, each with 100g RAM and 40 cores
> Reporter: Noam Barkai
> Attachments: worker profiling showing the bottle-neck.png
>
>
> A query over a table that's fully cached in memory, loaded from columnar
> storage, shows surprisingly slow performance. The query is a simple SELECT
> over a 10Gb table that sits comfortably in memory (the Storage tab in the
> Spark UI confirms this). All operations are over memory; no shuffle takes
> place (again, seen via the Spark UI).
> Looking at the profiling, it seems almost all worker threads are in one of
> two states:
> 1) either trying to acquire an instance of a Kryo serializer from the pool in
> SparkSqlSerializer, like so:
> java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:361)
> com.twitter.chill.ResourcePool.borrow(ResourcePool.java:35)
> org.apache.spark.sql.execution.SparkSqlSerializer$.acquireRelease(SparkSqlSerializer.scala:82)
> ...
> org.apache.spark.sql.columnar.InMemoryColumnarTableScan$$anonfun$9$$anonfun$14$$anon$2.next(InMemoryColumnarTableScan.scala:279)
> 2) or trying to release one:
> java.util.concurrent.ArrayBlockingQueue.offer(ArrayBlockingQueue.java:298)
> com.twitter.chill.ResourcePool.release(ResourcePool.java:50)
> org.apache.spark.sql.execution.SparkSqlSerializer$.acquireRelease(SparkSqlSerializer.scala:86)
> ...
> org.apache.spark.sql.columnar.InMemoryColumnarTableScan$$anonfun$9$$anonfun$14$$anon$2.next(InMemoryColumnarTableScan.scala:279)
> The issue appears when the cached data comes from columnar storage; I was
> able to reproduce it with both ORC and Parquet. When the data is loaded from
> a parallel TSV text file, the issue does not occur.
> It seems related to the de/serialization calls made via
> InMemoryColumnarTableScan.
> The code I'm using (running from Spark shell):
> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> hiveContext.sql("CACHE TABLE cached_tbl AS SELECT * FROM tbl1 ORDER BY col1").collect()
> hiveContext.sql("select col1, col2, col3 from cached_tbl").collect()
> It seems that the use of KryoResourcePool in SparkSqlSerializer causes
> contention on the underlying ArrayBlockingQueue. A possible fix might be to
> replace this data structure with something more "multi-thread friendly".
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]