Thanks Yanbo,

I was running with 1G per executor; my file is 7.5G, read with the
standard block size of 128M, which naturally results in 7500M / 128M ≈ 59
partitions. My boxes have 8 CPUs, so I figured they could be processing 8
tasks/partitions at a time, needing 8 * (partition size) of memory per
executor, so 8 * 128M = 1G. Is this the right way to do this math?
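Spelling out my arithmetic in case it helps (plain Scala, untested; the
variable names are just made up for this sketch):

// Back-of-envelope only -- hypothetical names, nothing Spark-specific.
val fileSizeMB    = 7500               // 7.5G file
val blockSizeMB   = 128                // standard block size
val coresPerBox   = 8
val numPartitions = math.ceil(fileSizeMB.toDouble / blockSizeMB).toInt // 59
val memPerExecMB  = coresPerBox * blockSizeMB  // 8 * 128M = 1024M, i.e. ~1G
// ...and the 10-partition scenario I describe below:
val memFor10MB    = coresPerBox * (fileSizeMB / 10) // 8 * 750M = 6000M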
I'm confused about _decreasing_ the number of partitions -- I thought from
a Spark perspective, 7.5G / 10 partitions would result in 750M per
partition, so a Spark executor with 8 cores would potentially need
750M * 8 = 6000M of memory.

Maybe my confusion comes from terminology -- I thought in Spark the
"default" number of partitions is always the number of input splits. From
your example,

  (number of partitions) * (Parquet block size) = Minimum Required Memory

yet from my understanding this would also be the overall Parquet file
size, since

  (number of partitions) = (file size) / (Parquet block size)

It cannot be that Minimum Required Memory = Parquet file size.
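Just to make sure we mean the same operation -- by "decrease the number
of partitions" I'm assuming you mean something like the following
(untested sketch, using the same lst and filter as my original code; as I
understand it, coalesce avoids the full shuffle that repartition forces):

val df = sqlContext.parquetFile(lst:_*).where($"customer_id" === lit(254))
// Hypothetical: collapse the ~59 input splits down to 10 partitions,
// giving 7.5G / 10 = 750M per partition -- which is what worries me above.
val fewer = df.coalesce(10)
fewer.count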
On Sat, Sep 5, 2015 at 11:00 PM, Yanbo Liang <yblia...@gmail.com> wrote:

> The Parquet output writer allocates one block for each table partition
> it is processing and writes partitions in parallel. It will run out of
> memory if (number of partitions) times (Parquet block size) is greater
> than the available memory. You can try to decrease the number of
> partitions. And could you share the value of "parquet.block.size" and
> your available memory?
>
> 2015-09-05 18:59 GMT+08:00 Yana Kadiyska <yana.kadiy...@gmail.com>:
>
>> Hi folks, I have a strange issue. Trying to read a 7G file and do
>> fairly simple stuff with it:
>>
>> I can read the file and do simple operations on it. However, I'd prefer
>> to increase the number of partitions in preparation for more
>> memory-intensive operations (I'm happy to wait, I just need the job to
>> complete). Repartition seems to cause an OOM for me.
>> Could someone shed light on (or speculate about) why this would
>> happen -- I thought we repartition to a higher number to relieve memory
>> pressure?
>>
>> I'm using Spark 1.4.1 on CDH4, if that makes a difference.
>>
>> This works:
>>
>> val res2 = sqlContext.parquetFile(lst:_*).where($"customer_id"===lit(254))
>> res2.count
>> res1: Long = 77885925
>>
>> scala> res2.explain
>> == Physical Plan ==
>> Filter (customer_id#314 = 254)
>>  PhysicalRDD [....4], MapPartitionsRDD[11] at
>>
>> scala> res2.rdd.partitions.size
>> res3: Int = 59
>>
>> This doesn't:
>>
>> scala> res2.repartition(60).count
>> [Stage 2:> (1 + 45) / 59]
>> 15/09/05 10:17:21 WARN TaskSetManager: Lost task 2.0 in stage 2.0 (TID
>> 62, fqdn): java.lang.OutOfMemoryError: Java heap space
>>   at parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(ParquetFileReader.java:729)
>>   at parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:490)
>>   at parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:116)
>>   at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:193)
>>   at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)
>>   at org.apache.spark.sql.sources.SqlNewHadoopRDD$anon$1.hasNext(SqlNewHadoopRDD.scala:163)
>>   at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>   at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:327)
>>   at scala.collection.Iterator$anon$14.hasNext(Iterator.scala:388)
>>   at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:327)
>>   at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:327)
>>   at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:207)
>>   at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
>>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
>>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>   at org.apache.spark.scheduler.Task.run(Task.scala:70)
>>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>   at java.lang.Thread.run(Thread.java:745)