[ 
https://issues.apache.org/jira/browse/SPARK-9879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14696338#comment-14696338
 ] 

Cheng Hao commented on SPARK-9879:
----------------------------------

I created a new physical operator called LargeLimit, which takes effect when
the limit given in the LIMIT clause is equal to or greater than
SqlConf.LIMIT_ROWS.

LargeLimit triggers execution of its child's RDD and persists the result, which
we then need to iterate over twice. The first iteration gets the number of
records in each partition, from which we compute how many records to take from
each partition to satisfy the total number of records we need; the second
iteration simply takes the computed number of records from each partition.

The main advantages of this approach:
- No single-node shuffle is required (in fact, no data shuffle at all), and the
result data remains distributed.
- It keeps the same output partitioning as its child.

> OOM in LIMIT clause with large number
> -------------------------------------
>
>                 Key: SPARK-9879
>                 URL: https://issues.apache.org/jira/browse/SPARK-9879
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>            Reporter: Cheng Hao
>
> {code}
> create table spark.tablsetest as select * from dpa_ord_bill_tf order by 
> member_id limit 20000000;
> {code}
>          
> {code}
> spark-sql --driver-memory 48g --executor-memory 24g --driver-java-options 
> -XX:PermSize=1024M -XX:MaxPermSize=2048M
> Error logs
> 15/07/27 10:22:43 ERROR ActorSystemImpl: Uncaught fatal error from thread 
> [sparkDriver-akka.actor.default-dispatcher-20]shutting down ActorSystem 
> [sparkDriver]
> java.lang.OutOfMemoryError: Requested array size exceeds VM limit
> at java.util.Arrays.copyOf(Arrays.java:2271)
> at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
> at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
> at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
> at 
> java.io.ObjectOutputStream$BlockDataOutputStream.write(ObjectOutputStream.java:1852)
> at java.io.ObjectOutputStream.write(ObjectOutputStream.java:708)
> at org.apache.spark.util.Utils$$anon$2.write(Utils.scala:134)
> at com.esotericsoftware.kryo.io.Output.flush(Output.java:155)
> at com.esotericsoftware.kryo.io.Output.close(Output.java:165)
> at 
> org.apache.spark.serializer.KryoSerializationStream.close(KryoSerializer.scala:162)
> at org.apache.spark.util.Utils$.serializeViaNestedStream(Utils.scala:139)
> at 
> org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$writeObject$1.apply$mcV$sp(ParallelCollectionRDD.scala:65)
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239)
> at 
> org.apache.spark.rdd.ParallelCollectionPartition.writeObject(ParallelCollectionRDD.scala:51)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
> at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
> at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
> at 
> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
> at 
> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
> at org.apache.spark.scheduler.Task$.serializeWithDependencies(Task.scala:168)
> at 
> org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:467)
> at 
> org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet$1.apply$mcVI$sp(TaskSchedulerImpl.scala:231)
> 15/07/27 10:22:43 ERROR ErrorMonitor: Uncaught fatal error from thread 
> [sparkDriver-akka.actor.default-dispatcher-20]shutting down ActorSystem 
> [sparkDriver]
> java.lang.OutOfMemoryError: Requested array size exceeds VM limit
> at java.util.Arrays.copyOf(Arrays.java:2271)
> at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
> at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
> at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
> at 
> java.io.ObjectOutputStream$BlockDataOutputStream.write(ObjectOutputStream.java:1852)
> at java.io.ObjectOutputStream.write(ObjectOutputStream.java:708)
> at org.apache.spark.util.Utils$$anon$2.write(Utils.scala:134)
> at com.esotericsoftware.kryo.io.Output.flush(Output.java:155)
> at com.esotericsoftware.kryo.io.Output.close(Output.java:165)
> at 
> org.apache.spark.serializer.KryoSerializationStream.close(KryoSerializer.scala:162)
> at org.apache.spark.util.Utils$.serializeViaNestedStream(Utils.scala:139)
> at 
> org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$writeObject$1.apply$mcV$sp(ParallelCollectionRDD.scala:65)
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239)
> at 
> org.apache.spark.rdd.ParallelCollectionPartition.writeObject(ParallelCollectionRDD.scala:51)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
> at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
> at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
> at 
> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
> at 
> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
> at org.apache.spark.scheduler.Task$.serializeWithDependencies(Task.scala:168)
> at 
> org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:467)
> at 
> org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet$1.apply$mcVI$sp(TaskSchedulerImpl.scala:231)
> 15/07/27 10:22:46 INFO RemoteActorRefProvider$RemotingTerminator: Shutting 
> down remote daemon.
> 15/07/27 10:22:46 INFO RemoteActorRefProvider$RemotingTerminator: Remote 
> daemon shut down; proceeding with flushing remote transports.
> 15/07/27 10:22:46 WARN AkkaRpcEndpointRef: Error sending message [message = 
> RemoveBroadcast(2,true)] in 1 attempts
> akka.pattern.AskTimeoutException: 
> Recipient[Actorakka://sparkDriver/user/BlockManagerMaster#2011779764] had 
> already been terminated.
> at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:132)
> at org.apache.spark.rpc.akka.AkkaRpcEndpointRef.ask(AkkaRpcEnv.scala:299)
> at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:101)
> at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:78)
> at 
> org.apache.spark.storage.BlockManagerMaster.removeBroadcast(BlockManagerMaster.scala:127)
> at 
> org.apache.spark.broadcast.TorrentBroadcast$.unpersist(TorrentBroadcast.scala:228)
> at 
> org.apache.spark.broadcast.TorrentBroadcastFactory.unbroadcast(TorrentBroadcastFactory.scala:45)
> at 
> org.apache.spark.broadcast.BroadcastManager.unbroadcast(BroadcastManager.scala:66)
> at 
> org.apache.spark.ContextCleaner.doCleanupBroadcast(ContextCleaner.scala:214)
> at 
> org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:170)
> at 
> org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:161)
> at scala.Option.foreach(Option.scala:236)
> at 
> org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:161)
> at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1215)
> at 
> org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:154)
> at org.apache.spark.ContextCleaner$$anon$3.run(ContextCleaner.scala:67)
> 15/07/27 10:22:46 INFO RemoteActorRefProvider$RemotingTerminator: Remoting 
> shut down.
> 15/07/27 10:22:49 WARN AkkaRpcEndpointRef: Error sending message [message = 
> RemoveBroadcast(2,true)] in 2 attempts
> akka.pattern.AskTimeoutException: 
> Recipient[Actorakka://sparkDriver/user/BlockManagerMaster#2011779764] had 
> already been terminated.
> at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:132)
> at org.apache.spark.rpc.akka.AkkaRpcEndpointRef.ask(AkkaRpcEnv.scala:299)
> at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:101)
> at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:78)
> at 
> org.apache.spark.storage.BlockManagerMaster.removeBroadcast(BlockManagerMaster.scala:127)
> at 
> org.apache.spark.broadcast.TorrentBroadcast$.unpersist(TorrentBroadcast.scala:228)
> at 
> org.apache.spark.broadcast.TorrentBroadcastFactory.unbroadcast(TorrentBroadcastFactory.scala:45)
> at 
> org.apache.spark.broadcast.BroadcastManager.unbroadcast(BroadcastManager.scala:66)
> at 
> org.apache.spark.ContextCleaner.doCleanupBroadcast(ContextCleaner.scala:214)
> at 
> org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:170)
> at 
> org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:161)
> at scala.Option.foreach(Option.scala:236)
> at 
> org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:161)
> at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1215)
> at 
> org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:154)
> at org.apache.spark.ContextCleaner$$anon$3.run(ContextCleaner.scala:67)
> 15/07/27 10:22:52 WARN AkkaRpcEndpointRef: Error sending message [message = 
> RemoveBroadcast(2,true)] in 3 attempts
> akka.pattern.AskTimeoutException: 
> Recipient[Actorakka://sparkDriver/user/BlockManagerMaster#2011779764] had 
> already been terminated.
> at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:132)
> at org.apache.spark.rpc.akka.AkkaRpcEndpointRef.ask(AkkaRpcEnv.scala:299)
> at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:101)
> at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:78)
> at 
> org.apache.spark.storage.BlockManagerMaster.removeBroadcast(BlockManagerMaster.scala:127)
> at 
> org.apache.spark.broadcast.TorrentBroadcast$.unpersist(TorrentBroadcast.scala:228)
> at 
> org.apache.spark.broadcast.TorrentBroadcastFactory.unbroadcast(TorrentBroadcastFactory.scala:45)
> at 
> org.apache.spark.broadcast.BroadcastManager.unbroadcast(BroadcastManager.scala:66)
> at 
> org.apache.spark.ContextCleaner.doCleanupBroadcast(ContextCleaner.scala:214)
> at 
> org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:170)
> at 
> org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:161)
> at scala.Option.foreach(Option.scala:236)
> at 
> org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:161)
> at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1215)
> at 
> org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:154)
> at org.apache.spark.ContextCleaner$$anon$3.run(ContextCleaner.scala:67)
> 15/07/27 10:22:55 ERROR ContextCleaner: Error cleaning broadcast 2
> org.apache.spark.SparkException: Error sending message [message = 
> RemoveBroadcast(2,true)]
> at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:116)
> at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:78)
> at 
> org.apache.spark.storage.BlockManagerMaster.removeBroadcast(BlockManagerMaster.scala:127)
> at 
> org.apache.spark.broadcast.TorrentBroadcast$.unpersist(TorrentBroadcast.scala:228)
> at 
> org.apache.spark.broadcast.TorrentBroadcastFactory.unbroadcast(TorrentBroadcastFactory.scala:45)
> at 
> org.apache.spark.broadcast.BroadcastManager.unbroadcast(BroadcastManager.scala:66)
> at 
> org.apache.spark.ContextCleaner.doCleanupBroadcast(ContextCleaner.scala:214)
> at 
> org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:170)
> at 
> org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:161)
> at scala.Option.foreach(Option.scala:236)
> at 
> org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:161)
> at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1215)
> at 
> org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:154)
> at org.apache.spark.ContextCleaner$$anon$3.run(ContextCleaner.scala:67)
> Caused by: akka.pattern.AskTimeoutException: 
> Recipient[Actorakka://sparkDriver/user/BlockManagerMaster#2011779764] had 
> already been terminated.
> at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:132)
> at org.apache.spark.rpc.akka.AkkaRpcEndpointRef.ask(AkkaRpcEnv.scala:299)
> at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:101)
> ... 13 more
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to