I meant to say: just copy everything to a local HDFS, and then don't use caching ...
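For illustration, a minimal sketch of that approach (assuming the files have already been copied from S3 onto the cluster's HDFS; the distcp command and the hdfs:///data/input path are placeholders, not the poster's actual setup):

  // One-time copy from S3 to HDFS, e.g. (placeholder bucket and path):
  //   hadoop distcp s3n://my-bucket/input hdfs:///data/input
  // Then read straight from HDFS and skip persist(DISK_ONLY_2), so DiskStore
  // never has to memory-map a cached block larger than 2GB.
  val rdd1 = sc.textFile("hdfs:///data/input").coalesce(1000)
  rdd1.foreachPartition { iter =>
    val n = iter.size // force one full pass over the partition (placeholder for the real processing)
  }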
On Mon, Oct 5, 2015 at 4:52 PM, Jegan <jega...@gmail.com> wrote:

> I am sorry, I didn't understand it completely. Are you suggesting to copy
> the files from S3 to HDFS? Actually, that is what I am doing. I am reading
> the files using Spark and persisting them locally.
>
> Or did you actually mean to ask the producer to write the files directly
> to HDFS instead of S3? I am not sure I can do this now either.
>
> Please clarify if I misunderstood what you meant.
>
> Thanks,
> Jegan
>
> On Mon, Oct 5, 2015 at 4:42 PM, Reynold Xin <r...@databricks.com> wrote:
>
>> You can write the data to local HDFS (or local disk) and just load it
>> from there.
>>
>> On Mon, Oct 5, 2015 at 4:37 PM, Jegan <jega...@gmail.com> wrote:
>>
>>> Thanks for your suggestion Ted.
>>>
>>> Unfortunately, at this point I cannot go beyond 1000 partitions. I am
>>> writing this data to BigQuery, which has a limit of 1000 load jobs per
>>> day per table, and I currently create one load job per partition. Is
>>> there any other work-around?
>>>
>>> Thanks again.
>>>
>>> Regards,
>>> Jegan
>>>
>>> On Mon, Oct 5, 2015 at 3:53 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>
>>>> As a workaround, can you set the number of partitions higher in the
>>>> sc.textFile method?
>>>>
>>>> Cheers
>>>>
>>>> On Mon, Oct 5, 2015 at 3:31 PM, Jegan <jega...@gmail.com> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I am facing the exception below when the size of the file being read
>>>>> in a partition is above 2GB. This is apparently due to Java's limitation
>>>>> on memory-mapped files: a single buffer can map at most 2GB.
>>>>>
>>>>> Caused by: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
>>>>>     at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:836)
>>>>>     at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125)
>>>>>     at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:113)
>>>>>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1207)
>>>>>     at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:127)
>>>>>     at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:134)
>>>>>     at org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:102)
>>>>>     at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:791)
>>>>>     at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:638)
>>>>>     at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:153)
>>>>>     at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
>>>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>>>>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>>>>>     at org.apache.spark.scheduler.Task.run(Task.scala:88)
>>>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>
>>>>> My use case is to read the files from S3 and do some processing. I am
>>>>> caching the data as below in order to avoid SocketTimeoutExceptions from
>>>>> another library I am using for the processing.
>>>>>
>>>>>   val rdd1 = sc.textFile("*******").coalesce(1000)
>>>>>   rdd1.persist(DISK_ONLY_2) // replication factor 2
>>>>>   rdd1.foreachPartition { iter => } // one pass over the data to download
>>>>>
>>>>> The 3rd line fails with the above error when a partition contains a
>>>>> file larger than 2GB.
>>>>>
>>>>> Do you think this needs to be fixed in Spark? One idea may be to use a
>>>>> wrapper class (say, BigByteBuffer) that keeps an array of ByteBuffers
>>>>> along with the index of the buffer currently being read. Below is a
>>>>> sketch of the modified DiskStore.scala.
>>>>>
>>>>>   private def getBytes(file: File, offset: Long, length: Long): Option[ByteBuffer] = {
>>>>>     val channel = new RandomAccessFile(file, "r").getChannel
>>>>>     Utils.tryWithSafeFinally {
>>>>>       if (length < minMemoryMapBytes) {
>>>>>         // For small blocks, read directly rather than memory-map
>>>>>       } else {
>>>>>         // TODO: create a BigByteBuffer backed by multiple mapped regions
>>>>>       }
>>>>>     } {
>>>>>       channel.close()
>>>>>     }
>>>>>   }
>>>>>
>>>>>   class BigByteBuffer extends ByteBuffer {
>>>>>     val buffers: Array[ByteBuffer]
>>>>>     var currentIndex = 0
>>>>>
>>>>>     ... // Other methods
>>>>>   }
>>>>>
>>>>> Please let me know if there is any other work-around for the same. Thanks
>>>>> for your time.
>>>>>
>>>>> Regards,
>>>>> Jegan
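For what it's worth, here is a rough, self-contained sketch of the idea behind the proposed BigByteBuffer, namely mapping a file larger than 2GB as an array of ByteBuffers of at most Integer.MAX_VALUE bytes each. This is not the actual DiskStore change, just an illustration of the chunking approach; the method name and chunking scheme are assumptions.

  import java.io.RandomAccessFile
  import java.nio.MappedByteBuffer
  import java.nio.channels.FileChannel

  // Map the file at `path` read-only as a sequence of regions no larger than 2GB each.
  def mapLargeFile(path: String): Array[MappedByteBuffer] = {
    val chunkSize = Integer.MAX_VALUE.toLong
    val channel = new RandomAccessFile(path, "r").getChannel
    try {
      val length = channel.size()
      val numChunks = ((length + chunkSize - 1) / chunkSize).toInt
      Array.tabulate(numChunks) { i =>
        val offset = i.toLong * chunkSize
        val size = math.min(chunkSize, length - offset)
        channel.map(FileChannel.MapMode.READ_ONLY, offset, size)
      }
    } finally {
      channel.close()
    }
  }

A real BigByteBuffer would additionally have to track which buffer the read position falls in (the currentIndex field in the sketch above) and expose the usual get/position operations across chunk boundaries.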