Thanks Yanbo,

I was running with 1G per executor. My file is 7.5G and, with the standard
block size of 128M, naturally yields 7500M / 128M ≈ 59 partitions. My boxes
have 8 CPUs, so I figured each executor could be processing 8
tasks/partitions at a time, needing

8 * (partition size) of memory per executor, so 8 * 128M = 1G

Is this the right way to do this math?
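For what it's worth, here is that arithmetic as a throwaway sketch (plain
Scala, no Spark involved; the 8 cores and 128M block size are just my
numbers from above):

```scala
// Plain-Scala sketch of the memory math above; no Spark involved.
// Sizes in MB; 8 concurrent tasks per executor and a 128M block size
// are the numbers from my setup.
val fileSizeMB = 7500L         // ~7.5G input file
val blockSizeMB = 128L         // standard HDFS block size
val coresPerExecutor = 8       // tasks running concurrently per executor

// Default partition count: one partition per input split
val numPartitions = math.ceil(fileSizeMB.toDouble / blockSizeMB).toInt

// Peak memory if every core holds one partition-sized chunk at once
val perExecutorMB = coresPerExecutor * blockSizeMB

println(s"partitions = $numPartitions, per-executor = ${perExecutorMB}M")
```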

I'm confused about _decreasing_ the number of partitions -- I thought that,
from a Spark perspective, 7.5G split into 10 partitions would result in 750M
per partition. So a Spark executor with 8 cores would potentially need
750M * 8 = 6000M of memory.
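The same back-of-envelope sketch for the 10-partition case (again plain
Scala, just restating my arithmetic above):

```scala
// Plain-Scala restatement of the 10-partition arithmetic above.
val fileSizeMB = 7500L
val fewerPartitions = 10
val coresPerExecutor = 8

val partitionSizeMB = fileSizeMB / fewerPartitions          // 750M each
val peakPerExecutorMB = partitionSizeMB * coresPerExecutor  // 6000M

println(s"partition size = ${partitionSizeMB}M, peak per executor = ${peakPerExecutorMB}M")
```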

Maybe my confusion comes from terminology -- I thought that in Spark the
"default" number of partitions is always the number of input splits. From
your example,

(number of partitions) * (Parquet block size) = Minimum Required Memory

yet, from my understanding,

(number of partitions) = (file size) / (Parquet block size)

so that product would simply be the overall Parquet file size. It cannot be
that Minimum Required Memory = Parquet file size.
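Plugging one formula into the other makes the circularity visible (plain
Scala sketch of the arithmetic, nothing Spark-specific):

```scala
// Substituting (partitions) = fileSize / blockSize into the quoted
// formula just reproduces the file size, which is the contradiction.
val fileSizeMB = 7500.0
val blockSizeMB = 128.0

val numPartitions = fileSizeMB / blockSizeMB        // ~58.6 splits
val minRequiredMB = numPartitions * blockSizeMB     // back to 7500.0

println(s"'minimum required memory' per the formula = ${minRequiredMB}M")
```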


On Sat, Sep 5, 2015 at 11:00 PM, Yanbo Liang <yblia...@gmail.com> wrote:

> The Parquet output writer allocates one block for each table partition it
> is processing and writes partitions in parallel. It will run out of
> memory if (number of partitions) times (Parquet block size) is greater than
> the available memory. You can try to decrease the number of partitions. And
> could you share the value of "parquet.block.size" and your available memory?
>
> 2015-09-05 18:59 GMT+08:00 Yana Kadiyska <yana.kadiy...@gmail.com>:
>
>> Hi folks, I have a strange issue. Trying to read a 7G file and do fairly
>> simple stuff with it:
>>
>> I can read the file/do simple operations on it. However, I'd prefer to
>> increase the number of partitions in preparation for more memory-intensive
>> operations (I'm happy to wait, I just need the job to complete).
>> Repartition seems to cause an OOM for me?
>> Could someone shed light/or speculate/ why this would happen -- I thought
>> we repartition higher to relieve memory pressure?
>>
>> I'm using Spark 1.4.1 on CDH4, if that makes a difference
>>
>> This works
>>
>> val res2 = sqlContext.parquetFile(lst:_*).where($"customer_id"===lit(254))
>> res2.count
>> res1: Long = 77885925
>>
>> scala> res2.explain
>> == Physical Plan ==
>> Filter (customer_id#314 = 254)
>>  PhysicalRDD [....4], MapPartitionsRDD[11] at
>>
>> scala> res2.rdd.partitions.size
>> res3: Int = 59
>>
>>
>>
>> This doesn't:
>>
>> scala> res2.repartition(60).count
>> [Stage 2:>                                                        (1 + 45) / 
>> 59]15/09/05 10:17:21 WARN TaskSetManager: Lost task 2.0 in stage 2.0 (TID 
>> 62, fqdn): java.lang.OutOfMemoryError: Java heap space
>>         at 
>> parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(ParquetFileReader.java:729)
>>         at 
>> parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:490)
>>         at 
>> parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:116)
>>         at 
>> parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:193)
>>         at 
>> parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)
>>         at 
>> org.apache.spark.sql.sources.SqlNewHadoopRDD$anon$1.hasNext(SqlNewHadoopRDD.scala:163)
>>         at 
>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>         at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:327)
>>         at scala.collection.Iterator$anon$14.hasNext(Iterator.scala:388)
>>         at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:327)
>>         at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:327)
>>         at 
>> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:207)
>>         at 
>> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
>>         at 
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
>>         at 
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>         at org.apache.spark.scheduler.Task.run(Task.scala:70)
>>         at 
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>         at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>         at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>         at java.lang.Thread.run(Thread.java:745)
>>
>>
>
>
