[ https://issues.apache.org/jira/browse/SPARK-14318?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
JESSE CHEN updated SPARK-14318:
-------------------------------
Description:

TPCDS Q14 parses successfully, and plans are created successfully. Spark then tries to run it (I used only a 1 GB text dataset), but "hangs": tasks are extremely slow to process AND all CPUs are used 100% by the executor JVMs.

It is very easy to reproduce:
1. Use the spark-sql CLI to run TPCDS query 14 against a 1 GB text-file database (assuming you know how to generate the CSV data). My command is like this:
{noformat}
/TestAutomation/downloads/spark-master/bin/spark-sql --driver-memory 10g --verbose --master yarn-client --packages com.databricks:spark-csv_2.10:1.3.0 --executor-memory 8g --num-executors 4 --executor-cores 4 --conf spark.sql.join.preferSortMergeJoin=true --database hadoopds1g -f $f > q14.out
{noformat}
The Spark console output:
{noformat}
16/03/31 15:45:37 INFO scheduler.TaskSetManager: Starting task 26.0 in stage 17.0 (TID 65, bigaperf138.svl.ibm.com, partition 26,RACK_LOCAL, 4515 bytes)
16/03/31 15:45:37 INFO cluster.YarnClientSchedulerBackend: Launching task 65 on executor id: 4 hostname: bigaperf138.svl.ibm.com.
16/03/31 15:45:37 INFO scheduler.TaskSetManager: Finished task 23.0 in stage 17.0 (TID 62) in 829687 ms on bigaperf138.svl.ibm.com (15/200)
16/03/31 15:45:52 INFO scheduler.TaskSetManager: Starting task 27.0 in stage 17.0 (TID 66, bigaperf138.svl.ibm.com, partition 27,RACK_LOCAL, 4515 bytes)
16/03/31 15:45:52 INFO cluster.YarnClientSchedulerBackend: Launching task 66 on executor id: 4 hostname: bigaperf138.svl.ibm.com.
16/03/31 15:45:52 INFO scheduler.TaskSetManager: Finished task 26.0 in stage 17.0 (TID 65) in 15505 ms on bigaperf138.svl.ibm.com (16/200)
16/03/31 15:46:17 INFO scheduler.TaskSetManager: Starting task 28.0 in stage 17.0 (TID 67, bigaperf138.svl.ibm.com, partition 28,RACK_LOCAL, 4515 bytes)
16/03/31 15:46:17 INFO cluster.YarnClientSchedulerBackend: Launching task 67 on executor id: 4 hostname: bigaperf138.svl.ibm.com.
16/03/31 15:46:17 INFO scheduler.TaskSetManager: Finished task 27.0 in stage 17.0 (TID 66) in 24929 ms on bigaperf138.svl.ibm.com (17/200)
16/03/31 15:51:53 INFO scheduler.TaskSetManager: Starting task 29.0 in stage 17.0 (TID 68, bigaperf137.svl.ibm.com, partition 29,NODE_LOCAL, 4515 bytes)
16/03/31 15:51:53 INFO cluster.YarnClientSchedulerBackend: Launching task 68 on executor id: 2 hostname: bigaperf137.svl.ibm.com.
16/03/31 15:51:53 INFO scheduler.TaskSetManager: Finished task 10.0 in stage 17.0 (TID 47) in 3775585 ms on bigaperf137.svl.ibm.com (18/200)
{noformat}
Notice that the time gaps between tasks are unusually long: 2~5 minutes. Individual task durations are worse still: task 23.0 took 829,687 ms (~14 minutes) and task 10.0 took 3,775,585 ms (~63 minutes).
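To quantify the slowdown over a full run, the per-task durations can be pulled out of the "Finished task ... in N ms" lines. A minimal sketch in Scala, assuming the console log format shown above (the file name q14-console.log is hypothetical):
{noformat}
import scala.io.Source

// Minimal sketch: extract per-task durations from the spark-sql console
// log and print the ten slowest tasks. The log file name is an assumption;
// the regex matches the "Finished task ... in N ms" lines verbatim.
object SlowTasks {
  private val Finished =
    """Finished task (\S+) in stage (\S+) \(TID (\d+)\) in (\d+) ms""".r.unanchored

  def main(args: Array[String]): Unit = {
    val durations = Source.fromFile("q14-console.log").getLines().collect {
      case Finished(task, stage, tid, ms) =>
        (s"stage $stage task $task (TID $tid)", ms.toLong)
    }.toSeq
    durations.sortBy(t => -t._2).take(10).foreach { case (id, ms) =>
      println(f"$id%-40s ${ms / 60000.0}%8.1f min")
    }
  }
}
{noformat}
Run against the excerpt above, this puts task 10.0 (TID 47) at ~62.9 minutes and task 23.0 (TID 62) at ~13.8 minutes at the top of the list.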
When looking at the Linux 'perf' tool, the top two CPU consumers are:
{noformat}
86.48%  java  [unknown]
12.41%  libjvm.so
{noformat}
Using the Java hotspot profiling tools, I am able to show the top 5 hotspot methods (self time, share of total, total time):
{noformat}
org.apache.spark.storage.DiskBlockObjectWriter.updateBytesWritten()               9,654,179 ms  (46.8%)   9,654,179 ms
org.apache.spark.unsafe.Platform.copyMemory()                                     3,848,442 ms  (18.6%)   3,848,442 ms
org.apache.spark.util.collection.CompactBuffer.$plus$eq()                         1,418,411 ms   (6.9%)   1,517,960 ms
org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2.writeValue()     955,495 ms   (4.6%)   2,153,910 ms
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write()                  949,930 ms   (4.6%)  19,967,510 ms
{noformat}
So as you can see, the test has been running for 1.5 hours... with 46% of CPU time spent in the org.apache.spark.storage.DiskBlockObjectWriter.updateBytesWritten() method.

The stacks for the top two are:
{noformat}
Marshalling
I  java/io/DataOutputStream.writeInt()  line 197
org.apache.spark.sql
I  org/apache/spark/sql/execution/UnsafeRowSerializerInstance$$anon$2.writeValue()  line 60
org.apache.spark.storage
I  org/apache/spark/storage/DiskBlockObjectWriter.write()  line 185
org.apache.spark.shuffle
I  org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.write()  line 150
org.apache.spark.scheduler
I  org/apache/spark/scheduler/ShuffleMapTask.runTask()  line 78
I  org/apache/spark/scheduler/ShuffleMapTask.runTask()  line 46
I  org/apache/spark/scheduler/Task.run()  line 82
org.apache.spark.executor
I  org/apache/spark/executor/Executor$TaskRunner.run()  line 231
Dispatching Overhead, Standard Library Worker Dispatching
I  java/util/concurrent/ThreadPoolExecutor.runWorker()  line 1142
I  java/util/concurrent/ThreadPoolExecutor$Worker.run()  line 617
I  java/lang/Thread.run()  line 745
{noformat}
and
{noformat}
org.apache.spark.unsafe
I  org/apache/spark/unsafe/Platform.copyMemory()  line 172
org.apache.spark.sql
I  org/apache/spark/sql/catalyst/expressions/UnsafeRow.copy()  line 504
Class Loading
I  org/apache/spark/sql/catalyst/expressions/GeneratedClass$GeneratedIterator.processNext()
org.apache.spark.sql
I  org/apache/spark/sql/execution/BufferedRowIterator.hasNext()  line 41
I  org/apache/spark/sql/execution/WholeStageCodegen$$anonfun$doExecute$2$$anon$2.hasNext()  line 375
scala.collection
I  scala/collection/Iterator$$anon$11.hasNext()  line 369
org.apache.spark.shuffle
I  org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.write()  line 147
org.apache.spark.scheduler
I  org/apache/spark/scheduler/ShuffleMapTask.runTask()  line 78
I  org/apache/spark/scheduler/ShuffleMapTask.runTask()  line 46
I  org/apache/spark/scheduler/Task.run()  line 82
org.apache.spark.executor
I  org/apache/spark/executor/Executor$TaskRunner.run()  line 231
Dispatching Overhead, Standard Library Worker Dispatching
I  java/util/concurrent/ThreadPoolExecutor.runWorker()  line 1142
I  java/util/concurrent/ThreadPoolExecutor$Worker.run()  line 617
I  java/lang/Thread.run()  line 745
{noformat}
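One plausible reading of the first stack, offered as an assumption rather than a diagnosis: updateBytesWritten() is reached from a per-record write path (writeValue() -> DiskBlockObjectWriter.write()), so if the writer refreshes its bytes-written metric by flushing and querying the underlying file position for every record instead of sampling periodically, the bookkeeping alone could dominate the CPU. A standalone sketch of that effect (this is not Spark code; MetricsOverheadSketch and writeWithMetrics are made-up names for illustration):
{noformat}
import java.io.{BufferedOutputStream, DataOutputStream, FileOutputStream}
import java.nio.file.Files

// Illustrative only, not Spark code: writes `records` ints through a
// buffered DataOutputStream and updates a bytes-written counter by
// querying the FileChannel position every `sampleEvery` records.
// sampleEvery = 1 models per-record metric updates; a large value
// models periodic sampling.
object MetricsOverheadSketch {
  def writeWithMetrics(records: Int, sampleEvery: Int): Long = {
    val file = Files.createTempFile("sketch", ".bin").toFile
    file.deleteOnExit()
    val fos = new FileOutputStream(file)
    val channel = fos.getChannel
    val out = new DataOutputStream(new BufferedOutputStream(fos))
    var bytesWritten = 0L
    val start = System.nanoTime()
    var i = 0
    while (i < records) {
      out.writeInt(i)                     // per-record payload, as in writeValue()
      if (i % sampleEvery == 0) {
        out.flush()                       // flushing defeats the buffer...
        bytesWritten = channel.position() // ...and position() queries the file
      }
      i += 1
    }
    out.close()
    System.nanoTime() - start
  }

  def main(args: Array[String]): Unit = {
    val n = 2000000
    val perRecord = writeWithMetrics(n, 1)     // metric refreshed every record
    val sampled   = writeWithMetrics(n, 16384) // metric sampled periodically
    println(f"per-record: ${perRecord / 1e6}%.0f ms  sampled: ${sampled / 1e6}%.0f ms")
  }
}
{noformat}
The per-record variant should be dramatically slower on the same hardware, which would be consistent with 46% of CPU time landing in a metrics method; whether that is actually what DiskBlockObjectWriter is doing here needs confirmation from the shuffle code.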
> TPCDS query 14 causes Spark SQL to hang
> ---------------------------------------
>
>                 Key: SPARK-14318
>                 URL: https://issues.apache.org/jira/browse/SPARK-14318
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.0, 2.0.0
>            Reporter: JESSE CHEN
>              Labels: hangs
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)