Hi All,

I am facing the below exception when the size of the file being read in a
partition is above 2GB. This apparently comes from Java's limitation on
memory-mapped files: FileChannel.map cannot map a region larger than
Integer.MAX_VALUE bytes (roughly 2GB).

Caused by: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
    at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:836)
    at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125)
    at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:113)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1207)
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:127)
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:134)
    at org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:102)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:791)
    at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:638)
    at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:153)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
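
The limit itself is easy to reproduce outside Spark with plain NIO (a minimal
sketch; the file path is just a placeholder for any file larger than 2GB):

import java.io.RandomAccessFile
import java.nio.channels.FileChannel.MapMode

val channel = new RandomAccessFile("/path/to/file/larger/than/2gb", "r").getChannel
try {
  // map() rejects any region whose size exceeds Integer.MAX_VALUE bytes,
  // which is what DiskStore runs into for blocks above 2GB
  channel.map(MapMode.READ_ONLY, 0, channel.size())
} finally {
  channel.close()
}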

My use case is to read files from S3 and do some processing on them. I am
caching the data as shown below in order to avoid SocketTimeoutExceptions from
another library I am using for the processing.

import org.apache.spark.storage.StorageLevel

val rdd1 = sc.textFile("*******").coalesce(1000)
rdd1.persist(StorageLevel.DISK_ONLY_2) // store on disk only, replication factor 2
rdd1.foreachPartition { iter => } // one pass over the data to force the download

The last line fails with the above error when a partition contains a file
larger than 2GB.

Do you think this needs to be fixed in Spark? One idea would be to use a
wrapper class (call it BigByteBuffer) that keeps an array of ByteBuffers
plus the index of the buffer currently being read. Below is a sketch of the
modified DiskStore.scala.

private def getBytes(file: File, offset: Long, length: Long): Option[ByteBuffer] = {
  val channel = new RandomAccessFile(file, "r").getChannel
  Utils.tryWithSafeFinally {
    if (length < minMemoryMapBytes) {
      // For small files, read directly into a heap buffer rather than memory map
      val buf = ByteBuffer.allocate(length.toInt)
      channel.position(offset)
      while (buf.remaining() != 0) {
        if (channel.read(buf) == -1) {
          throw new IOException("Reached EOF before filling buffer")
        }
      }
      buf.flip()
      Some(buf)
    } else {
      // TODO: instead of a single map() call (which fails for length > Integer.MAX_VALUE),
      // map the region as multiple chunks and wrap them in a BigByteBuffer
      Some(channel.map(MapMode.READ_ONLY, offset, length))
    }
  } {
    channel.close()
  }
}

// Note: java.nio.ByteBuffer's constructor is package-private, so this would
// have to wrap an array of buffers rather than extend ByteBuffer directly,
// and the read path would need to accept the wrapper.
class BigByteBuffer(val buffers: Array[ByteBuffer]) {
  var currentIndex = 0

  ... // get(), remaining(), etc., delegating to buffers(currentIndex)
}
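
For illustration, the chunked mapping itself could look something like the
rough sketch below (not actual Spark code; the helper name mapInChunks is
made up):

import java.io.{File, RandomAccessFile}
import java.nio.ByteBuffer
import java.nio.channels.FileChannel.MapMode

// Hypothetical helper: map a region larger than 2GB as a sequence of
// <=2GB chunks that a BigByteBuffer-style wrapper could hold.
def mapInChunks(file: File, offset: Long, length: Long): Array[ByteBuffer] = {
  val channel = new RandomAccessFile(file, "r").getChannel
  try {
    val chunkSize = Int.MaxValue.toLong
    val numChunks = ((length + chunkSize - 1) / chunkSize).toInt
    Array.tabulate[ByteBuffer](numChunks) { i =>
      val start = offset + i * chunkSize
      val size = math.min(chunkSize, offset + length - start)
      // Each individual mapping stays under Integer.MAX_VALUE, so map() accepts it
      channel.map(MapMode.READ_ONLY, start, size)
    }
  } finally {
    // MappedByteBuffers remain valid after the channel is closed
    channel.close()
  }
}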

Please let me know if there is any other workaround for this issue.
Thanks for your time.

Regards,
Jegan
