xukun created SPARK-14065:
-----------------------------

             Summary: serialize MapStatuses in serial model
                 Key: SPARK-14065
                 URL: https://issues.apache.org/jira/browse/SPARK-14065
             Project: Spark
          Issue Type: Improvement
          Components: Spark Core
            Reporter: xukun
Running a query like this:

```
use tpcds_parquet_30720;
drop table temp;
create table temp as
select ws_order_number, sum1, sum2, sum1 * sum2
from (select ws_order_number, count(1) as sum1
      from web_sales
      group by ws_order_number) x
join (select wr_order_number, count(1) as sum2
      from web_returns
      group by wr_order_number) y
on x.ws_order_number = y.wr_order_number;
```

The executor log shows a GetMapOutputStatuses timeout:

```
2016-03-22 11:48:48,388 | WARN | [Executor task launch worker-1] | Error sending message [message = GetMapOutputStatuses(1)] in 1 attempts | org.apache.spark.Logging$class.logWarning(Logging.scala:92)
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 seconds]. This timeout is controlled by spark.network.timeout
	at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
	at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
	at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76)
	at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:101)
	at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:77)
	at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:98)
	at org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:156)
	at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:47)
	at org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:71)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:301)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:265)
	at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:301)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:265)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:301)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:265)
	at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:301)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:265)
	at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:99)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:301)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:265)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:301)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:265)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:301)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:265)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:68)
	at org.apache.spark.scheduler.Task.run(Task.scala:90)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:229)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [120 seconds]
	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
	at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
	at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
	at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
	at scala.concurrent.Await$.result(package.scala:107)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	... 32 more
2016-03-22 11:48:52,044 | INFO | [Executor task launch worker-1] | Got the output locations | org.apache.spark.Logging$class.logInfo(Logging.scala:59)
```

The driver log shows that the map statuses are serialized serially. The same 10298063 bytes of output statuses for shuffle 1 are serialized again and again, one request after another, roughly every 6-7 seconds (26 times between 11:46:54 and 11:49:39):

```
16/03/22 11:46:54 INFO [dispatcher-event-loop-25] MapOutputTrackerMaster: Size of output statuses for shuffle 1 is 10298063 bytes
16/03/22 11:47:01 INFO [dispatcher-event-loop-7] MapOutputTrackerMaster: Size of output statuses for shuffle 1 is 10298063 bytes
16/03/22 11:47:07 INFO [dispatcher-event-loop-36] MapOutputTrackerMaster: Size of output statuses for shuffle 1 is 10298063 bytes
16/03/22 11:47:14 INFO [dispatcher-event-loop-30] MapOutputTrackerMaster: Size of output statuses for shuffle 1 is 10298063 bytes
16/03/22 11:47:21 INFO [dispatcher-event-loop-3] MapOutputTrackerMaster: Size of output statuses for shuffle 1 is 10298063 bytes
16/03/22 11:47:27 INFO [dispatcher-event-loop-32] MapOutputTrackerMaster: Size of output statuses for shuffle 1 is 10298063 bytes
16/03/22 11:47:34 INFO [dispatcher-event-loop-31] MapOutputTrackerMaster: Size of output statuses for shuffle 1 is 10298063 bytes
16/03/22 11:47:41 INFO [dispatcher-event-loop-38] MapOutputTrackerMaster: Size of output statuses for shuffle 1 is 10298063 bytes
16/03/22 11:47:47 INFO [dispatcher-event-loop-4] MapOutputTrackerMaster: Size of output statuses for shuffle 1 is 10298063 bytes
16/03/22 11:47:54 INFO [dispatcher-event-loop-37] MapOutputTrackerMaster: Size of output statuses for shuffle 1 is 10298063 bytes
16/03/22 11:48:00 INFO [dispatcher-event-loop-28] MapOutputTrackerMaster: Size of output statuses for shuffle 1 is 10298063 bytes
16/03/22 11:48:07 INFO [dispatcher-event-loop-14] MapOutputTrackerMaster: Size of output statuses for shuffle 1 is 10298063 bytes
16/03/22 11:48:14 INFO [dispatcher-event-loop-19] MapOutputTrackerMaster: Size of output statuses for shuffle 1 is 10298063 bytes
16/03/22 11:48:20 INFO [dispatcher-event-loop-21] MapOutputTrackerMaster: Size of output statuses for shuffle 1 is 10298063 bytes
16/03/22 11:48:27 INFO [dispatcher-event-loop-0] MapOutputTrackerMaster: Size of output statuses for shuffle 1 is 10298063 bytes
16/03/22 11:48:34 INFO [dispatcher-event-loop-12] MapOutputTrackerMaster: Size of output statuses for shuffle 1 is 10298063 bytes
16/03/22 11:48:40 INFO [dispatcher-event-loop-1] MapOutputTrackerMaster: Size of output statuses for shuffle 1 is 10298063 bytes
16/03/22 11:48:47 INFO [dispatcher-event-loop-18] MapOutputTrackerMaster: Size of output statuses for shuffle 1 is 10298063 bytes
16/03/22 11:48:53 INFO [dispatcher-event-loop-13] MapOutputTrackerMaster: Size of output statuses for shuffle 1 is 10298063 bytes
16/03/22 11:48:59 INFO [dispatcher-event-loop-2] MapOutputTrackerMaster: Size of output statuses for shuffle 1 is 10298063 bytes
16/03/22 11:49:06 INFO [dispatcher-event-loop-27] MapOutputTrackerMaster: Size of output statuses for shuffle 1 is 10298063 bytes
16/03/22 11:49:13 INFO [dispatcher-event-loop-23] MapOutputTrackerMaster: Size of output statuses for shuffle 1 is 10298063 bytes
16/03/22 11:49:19 INFO [dispatcher-event-loop-26] MapOutputTrackerMaster: Size of output statuses for shuffle 1 is 10298063 bytes
16/03/22 11:49:26 INFO [dispatcher-event-loop-24] MapOutputTrackerMaster: Size of output statuses for shuffle 1 is 10298063 bytes
16/03/22 11:49:32 INFO [dispatcher-event-loop-29] MapOutputTrackerMaster: Size of output statuses for shuffle 1 is 10298063 bytes
16/03/22 11:49:39 INFO [dispatcher-event-loop-33] MapOutputTrackerMaster: Size of output statuses for shuffle 1 is 10298063 bytes
```

Moving the serializeMapStatuses call inside epochLock.synchronized { } solves this problem. However, it introduces another issue: MapStatuses from different shuffle stages are then also serialized one at a time, because they all contend for the same lock. Any advice?
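One possible direction for the cross-shuffle concern is sketched below in Java, purely as an illustration. It assumes a simplified tracker and does not reflect Spark's actual code. The idea is to serialize at most once per shuffle and cache the bytes. The lock is scoped to each shuffle ID rather than shared like epochLock. As a result, concurrent requests for shuffle 1 wait for a single serialization and reuse it, while requests for other shuffles proceed independently. All names (`SerializedStatusCache`, `getSerialized`, the `serializer` function) are hypothetical, and any epoch-based cache invalidation is omitted.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntFunction;

/**
 * Hypothetical sketch: caches serialized map statuses per shuffle ID.
 * Requests for the same shuffle share one serialization via a per-shuffle
 * lock; requests for different shuffles never wait on each other.
 * Epoch-based invalidation is intentionally left out.
 */
public final class SerializedStatusCache {
    private final ConcurrentHashMap<Integer, byte[]> cache = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<Integer, Object> locks = new ConcurrentHashMap<>();
    // Stand-in for the expensive serializeMapStatuses step (hypothetical).
    private final IntFunction<byte[]> serializer;

    public SerializedStatusCache(IntFunction<byte[]> serializer) {
        this.serializer = serializer;
    }

    public byte[] getSerialized(int shuffleId) {
        byte[] cached = cache.get(shuffleId);
        if (cached != null) {
            return cached; // fast path: already serialized, no locking needed
        }
        // computeIfAbsent is atomic, so every thread gets the same lock object for this shuffle.
        Object lock = locks.computeIfAbsent(shuffleId, id -> new Object());
        synchronized (lock) {
            // Re-check: another thread may have finished serializing while we waited.
            cached = cache.get(shuffleId);
            if (cached == null) {
                cached = serializer.apply(shuffleId);
                cache.put(shuffleId, cached);
            }
            return cached;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger serializations = new AtomicInteger();
        SerializedStatusCache tracker = new SerializedStatusCache(id -> {
            serializations.incrementAndGet();
            return new byte[] {(byte) id};
        });
        // Eight concurrent "GetMapOutputStatuses(1)" requests.
        Thread[] workers = new Thread[8];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Thread(() -> tracker.getSerialized(1));
            workers[i].start();
        }
        for (Thread t : workers) {
            t.join();
        }
        System.out.println(serializations.get()); // prints 1
    }
}
```

The design trade-off is one small lock object per shuffle ID in exchange for not blocking unrelated shuffles. A real implementation would also need to drop the cached bytes and lock when a shuffle is unregistered or its statuses change.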