[
https://issues.apache.org/jira/browse/SPARK-7564?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Noam Barkai updated SPARK-7564:
-------------------------------
Affects Version/s: 1.3.1
> possible performance bottleneck in SparkSQL's SparkSqlSerializer class
> ----------------------------------------------------------------------
>
> Key: SPARK-7564
> URL: https://issues.apache.org/jira/browse/SPARK-7564
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 1.2.1, 1.3.1
> Environment: 3-node cluster, each node with 100 GB RAM and 40 cores
> Reporter: Noam Barkai
> Attachments: worker profiling showing the bottle-neck.png
>
>
> While running a query over a table that is completely cached in memory, I
> came across surprisingly slow performance. The query is a simple SELECT over
> a 10 GB table that sits comfortably in memory (the Storage tab in the Spark
> UI confirms this). The table is held in 60 partitions to improve parallelism.
> All operations are in memory; no shuffle takes place (again, verified via the
> Spark UI).
> Looking at periodic thread dumps of the workers, I saw that almost all worker
> threads (20 on each worker, since the table has 20 partitions on each node)
> were in one of two states, both related to serialization:
> 1) either trying to acquire an instance of a Kryo serializer from the pool in
> SparkSqlSerializer, like so:
> sun.misc.Unsafe.park(Native Method)
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> ...
> java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:361)
> com.twitter.chill.ResourcePool.borrow(ResourcePool.java:35)
> org.apache.spark.sql.execution.SparkSqlSerializer$.acquireRelease(SparkSqlSerializer.scala:82)
> ...
> org.apache.spark.sql.columnar.InMemoryColumnarTableScan$$anonfun$9$$anonfun$14$$anon$2.next(InMemoryColumnarTableScan.scala:279)
> 2) or trying to release one:
> sun.misc.Unsafe.park(Native Method)
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> ...
> java.util.concurrent.ArrayBlockingQueue.offer(ArrayBlockingQueue.java:298)
> com.twitter.chill.ResourcePool.release(ResourcePool.java:50)
> org.apache.spark.sql.execution.SparkSqlSerializer$.acquireRelease(SparkSqlSerializer.scala:86)
> ...
> org.apache.spark.sql.columnar.InMemoryColumnarTableScan$$anonfun$9$$anonfun$14$$anon$2.next(InMemoryColumnarTableScan.scala:279)
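> Both traces go through the same borrow/release pattern: every next() call on
> the in-memory scan iterator takes a serializer out of a shared pool backed by
> an ArrayBlockingQueue and puts it back when done. A minimal sketch of that
> pattern (simplified names; not chill's actual ResourcePool, and the blocking
> take/put calls stand in for the poll/offer seen in the traces):
>
> import java.util.concurrent.ArrayBlockingQueue
>
> // Simplified stand-in for a fixed-size, blocking pool of serializers
> // shared by all worker threads on an executor.
> class SimplePool[T](size: Int)(newInstance: () => T) {
>   private val pool = new ArrayBlockingQueue[T](size)
>   (1 to size).foreach(_ => pool.put(newInstance()))
>
>   def borrow(): T = pool.take()               // parks when the pool is empty
>   def release(item: T): Unit = pool.put(item) // parks when the pool is full
>
>   // The acquire/work/release shape of SparkSqlSerializer.acquireRelease:
>   // each call touches the queue twice, once on the way in, once on the way out.
>   def withResource[O](fn: T => O): O = {
>     val item = borrow()
>     try fn(item) finally release(item)
>   }
> }
>
> Since ArrayBlockingQueue guards both ends of the queue with a single lock,
> many scan threads going through this path all serialize on that one lock,
> which matches the parked threads in the dumps above.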
> My speculation is that when contention on the underlying ArrayBlockingQueue
> holding the pool of Kryo serializers is high, the queue itself becomes the
> bottleneck, which it arguably should not.
> The code I'm using (run from the Spark shell):
> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> hiveContext.sql("SET spark.sql.shuffle.partitions=60").collect()
> // the ORDER BY is added to induce a shuffle, so the table is cached in more
> // partitions than it occupies in the underlying HDFS storage
> hiveContext.sql("CACHE TABLE cached_tbl AS SELECT * FROM tbl1 ORDER BY col1").collect()
> // the save itself is very quick; it's the select that takes long
> hiveContext.sql("SELECT col1, col2, col3 FROM cached_tbl").saveAsTextFile("/some-path")
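> If the shared queue is indeed the bottleneck, one possible direction is to
> give each thread its own serializer instance via a ThreadLocal, removing the
> shared lock entirely. A hedged sketch (not current Spark code; the SparkConf
> handling here is schematic):
>
> import org.apache.spark.SparkConf
> import org.apache.spark.serializer.{KryoSerializer, SerializerInstance}
>
> // One serializer instance per thread instead of a shared blocking pool.
> // Trades a bounded pool for per-thread instances, which is acceptable as
> // long as the number of task threads is bounded.
> object ThreadLocalSerializer {
>   private val conf = new SparkConf()
>
>   private val local = new ThreadLocal[SerializerInstance] {
>     override def initialValue(): SerializerInstance =
>       new KryoSerializer(conf).newInstance()
>   }
>
>   // Same shape as acquireRelease, but with no cross-thread contention.
>   def acquireRelease[O](fn: SerializerInstance => O): O = fn(local.get())
> }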