[ https://issues.apache.org/jira/browse/SPARK-9067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14629399#comment-14629399 ]

konstantin knizhnik commented on SPARK-9067:
--------------------------------------------

I have found a workaround for the problem: substitute the task context with a 
fake one. I implemented CombineRDD as a replacement for CoalescedRDD; it 
creates a separate task context for processing each input partition:

{quote}
    import scala.reflect.ClassTag
    import scala.collection.mutable.ArrayBuffer
    import org.apache.spark.{Partition, TaskContext}
    import org.apache.spark.executor.TaskMetrics
    import org.apache.spark.rdd.RDD
    import org.apache.spark.unsafe.memory.TaskMemoryManager
    import org.apache.spark.util.TaskCompletionListener

    class CombineRDD[T: ClassTag](prev: RDD[T], maxPartitions: Int)
      extends RDD[T](prev) {

      val inputPartitions = prev.partitions

      // Lazily walks input partitions index, index + maxPartitions, ... and
      // computes each one under a fresh fake task context.
      class CombineIterator(partitions: Array[Partition], index: Int,
                            context: TaskContext) extends Iterator[T] {
        var iter: Iterator[T] = null
        var i = index

        override def hasNext: Boolean = {
          while ((iter == null || !iter.hasNext) && i < partitions.length) {
            val ctx = new CombineTaskContext(context.stageId,
              context.partitionId, context.taskAttemptId,
              context.attemptNumber, null /* context.taskMemoryManager */,
              context.isRunningLocally, context.taskMetrics)
            iter = firstParent[T].compute(partitions(i), ctx)
            // ctx.complete() // commented out: see the note below
            partitions(i) = null // drop the reference so it can be GCed
            i += maxPartitions
          }
          iter != null && iter.hasNext
        }

        override def next(): T = iter.next()
      }

      // Fake context: collects per-partition completion listeners instead of
      // registering them on the real task context.
      class CombineTaskContext(val stageId: Int,
          val partitionId: Int,
          override val taskAttemptId: Long,
          override val attemptNumber: Int,
          override val taskMemoryManager: TaskMemoryManager,
          val runningLocally: Boolean = true,
          val taskMetrics: TaskMetrics = null) extends TaskContext {

        @transient private val onCompleteCallbacks =
          new ArrayBuffer[TaskCompletionListener]

        override def attemptId(): Long = taskAttemptId

        override def addTaskCompletionListener(
            listener: TaskCompletionListener): this.type = {
          onCompleteCallbacks += listener
          this
        }

        override def addTaskCompletionListener(
            f: TaskContext => Unit): this.type = {
          onCompleteCallbacks += new TaskCompletionListener {
            override def onTaskCompletion(context: TaskContext): Unit = f(context)
          }
          this
        }

        override def addOnCompleteCallback(f: () => Unit): Unit = {
          onCompleteCallbacks += new TaskCompletionListener {
            override def onTaskCompletion(context: TaskContext): Unit = f()
          }
        }

        // Process completion callbacks in the reverse order of registration.
        def complete(): Unit = {
          onCompleteCallbacks.reverse.foreach { listener =>
            listener.onTaskCompletion(this)
          }
        }

        override def isCompleted(): Boolean = false
        override def isRunningLocally(): Boolean = true
        override def isInterrupted(): Boolean = false
      }

      case class CombinePartition(index: Int) extends Partition

      protected def getPartitions: Array[Partition] =
        Array.tabulate(maxPartitions) { i => CombinePartition(i) }

      override def compute(partition: Partition,
                           context: TaskContext): Iterator[T] =
        new CombineIterator(inputPartitions, partition.index, context)
    }
{quote}
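The round-robin assignment at the heart of CombineIterator can be illustrated without any Spark dependencies (a minimal sketch; `RoundRobinIterator` and the plain `IndexedSeq` of iterators are illustrative names, not part of the patch): output partition `index` lazily consumes input partitions index, index + maxPartitions, index + 2*maxPartitions, and so on.

```scala
// Sketch of the round-robin combining done by CombineIterator above.
class RoundRobinIterator[T](inputs: IndexedSeq[Iterator[T]],
                            index: Int,
                            maxPartitions: Int) extends Iterator[T] {
  private var iter: Iterator[T] = Iterator.empty
  private var i = index

  override def hasNext: Boolean = {
    // Advance to the next assigned input partition until one has elements.
    while (!iter.hasNext && i < inputs.length) {
      iter = inputs(i) // in the real RDD this is firstParent[T].compute(...)
      i += maxPartitions
    }
    iter.hasNext
  }

  override def next(): T = iter.next()
}
```

For example, combining 6 input partitions into 2, output partition 0 yields the elements of inputs 0, 2, 4 and output partition 1 yields those of inputs 1, 3, 5.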

It works: there is no memory overflow or open-file-limit exhaustion. But it 
certainly cannot be considered a solution to the problem. Also, please note 
that I had to comment out the call to ctx.complete(); otherwise I got an 
exception caused by access to a closed stream. This is strange, because I 
would expect each partition to correspond to a single parquet file and so to 
be processable independently. But it looks like GC and finalization do their 
work.
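The failure mode described in the issue below, where every per-file close() is deferred to task completion, can also be sketched without Spark (an illustrative model; `FakeReader`, `FakeTaskContext`, and `simulateCoalescedTask` are made-up names, not Spark APIs):

```scala
// Why deferring reader.close() to task completion leaks when one
// coalesced task reads many parquet files: every reader stays open
// until the task-level callbacks finally run.
import scala.collection.mutable.ArrayBuffer

class FakeReader { var open = true; def close(): Unit = open = false }

class FakeTaskContext {
  private val onComplete = ArrayBuffer[() => Unit]()
  def addTaskCompletionListener(f: () => Unit): Unit = onComplete += f
  def complete(): Unit = onComplete.reverse.foreach(_()) // only at task end
}

// Returns (readers open just before completion, readers open after it).
def simulateCoalescedTask(numFiles: Int): (Int, Int) = {
  val ctx = new FakeTaskContext
  val readers = (0 until numFiles).map { _ =>
    val r = new FakeReader
    ctx.addTaskCompletionListener(() => r.close()) // deferred close
    r
  }
  val before = readers.count(_.open) // every file is open at once
  ctx.complete()
  (before, readers.count(_.open))
}
```

With 215k input partitions per coalesced task, as in the report, this peak of simultaneously open readers is what exhausts both the file-handle limit and memory.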
       


> Memory overflow and open file limit exhaustion for NewParquetRDD+CoalescedRDD
> -----------------------------------------------------------------------------
>
>                 Key: SPARK-9067
>                 URL: https://issues.apache.org/jira/browse/SPARK-9067
>             Project: Spark
>          Issue Type: Improvement
>          Components: Input/Output
>    Affects Versions: 1.3.0, 1.4.0
>         Environment: Target system: Linux, 16 cores, 400Gb RAM
> Spark is started locally using the following command:
> {{
> spark-submit --master local[16] --driver-memory 64G --executor-cores 16 
> --num-executors 1  --executor-memory 64G
> }}
>            Reporter: konstantin knizhnik
>
> If a coalesce transformation with a small number of output partitions (in my 
> case 16) is applied to a large Parquet file (mine is about 150Gb with 215k 
> partitions), then it causes OutOfMemory exceptions (250Gb is not enough) and 
> open-file-limit exhaustion (with the limit set to 8k).
> The source of the problem is in the SqlNewHadoopRDD.compute method:
> {quote}
>       val reader = format.createRecordReader(
>         split.serializableHadoopSplit.value, hadoopAttemptContext)
>       reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
>       // Register an on-task-completion callback to close the input stream.
>       context.addTaskCompletionListener(context => close())
> {quote}
> The created Parquet file reader is intended to be closed at task completion 
> time. This reader holds many references to parquet.bytes.BytesInput objects, 
> which in turn hold references to large byte arrays (some of them several 
> megabytes).
> Since with CoalescedRDD a task completes only after processing a large 
> number of parquet files, this causes file-handle exhaustion and memory 
> overflow.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
