For the hottest key, the Python worker will need about 1-2 GB of memory
to do groupByKey().
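
As a rough sanity check on that figure, the sketch below divides 1 GB and
2 GB by the record count reported for the hottest key in the quoted
countByKey() output (87874097 for key 17634). It assumes GB means 2**30
bytes, and the helper name is hypothetical. The result is only the
per-record footprint implied by the estimate, not a measured value.

```python
# Record count for the hottest key (17634), taken from the quoted output.
HOTTEST_KEY_RECORDS = 87874097

# Assumption: "GB" is read as 2**30 bytes.
GIB = 2 ** 30


def implied_bytes_per_record(total_gib, records=HOTTEST_KEY_RECORDS):
    """Bytes per record implied by holding `records` values in `total_gib` GiB."""
    return total_gib * GIB / float(records)


low = implied_bytes_per_record(1)
high = implied_bytes_per_record(2)
print("implied footprint: %.1f to %.1f bytes per record" % (low, high))
```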

These settings (spark.shuffle.memoryFraction and spark.storage.memoryFraction)
only affect the JVM executor, so they cannot help with the memory of the
Python worker.

So, there are two options:

1) Use reduceByKey() or combineByKey() to reduce memory
consumption in the Python worker.
2) Try the master or 1.1 branch, which adds spilling to disk in Python.
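
A minimal sketch of option 1, assuming the goal is only to count records
per key (or the number of distinct keys), as in the job quoted below. The
three combiner functions and the local simulation helper are hypothetical
names. The simulation runs in plain Python so the idea can be checked
without a cluster, and the commented lines show how the same functions
would be passed to combineByKey() on a real RDD.

```python
def create_combiner(value):
    # First value seen for a key: start the count at 1, discard the value.
    return 1


def merge_value(count, value):
    # Another value for a key already seen in this partition.
    return count + 1


def merge_combiners(count_a, count_b):
    # Merge partial counts for the same key from different partitions.
    return count_a + count_b


def count_per_key_locally(pairs, num_chunks=2):
    """Simulate per-partition combining followed by the final merge."""
    chunks = [pairs[i::num_chunks] for i in range(num_chunks)]
    partials = []
    for chunk in chunks:
        acc = {}
        for key, value in chunk:
            if key in acc:
                acc[key] = merge_value(acc[key], value)
            else:
                acc[key] = create_combiner(value)
        partials.append(acc)
    merged = {}
    for acc in partials:
        for key, count in acc.items():
            if key in merged:
                merged[key] = merge_combiners(merged[key], count)
            else:
                merged[key] = count
    return merged


# Made-up sample data; only the keys matter for counting.
sample = [(17634, "a"), (8407, "b"), (17634, "c"), (17634, "d"), (8407, "e")]
local_counts = count_per_key_locally(sample)
print(local_counts)

# With a real RDD of (key, value) pairs, the same functions would be used as:
#   counts = rdd.combineByKey(create_combiner, merge_value, merge_combiners,
#                             numPartitions=1600)
#   num_groups = counts.count()
```

Because each key is reduced to a single integer, the Python worker should
not have to hold all values of a hot key in memory at once.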

Davies

On Wed, Aug 13, 2014 at 4:08 PM, Arpan Ghosh <ar...@automatic.com> wrote:
> Here are the biggest keys:
>
> [   (17634, 87874097),
>     (8407, 38395833),
>     (20092, 14403311),
>     (9295, 4142636),
>     (14359, 3129206),
>     (13051, 2608708),
>     (14133, 2073118),
>     (4571, 2053514),
>     (16175, 2021669),
>     (5268, 1908557),
>     (3669, 1687313),
>     (14051, 1628416),
>     (19660, 1619860),
>     (10206, 1546037),
>     (3740, 1527272),
>     (426, 1522788),
>
>
> Should I try to increase spark.shuffle.memoryFraction and decrease
> spark.storage.memoryFraction?
>
>
>
> On Wed, Aug 13, 2014 at 1:39 PM, Davies Liu <dav...@databricks.com> wrote:
>>
>> Arpan,
>>
>> Which version of Spark are you using? Could you try the master or 1.1
>> branch, which can spill data to disk during groupByKey()?
>>
>> PS: it's better to use reduceByKey() or combineByKey() to reduce data
>> size during shuffle.
>>
>> Maybe there is a huge key in the data set. You can find the largest keys
>> this way (countByKey() returns a dict on the driver, so sort it locally):
>>
>> sorted(rdd.countByKey().items(), key=lambda x: x[1], reverse=True)[:10]
>>
>> Davies
>>
>>
>> On Wed, Aug 13, 2014 at 12:21 PM, Arpan Ghosh <ar...@automatic.com> wrote:
>> > Hi,
>> >
>> > Let me begin by describing my Spark setup on EC2 (launched using the
>> > provided spark-ec2.py script):
>> >
>> > 100 c3.2xlarge workers (8 cores & 15GB memory each)
>> > 1 c3.2xlarge Master (only running master daemon)
>> > Spark 1.0.2
>> > 8GB mounted at / & 80 GB mounted at /mnt
>> >
>> > spark-defaults.conf (A lot of config options have been added here to try
>> > and fix the problem. I also encounter the problem while running with the
>> > default options)
>> >
>> > spark.executor.memory   12991m
>> > spark.executor.extraLibraryPath /root/ephemeral-hdfs/lib/native/
>> > spark.executor.extraClassPath   /root/ephemeral-hdfs/conf
>> > spark.shuffle.file.buffer.kb    1024
>> > spark.reducer.maxMbInFlight     96
>> > spark.serializer.objectStreamReset      100000
>> > spark.akka.frameSize    100
>> > spark.akka.threads      32
>> > spark.akka.timeout      1000
>> > spark.serializer        org.apache.spark.serializer.KryoSerializer
>> >
>> > spark-env.sh (A lot of config options have been added here to try and
>> > fix the problem. I also encounter the problem while running with the
>> > default options)
>> >
>> > export SPARK_LOCAL_DIRS="/mnt/spark,/mnt2/spark"
>> > export SPARK_MASTER_OPTS="-Dspark.worker.timeout=900"
>> > export SPARK_WORKER_INSTANCES=1
>> > export SPARK_WORKER_CORES=8
>> > export HADOOP_HOME="/root/ephemeral-hdfs"
>> > export SPARK_MASTER_IP=<Master's Public DNS, as added by spark-ec2.py script>
>> > export MASTER=`cat /root/spark-ec2/cluster-url`
>> > export SPARK_SUBMIT_LIBRARY_PATH="$SPARK_SUBMIT_LIBRARY_PATH:/root/ephemeral-hdfs/lib/native/"
>> > export SPARK_SUBMIT_CLASSPATH="$SPARK_CLASSPATH:$SPARK_SUBMIT_CLASSPATH:/root/ephemeral-hdfs/conf"
>> > export SPARK_PUBLIC_DNS=<wget command to get the public hostname, as added by spark-ec2.py script>
>> >
>> > # Set a high ulimit for large shuffles
>> >
>> > ulimit -n 10000000
>> >
>> >
>> > I am trying to run a very simple job which reads in CSV data (~124 GB)
>> > from an S3 bucket, tries to group it based on a key, and counts the number
>> > of groups. The number of partitions for the input textFile() is set to
>> > 1600, and the number of partitions for the groupByKey() operation is also
>> > 1600.
>> >
>> > conf = SparkConf().setAppName(JOB_NAME).setMaster(master)
>> > sc = SparkContext(conf=conf)
>> >
>> > drive = sc.textFile(raw_drive_record_path, raw_drive_data_partitions)
>> >
>> > drive_grouped_by_user_vin_and_week = \
>> >     drive.flatMap(parse_raw_drive_record_and_key_by_user_vin_week)\
>> >         .groupByKey(numPartitions=user_vin_week_group_partitions)\
>> >         .count()
>> >
>> >
>> > Stage 1 (flatMap()) launches 1601 tasks all of which complete in 159
>> > seconds. Then Stage 0 (groupByKey()) is launched with 1600 tasks out of
>> > which 1595 complete in under a minute. The same 5 TIDs consistently fail
>> > with the following errors in the logs of their respective Executors:
>> >
>> > 14/08/13 02:45:15 ERROR executor.Executor: Exception in task ID 2203
>> > org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
>> >     at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:141)
>> >     at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:145)
>> >     at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:78)
>> >     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>> >     at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>> >     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
>> >     at org.apache.spark.scheduler.Task.run(Task.scala:51)
>> >     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
>> >     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> >     at java.lang.Thread.run(Thread.java:745)
>> > Caused by: java.io.EOFException
>> >     at java.io.DataInputStream.readInt(DataInputStream.java:392)
>> >     at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:92)
>> >     ... 10 more
>> >
>> > 14/08/13 02:45:30 ERROR python.PythonRDD: Python worker exited unexpectedly (crashed)
>> > java.net.SocketException: Connection reset
>> >     at java.net.SocketInputStream.read(SocketInputStream.java:196)
>> >     at java.net.SocketInputStream.read(SocketInputStream.java:122)
>> >     at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
>> >     at java.io.BufferedInputStream.read(BufferedInputStream.java:254)
>> >     at java.io.DataInputStream.readInt(DataInputStream.java:387)
>> >     at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:92)
>> >     at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:145)
>> >     at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:78)
>> >     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>> >     at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>> >     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
>> >     at org.apache.spark.scheduler.Task.run(Task.scala:51)
>> >     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
>> >     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> >     at java.lang.Thread.run(Thread.java:745)
>> >
>> > 14/08/13 02:45:30 ERROR python.PythonRDD: This may have been caused by a prior exception:
>> > java.net.SocketException: Broken pipe
>> >     at java.net.SocketOutputStream.socketWrite0(Native Method)
>> >     at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113)
>> >     at java.net.SocketOutputStream.write(SocketOutputStream.java:159)
>> >     at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
>> >     at java.io.DataOutputStream.write(DataOutputStream.java:107)
>> >     at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
>> >     at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:300)
>> >     at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:298)
>> >     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> >     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>> >     at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:298)
>> >     at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:200)
>> >     at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:175)
>> >     at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:175)
>> >     at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1160)
>> >     at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:174)
>> >
>> > 14/08/13 02:45:30 ERROR executor.Executor: Exception in task ID 2840
>> > java.net.SocketException: Broken pipe
>> >     at java.net.SocketOutputStream.socketWrite0(Native Method)
>> >     at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113)
>> >     at java.net.SocketOutputStream.write(SocketOutputStream.java:159)
>> >     at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
>> >     at java.io.DataOutputStream.write(DataOutputStream.java:107)
>> >     at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
>> >     at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:300)
>> >     at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:298)
>> >     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> >     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>> >     at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:298)
>> >     at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:200)
>> >     at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:175)
>> >     at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:175)
>> >     at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1160)
>> >     at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:174)
>> >
>> >
>> > The final error reported to the driver program is:
>> >
>> > 14/08/13 19:03:43 INFO scheduler.TaskSchedulerImpl: Cancelling stage 0
>> > 14/08/13 19:03:43 INFO scheduler.TaskSchedulerImpl: Stage 0 was cancelled
>> > 14/08/13 19:03:43 INFO scheduler.DAGScheduler: Failed to run count at /root/data_infrastructure/src/GroupRawDriveDataByUserVinWeek.py:122
>> >
>> > Traceback (most recent call last):
>> >   File "/root/data_infrastructure/src/GroupRawDriveDataByUserVinWeek.py", line 122, in <module>
>> >     .groupByKey(numPartitions=user_vin_week_group_partitions)\
>> >   File "/root/spark/python/pyspark/rdd.py", line 737, in count
>> >     return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
>> >   File "/root/spark/python/pyspark/rdd.py", line 728, in sum
>> >     return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
>> >   File "/root/spark/python/pyspark/rdd.py", line 648, in reduce
>> >     vals = self.mapPartitions(func).collect()
>> >   File "/root/spark/python/pyspark/rdd.py", line 612, in collect
>> >     bytesInJava = self._jrdd.collect().iterator()
>> >   File "/root/spark/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", line 537, in __call__
>> >   File "/root/spark/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py", line 300, in get_return_value
>> > py4j.protocol.Py4JJavaError: An error occurred while calling o45.collect.
>> >
>> > : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0:602 failed 4 times, most recent failure: Exception failure in TID 3212 on host ip-10-146-221-202.ec2.internal: java.net.SocketException: Broken pipe
>> >         java.net.SocketOutputStream.socketWrite0(Native Method)
>> >         java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113)
>> >         java.net.SocketOutputStream.write(SocketOutputStream.java:159)
>> >         java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
>> >         java.io.DataOutputStream.write(DataOutputStream.java:107)
>> >         java.io.FilterOutputStream.write(FilterOutputStream.java:97)
>> >         org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:300)
>> >         org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:298)
>> >         scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> >         scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>> >         org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:298)
>> >         org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:200)
>> >         org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:175)
>> >         org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:175)
>> >         org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1160)
>> >         org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:174)
>> >
>> > Driver stacktrace:
>> >     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
>> >     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033)
>> >     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031)
>> >     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> >     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> >     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031)
>> >     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
>> >     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
>> >     at scala.Option.foreach(Option.scala:236)
>> >     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:635)
>> >     at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1234)
>> >     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>> >     at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>> >     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>> >     at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>> >     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>> >     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> >     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> >     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> >     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> >
>> >
>> > I also noticed some AssociationErrors in the log of each Worker (in
>> > /root/spark/logs):
>> >
>> > 14/08/13 19:03:44 ERROR remote.EndpointWriter: AssociationError [akka.tcp://sparkWorker@ip-10-142-182-124.ec2.internal:57142] -> [akka.tcp://sparkExecutor@ip-10-142-182-124.ec2.internal:51159]: Error [Association failed with [akka.tcp://sparkExecutor@ip-10-142-182-124.ec2.internal:51159]] [
>> > akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkExecutor@ip-10-142-182-124.ec2.internal:51159]
>> > Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: ip-10-142-182-124.ec2.internal/10.142.182.124:51159]
>> >
>> >
>> > It looks like the error is occurring during the shuffle when the reduce
>> > tasks are trying to fetch their corresponding map outputs and the
>> > connection over which they are fetching this data is getting reset or
>> > prematurely terminated. This Job runs fine when I run it on the same setup
>> > with a smaller dataset (~ 62 GB). I am unable to debug this further. Any
>> > help would be appreciated.
>> >
>> > Thanks
>> >
>> > Arpan
>> >
>> >
>
>
