GitHub user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2591#discussion_r206733033
--- Diff:
integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
---
@@ -457,13 +458,25 @@ class CarbonScanRDD[T: ClassTag](
}
}
+ // TODO: rewrite this logic to call free memory in FailureListener on failures. On success,
+ // no memory leak should be there, resources should be freed on success completion.
+ // TODO: If CarbonRecordReader and VectorizedCarbonReader are called directly
+ // from any other callers, then in the caller, resource clearing should be taken care
+ val listeners = CarbonReflectionUtils.getField("onCompleteCallbacks", context)
+ .asInstanceOf[ArrayBuffer[TaskCompletionListener]]
+ val isAdded = listeners.exists(p => p.isInstanceOf[InsertTaskCompletionListener])
// add task completion before calling initialize as initialize method will internally call
// for usage of unsafe method for processing of one blocklet and if there is any exception
// while doing that the unsafe memory occupied for that task will not get cleared
- context.addTaskCompletionListener { _ =>
- closeReader.apply()
- close()
- logStatistics(executionId, taskId, queryStartTime, model.getStatisticsRecorder, split)
+ context.addTaskCompletionListener { new QueryTaskCompletionListener(!isAdded,
--- End diff --
Remove from this class the code that was copied into `QueryTaskCompletionListener`, so the cleanup logic exists in only one place.
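A minimal sketch of the refactor this comment asks for, under stated assumptions. Spark's `TaskContext` and `TaskCompletionListener` are replaced by hypothetical stand-ins (`SketchTaskContext`, `CompletionListener`) so it runs without Spark; the real code reads the private `onCompleteCallbacks` buffer through `CarbonReflectionUtils.getField`, while the stand-in exposes it directly. The `QueryTaskCompletionListener` constructor is cut off in the diff, so its parameters here are hypothetical, and the first argument (`!isAdded`) is assumed to mean "free unsafe memory in this listener". The point it illustrates: `closeReader`, `close` and `logStatistics` run only inside the listener, and the RDD merely registers it.

```scala
import scala.collection.mutable.ArrayBuffer

object QueryListenerSketch {

  // Hypothetical stand-in for Spark's TaskCompletionListener.
  trait CompletionListener {
    def onTaskCompletion(context: SketchTaskContext): Unit
  }

  // Hypothetical stand-in for Spark's TaskContext. The real code reaches the
  // private `onCompleteCallbacks` buffer via reflection; here it is public.
  class SketchTaskContext {
    val onCompleteCallbacks: ArrayBuffer[CompletionListener] = ArrayBuffer.empty
    def addTaskCompletionListener(listener: CompletionListener): Unit =
      onCompleteCallbacks += listener
    // Invokes every registered listener in registration order.
    def markTaskCompleted(): Unit =
      onCompleteCallbacks.foreach(_.onTaskCompletion(this))
  }

  // Stand-in for the insert-side listener that already frees unsafe memory.
  class InsertTaskCompletionListener(log: ArrayBuffer[String]) extends CompletionListener {
    def onTaskCompletion(context: SketchTaskContext): Unit =
      log += "insert: free unsafe memory"
  }

  // Assumed shape (hypothetical parameters): all query cleanup lives here,
  // so the RDD no longer keeps its own copy of these calls.
  class QueryTaskCompletionListener(
      freeMemory: Boolean,
      closeReader: () => Unit,
      close: () => Unit,
      logStatistics: () => Unit,
      freeUnsafeMemory: () => Unit) extends CompletionListener {
    def onTaskCompletion(context: SketchTaskContext): Unit = {
      closeReader()
      close()
      logStatistics()
      // Skip the release when an insert listener is already responsible for it.
      if (freeMemory) freeUnsafeMemory()
    }
  }

  // Mirrors the call site: check for an insert listener, then register one
  // listener object instead of an inline closure that duplicates its body.
  def run(withInsertListener: Boolean): Seq[String] = {
    val log = ArrayBuffer.empty[String]
    val context = new SketchTaskContext
    if (withInsertListener) context.addTaskCompletionListener(new InsertTaskCompletionListener(log))
    val isAdded = context.onCompleteCallbacks.exists(_.isInstanceOf[InsertTaskCompletionListener])
    context.addTaskCompletionListener(new QueryTaskCompletionListener(
      !isAdded,
      () => log += "closeReader",
      () => log += "close",
      () => log += "logStatistics",
      () => log += "query: free unsafe memory"))
    context.markTaskCompleted()
    log.toList
  }

  def main(args: Array[String]): Unit = {
    println(run(withInsertListener = true))
    println(run(withInsertListener = false))
  }
}
```

With an `InsertTaskCompletionListener` already registered, the query listener still closes the reader and logs statistics but leaves the memory release to the insert side; without one, it frees the memory itself.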
---