GitHub user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2591#discussion_r206733033 --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala --- @@ -457,13 +458,25 @@ class CarbonScanRDD[T: ClassTag]( } } + // TODO: rewrite this logic to call free memory in FailureListener on failures. On success, + // no memory leak should be there, resources should be freed on success completion. + // TODO: If CarbonRecordReader and VectorizedCarbonReader are called directly + // from any other callers, then in the caller, resource clearing should be taken care + val listeners = CarbonReflectionUtils.getField("onCompleteCallbacks", context) + .asInstanceOf[ArrayBuffer[TaskCompletionListener]] + val isAdded = listeners.exists(p => p.isInstanceOf[InsertTaskCompletionListener]) // add task completion before calling initialize as initialize method will internally call // for usage of unsafe method for processing of one blocklet and if there is any exception // while doing that the unsafe memory occupied for that task will not get cleared - context.addTaskCompletionListener { _ => - closeReader.apply() - close() - logStatistics(executionId, taskId, queryStartTime, model.getStatisticsRecorder, split) + context.addTaskCompletionListener { new QueryTaskCompletionListener(!isAdded, --- End diff -- This class still contains the code that was copied into `QueryTaskCompletionListener`; please remove that duplicate from this class.
---