[ https://issues.apache.org/jira/browse/SPARK-9067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14629399#comment-14629399 ]
konstantin knizhnik commented on SPARK-9067:
--------------------------------------------
I have found a workaround for the problem: substitute the task context with a
fake context. I implemented CombineRDD as a replacement for CoalescedRDD and
create a separate task context for processing each partition:
{quote}
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.rdd.RDD
import org.apache.spark.unsafe.memory.TaskMemoryManager
import org.apache.spark.util.TaskCompletionListener

class CombineRDD[T: ClassTag](prev: RDD[T], maxPartitions: Int)
  extends RDD[T](prev) {

  val inputPartitions = prev.partitions

  class CombineIterator(partitions: Array[Partition], index: Int,
                        context: TaskContext) extends Iterator[T] {
    var iter: Iterator[T] = null
    var i = index

    override def hasNext: Boolean = {
      while ((iter == null || !iter.hasNext) && i < partitions.length) {
        // Wrap each input partition in its own fake context so that
        // per-file cleanup is not deferred to the end of the whole task.
        val ctx = new CombineTaskContext(context.stageId,
          context.partitionId, context.taskAttemptId, context.attemptNumber,
          null /* context.taskMemoryManager */, context.isRunningLocally,
          context.taskMetrics)
        iter = firstParent[T].compute(partitions(i), ctx)
        // ctx.complete() // causes "stream closed" errors, see below
        partitions(i) = null // drop the reference so the partition can be GCed
        i += maxPartitions
      }
      iter != null && iter.hasNext
    }

    override def next(): T = iter.next()
  }

  class CombineTaskContext(
      val stageId: Int,
      val partitionId: Int,
      override val taskAttemptId: Long,
      override val attemptNumber: Int,
      override val taskMemoryManager: TaskMemoryManager,
      val runningLocally: Boolean = true,
      val taskMetrics: TaskMetrics = null) extends TaskContext {

    @transient private val onCompleteCallbacks =
      new ArrayBuffer[TaskCompletionListener]

    override def attemptId(): Long = taskAttemptId

    override def addTaskCompletionListener(
        listener: TaskCompletionListener): this.type = {
      onCompleteCallbacks += listener
      this
    }

    override def addTaskCompletionListener(f: TaskContext => Unit): this.type = {
      onCompleteCallbacks += new TaskCompletionListener {
        override def onTaskCompletion(context: TaskContext): Unit = f(context)
      }
      this
    }

    override def addOnCompleteCallback(f: () => Unit): Unit = {
      onCompleteCallbacks += new TaskCompletionListener {
        override def onTaskCompletion(context: TaskContext): Unit = f()
      }
    }

    def complete(): Unit = {
      // Process completion callbacks in the reverse order of registration
      onCompleteCallbacks.reverse.foreach { listener =>
        listener.onTaskCompletion(this)
      }
    }

    override def isCompleted(): Boolean = false
    override def isRunningLocally(): Boolean = true
    override def isInterrupted(): Boolean = false
  }

  case class CombinePartition(index: Int) extends Partition

  protected def getPartitions: Array[Partition] =
    Array.tabulate(maxPartitions) { i => CombinePartition(i) }

  override def compute(partition: Partition, context: TaskContext): Iterator[T] =
    new CombineIterator(inputPartitions, partition.index, context)
}
{quote}
It works: no memory overflow or open file limit exhaustion. But it certainly
cannot be considered a solution of the problem. Also, please notice that I had
to comment out the call to ctx.complete(), otherwise I got an exception caused
by access to a closed stream. This is strange, because I think each partition
corresponds to a single Parquet file and so can be processed independently.
But it looks like GC and finalization do their work.
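The essence of the workaround can be sketched outside of Spark: instead of registering every cleanup callback on the single task-wide context, each chained partition gets its own context whose callbacks fire as soon as that partition's iterator is exhausted. A minimal, Spark-free Python sketch of this eager-completion pattern (all names here are hypothetical, not Spark API):

```python
class FakeContext:
    """Stand-in for a per-partition task context: collects completion
    callbacks and runs them in reverse order of registration, mirroring
    the CombineTaskContext.complete() above."""
    def __init__(self):
        self.callbacks = []

    def add_completion_listener(self, f):
        self.callbacks.append(f)

    def complete(self):
        for f in reversed(self.callbacks):
            f(self)


def combine(partitions, compute):
    """Chain partition iterators, completing each fake context as soon
    as its partition is exhausted, so resources are released eagerly."""
    for p in partitions:
        ctx = FakeContext()
        for item in compute(p, ctx):
            yield item
        ctx.complete()  # eager cleanup, instead of at end of the whole task


# Demo: each "partition" acquires a resource and registers a close callback.
open_count = [0]

def compute(p, ctx):
    open_count[0] += 1  # simulate opening a file
    ctx.add_completion_listener(lambda c: open_count.__setitem__(0, open_count[0] - 1))
    return iter(range(p))

result = list(combine([2, 3], compute))
# result == [0, 1, 0, 1, 2]; open_count[0] == 0 — every resource was closed
# as soon as its partition was consumed, not at task end.
```

At no point are more resources open than one per in-flight partition, which is exactly what the real workaround achieves for Parquet readers.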
> Memory overflow and open file limit exhaustion for NewParquetRDD+CoalescedRDD
> -----------------------------------------------------------------------------
>
> Key: SPARK-9067
> URL: https://issues.apache.org/jira/browse/SPARK-9067
> Project: Spark
> Issue Type: Improvement
> Components: Input/Output
> Affects Versions: 1.3.0, 1.4.0
> Environment: Target system: Linux, 16 cores, 400Gb RAM
> Spark is started locally using the following command:
> {{
> spark-submit --master local[16] --driver-memory 64G --executor-cores 16
> --num-executors 1 --executor-memory 64G
> }}
> Reporter: konstantin knizhnik
>
> If a coalesce transformation with a small number of output partitions (in my
> case 16) is applied to a large Parquet file (mine is about 150Gb with 215k
> partitions), it causes OutOfMemory exceptions (250Gb is not enough) and open
> file limit exhaustion (with the limit set to 8k).
> The source of the problem is in the SqlNewHadoopRDD.compute method:
> {quote}
> val reader = format.createRecordReader(
> split.serializableHadoopSplit.value, hadoopAttemptContext)
> reader.initialize(split.serializableHadoopSplit.value,
> hadoopAttemptContext)
> // Register an on-task-completion callback to close the input stream.
> context.addTaskCompletionListener(context => close())
> {quote}
> The created Parquet file reader is intended to be closed at task completion
> time. This reader holds many references to parquet.bytes.BytesInput objects,
> which in turn hold references to large byte arrays (some of them several
> megabytes in size).
> Since in the case of CoalescedRDD the task completes only after processing a
> large number of Parquet files, this causes file handle exhaustion and memory
> overflow.
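The leak pattern described in the report can be reproduced without Spark: when every per-file close callback is registered on one shared task context, all files stay "open" until the whole task completes. A hypothetical Python sketch of that deferred-close behaviour (names are illustrative, not Spark API):

```python
class SharedTaskContext:
    """One context for the whole coalesced task; callbacks run only once,
    at task completion, like addTaskCompletionListener in the report."""
    def __init__(self):
        self.callbacks = []

    def add_completion_listener(self, f):
        self.callbacks.append(f)

    def complete(self):
        for f in reversed(self.callbacks):
            f(self)


open_files = []

def read_partition(name, ctx):
    open_files.append(name)  # simulate opening one Parquet file
    # The close is deferred: it will only run when ctx.complete() fires.
    ctx.add_completion_listener(lambda c: open_files.remove(name))
    return iter([name])


ctx = SharedTaskContext()
rows = []
for part in ["part-%05d" % i for i in range(1000)]:
    rows.extend(read_partition(part, ctx))

peak_open = len(open_files)  # all 1000 simulated files open at once
ctx.complete()               # only now are they all "closed"
```

With 215k partitions per coalesced task, this deferral is enough to exhaust an 8k file-handle limit long before the task completes.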
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]