Noam Barcay created SPARK-7564:
----------------------------------
Summary: possible performance bottleneck in SparkSQL's
SparkSqlSerializer class
Key: SPARK-7564
URL: https://issues.apache.org/jira/browse/SPARK-7564
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 1.2.1
Environment: 3 node cluster, each with 100g RAM and 40 cores
Reporter: Noam Barcay
Running a query over a table that's completely cached in memory I came across
surprisingly slow performance. The query is a simple SELECT over a 10Gb table
that sits comfortably in memory (Storage tab in Spark UI affirms this). It is
held in 60 partitions, to improve parallelism. All operations are over memory,
no shuffle is taking place (again, seen via Spark UI).
Looking at some periodic thread-dumps of the workers I see almost all worker
threads - 20 in each worker, as the table has 20 partitions in each node - were
in one of two states, related to serialization - either trying to acquire an
instance of a Kryo serializer from the pool in SparkSqlSerializer, like so:
sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
...
java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:361)
com.twitter.chill.ResourcePool.borrow(ResourcePool.java:35)
org.apache.spark.sql.execution.SparkSqlSerializer$.acquireRelease(SparkSqlSerializer.scala:82)
...
org.apache.spark.sql.columnar.InMemoryColumnarTableScan$$anonfun$9$$anonfun$14$$anon$2.next(InMemoryColumnarTableScan.scala:279)
or trying to release one:
sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
...
java.util.concurrent.ArrayBlockingQueue.offer(ArrayBlockingQueue.java:298)
com.twitter.chill.ResourcePool.release(ResourcePool.java:50)
org.apache.spark.sql.execution.SparkSqlSerializer$.acquireRelease(SparkSqlSerializer.scala:86)
...
org.apache.spark.sql.columnar.InMemoryColumnarTableScan$$anonfun$9$$anonfun$14$$anon$2.next(InMemoryColumnarTableScan.scala:279)
I speculate that when contention is high on the underlying ArrayBlockingQueue
holding the list of Kryo serialiazers, it starts becoming a bottleneck? Which I
would speculate it shouldn't.
The code I'm using (running from Spark shell):
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
hiveContext.sql("SET spark.sql.shuffle.partitions=60").collect()
hiveContext.sql("CACHE TABLE cached_tbl AS SELECT * FROM tbl1 ORDER BY
col1").collect() // the ORDER BY here is added to induce a shuffle so the table
will be cached in more partitions than it is found in the underlying HDFS
storage
hiveContext.sql("select col1, col2, col3 from
cached_tbl").saveAsTextFile("/some-path") // the save is actually very quick,
it's the select that's taking long
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]