[CARBONDATA-2978] Fixed JVM crash when inserting into a carbon table from another carbon table
Problem: When data is inserted from one carbon table into another carbon table and unsafe load and unsafe query are both enabled, the JVM crashes. Reason: When an insert happens from one carbon table into another, the query and the load run in the same task and thread, so they get the same task id; the unsafe memory manager then tries to release all memory acquired by that task on the query side even though the load is still running on the same task. Solution: Check the task's completion listeners and skip the unsafe memory clearing on the query side when a load completion listener is registered. This closes #2773 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/9ae91cc5 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/9ae91cc5 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/9ae91cc5 Branch: refs/heads/branch-1.5 Commit: 9ae91cc5a9d683ef54550cfe7e65c4d63d5e5a24 Parents: c016361 Author: ravipesala <[email protected]> Authored: Wed Sep 26 23:04:59 2018 +0530 Committer: kumarvishal09 <[email protected]> Committed: Fri Sep 28 19:51:06 2018 +0530 ---------------------------------------------------------------------- .../hadoop/api/CarbonTableOutputFormat.java | 35 +++++---- .../InsertIntoNonCarbonTableTestCase.scala | 79 +++++++++++++++++++- .../carbondata/spark/rdd/CarbonScanRDD.scala | 76 ++++++++++++------- .../rdd/InsertTaskCompletionListener.scala | 4 +- .../spark/rdd/QueryTaskCompletionListener.scala | 4 +- .../datasources/SparkCarbonFileFormat.scala | 23 +++++- .../CarbonTaskCompletionListener.scala | 72 ++++++++++++++++++ 7 files changed, 246 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/9ae91cc5/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java 
index 28817e9..762983b 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java @@ -424,6 +424,8 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje private Future future; + private boolean isClosed; + public CarbonRecordWriter(CarbonOutputIteratorWrapper iteratorWrapper, DataLoadExecutor dataLoadExecutor, CarbonLoadModel loadModel, Future future, ExecutorService executorService) { @@ -442,22 +444,25 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje } @Override public void close(TaskAttemptContext taskAttemptContext) throws InterruptedException { - if (iteratorWrapper != null) { - iteratorWrapper.closeWriter(false); - } - try { - future.get(); - } catch (ExecutionException e) { - LOG.error("Error while loading data", e); - throw new InterruptedException(e.getMessage()); - } finally { - executorService.shutdownNow(); - dataLoadExecutor.close(); - ThreadLocalSessionInfo.unsetAll(); - // clean up the folders and files created locally for data load operation - TableProcessingOperations.deleteLocalDataLoadFolderLocation(loadModel, false, false); + if (!isClosed) { + isClosed = true; + if (iteratorWrapper != null) { + iteratorWrapper.closeWriter(false); + } + try { + future.get(); + } catch (ExecutionException e) { + LOG.error("Error while loading data", e); + throw new InterruptedException(e.getMessage()); + } finally { + executorService.shutdownNow(); + dataLoadExecutor.close(); + ThreadLocalSessionInfo.unsetAll(); + // clean up the folders and files created locally for data load operation + TableProcessingOperations.deleteLocalDataLoadFolderLocation(loadModel, false, false); + } + LOG.info("Closed writer task " + taskAttemptContext.getTaskAttemptID()); } - LOG.info("Closed writer task " + taskAttemptContext.getTaskAttemptID()); } public CarbonLoadModel getLoadModel() { 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9ae91cc5/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/insertQuery/InsertIntoNonCarbonTableTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/insertQuery/InsertIntoNonCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/insertQuery/InsertIntoNonCarbonTableTestCase.scala index a745672..a3fb11c 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/insertQuery/InsertIntoNonCarbonTableTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/insertQuery/InsertIntoNonCarbonTableTestCase.scala @@ -18,10 +18,13 @@ */ package org.apache.carbondata.spark.testsuite.insertQuery -import org.apache.spark.sql.Row +import org.apache.spark.sql.{Row, SaveMode} import org.apache.spark.sql.test.util.QueryTest import org.scalatest.BeforeAndAfterAll +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.CarbonProperties + class InsertIntoNonCarbonTableTestCase extends QueryTest with BeforeAndAfterAll { override def beforeAll { @@ -64,6 +67,8 @@ class InsertIntoNonCarbonTableTestCase extends QueryTest with BeforeAndAfterAll "Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions," + "Latest_operatorId,gamePointDescription,gamePointId,contractNumber', " + "'bad_records_logger_enable'='false','bad_records_action'='FORCE')") + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_IN_QUERY_EXECUTION, "true") + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE, "true") } test("insert into hive") { @@ -102,7 +107,79 @@ class InsertIntoNonCarbonTableTestCase extends QueryTest with 
BeforeAndAfterAll sql("drop table thive_cond") } + test("jvm crash when insert data from datasource table to session table") { + val spark = sqlContext.sparkSession + import spark.implicits._ + + import scala.util.Random + val r = new Random() + val df = spark.sparkContext.parallelize(1 to 10) + .map(x => (r.nextInt(100000), "name" + x % 8, "city" + x % 50, BigDecimal.apply(x % 60))) + .toDF("ID", "name", "city", "age") + spark.sql("DROP TABLE IF EXISTS personTable") + spark.sql("DROP TABLE IF EXISTS test_table") + + df.write.format("carbon").saveAsTable("personTable") + spark.sql("create table test_table(ID int, name string, city string, age decimal) stored by 'carbondata' tblproperties('sort_columns'='ID')") + spark.sql("insert into test_table select * from personTable") + spark.sql("insert into test_table select * from personTable limit 2") + + assert(spark.sql("select * from test_table").count() == 12) + spark.sql("DROP TABLE IF EXISTS personTable") + spark.sql("DROP TABLE IF EXISTS test_table") + } + + test("jvm crash when insert data from datasource table to datasource table") { + val spark = sqlContext.sparkSession + import spark.implicits._ + + import scala.util.Random + val r = new Random() + val df = spark.sparkContext.parallelize(1 to 10) + .map(x => (r.nextInt(100000), "name" + x % 8, "city" + x % 50, BigDecimal.apply(x % 60))) + .toDF("ID", "name", "city", "age") + spark.sql("DROP TABLE IF EXISTS personTable") + spark.sql("DROP TABLE IF EXISTS test_table") + + df.write.format("carbon").saveAsTable("personTable") + spark.sql("create table test_table(ID int, name string, city string, age decimal) using carbon") + spark.sql("insert into test_table select * from personTable") + spark.sql("insert into test_table select * from personTable limit 2") + + assert(spark.sql("select * from test_table").count() == 12) + spark.sql("DROP TABLE IF EXISTS personTable") + spark.sql("DROP TABLE IF EXISTS test_table") + } + + test("jvm crash when insert data from session 
table to datasource table") { + val spark = sqlContext.sparkSession + import spark.implicits._ + + import scala.util.Random + val r = new Random() + val df = spark.sparkContext.parallelize(1 to 10) + .map(x => (r.nextInt(100000), "name" + x % 8, "city" + x % 50, BigDecimal.apply(x % 60))) + .toDF("ID", "name", "city", "age") + spark.sql("DROP TABLE IF EXISTS personTable") + spark.sql("DROP TABLE IF EXISTS test_table") + + df.write + .format("carbondata") + .option("tableName", "personTable") + .mode(SaveMode.Overwrite) + .save() + spark.sql("create table test_table(ID int, name string, city string, age decimal) using carbon") + spark.sql("insert into test_table select * from personTable") + spark.sql("insert into test_table select * from personTable limit 2") + + assert(spark.sql("select * from test_table").count() == 12) + spark.sql("DROP TABLE IF EXISTS personTable") + spark.sql("DROP TABLE IF EXISTS test_table") + } + override def afterAll { + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_IN_QUERY_EXECUTION, CarbonCommonConstants.ENABLE_UNSAFE_IN_QUERY_EXECUTION_DEFAULTVALUE) + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE, CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE_DEFAULT) sql("DROP TABLE IF EXISTS TCarbonSource") } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/9ae91cc5/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala index eb7abbc..1a7eae2 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala +++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala @@ -35,6 +35,7 @@ import org.apache.spark._ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.sql.hive.DistributionUtil import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.carbondata.execution.datasources.tasklisteners.CarbonLoadTaskCompletionListener import org.apache.spark.sql.execution.SQLExecution import org.apache.spark.sql.profiler.{GetPartition, Profiler, QueryTaskEnd} import org.apache.spark.sql.util.SparkSQLUtil.sessionState @@ -470,39 +471,28 @@ class CarbonScanRDD[T: ClassTag]( val recorder = CarbonTimeStatisticsFactory.createExecutorRecorder(model.getQueryId()) model.setStatisticsRecorder(recorder) - // TODO: rewrite this logic to call free memory in FailureListener on failures. On success, - // TODO: no memory leak should be there, resources should be freed on success completion. - val onCompleteCallbacksField = context.getClass.getDeclaredField("onCompleteCallbacks") - onCompleteCallbacksField.setAccessible(true) - val listeners = onCompleteCallbacksField.get(context) - .asInstanceOf[ArrayBuffer[TaskCompletionListener]] - - val isAdded = listeners.exists(p => p.isInstanceOf[InsertTaskCompletionListener]) - model.setFreeUnsafeMemory(!isAdded) - // add task completion before calling initialize as initialize method will internally call - // for usage of unsafe method for processing of one blocklet and if there is any exception - // while doing that the unsafe memory occupied for that task will not get cleared - context.addTaskCompletionListener { new QueryTaskCompletionListener(!isAdded, - reader, - inputMetricsStats, - executionId, - taskId, - queryStartTime, - model.getStatisticsRecorder, - split, - queryId) - } - // initialize the reader - reader.initialize(inputSplit, attemptContext) - new Iterator[Any] { private var havePair = false private var finished = false + private var first = true override def hasNext: Boolean = 
{ if (context.isInterrupted) { throw new TaskKilledException } + if (first) { + first = false + addTaskCompletionListener( + split, + context, + queryStartTime, + executionId, + taskId, + model, + reader) + // initialize the reader + reader.initialize(inputSplit, attemptContext) + } if (!finished && !havePair) { finished = !reader.nextKeyValue havePair = !finished @@ -534,6 +524,42 @@ class CarbonScanRDD[T: ClassTag]( iterator.asInstanceOf[Iterator[T]] } + private def addTaskCompletionListener(split: Partition, + context: TaskContext, + queryStartTime: Long, + executionId: String, + taskId: Int, + model: QueryModel, + reader: RecordReader[Void, Object]) = { + // TODO: rewrite this logic to call free memory in FailureListener on failures and + // On success, + // TODO: no memory leak should be there, resources should be freed on + // success completion. + val onCompleteCallbacksField = + context.getClass.getDeclaredField("onCompleteCallbacks") + onCompleteCallbacksField.setAccessible(true) + val listeners = onCompleteCallbacksField.get(context) + .asInstanceOf[ArrayBuffer[TaskCompletionListener]] + + val isAdded = listeners.exists(p => p.isInstanceOf[CarbonLoadTaskCompletionListener]) + model.setFreeUnsafeMemory(!isAdded) + // add task completion before calling initialize as initialize method will internally + // call for usage of unsafe method for processing of one blocklet and if there is any + // exceptionwhile doing that the unsafe memory occupied for that task will not + // get cleared + context.addTaskCompletionListener { + new QueryTaskCompletionListener(!isAdded, + reader, + inputMetricsStats, + executionId, + taskId, + queryStartTime, + model.getStatisticsRecorder, + split, + queryId) + } + } + private def close() { TaskMetricsMap.getInstance().updateReadBytes(Thread.currentThread().getId) inputMetricsStats.updateAndClose() 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9ae91cc5/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/InsertTaskCompletionListener.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/InsertTaskCompletionListener.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/InsertTaskCompletionListener.scala index dfdbd19..7246645 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/InsertTaskCompletionListener.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/InsertTaskCompletionListener.scala @@ -18,8 +18,8 @@ package org.apache.carbondata.spark.rdd import org.apache.spark.TaskContext +import org.apache.spark.sql.carbondata.execution.datasources.tasklisteners.CarbonLoadTaskCompletionListener import org.apache.spark.sql.execution.command.ExecutionErrors -import org.apache.spark.util.TaskCompletionListener import org.apache.carbondata.core.util.ThreadLocalTaskInfo import org.apache.carbondata.processing.loading.{DataLoadExecutor, FailureCauses} @@ -27,7 +27,7 @@ import org.apache.carbondata.spark.util.CommonUtil class InsertTaskCompletionListener(dataLoadExecutor: DataLoadExecutor, executorErrors: ExecutionErrors) - extends TaskCompletionListener { + extends CarbonLoadTaskCompletionListener { override def onTaskCompletion(context: TaskContext): Unit = { try { dataLoadExecutor.close() http://git-wip-us.apache.org/repos/asf/carbondata/blob/9ae91cc5/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/QueryTaskCompletionListener.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/QueryTaskCompletionListener.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/QueryTaskCompletionListener.scala 
index e4cb3f8..97449c5 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/QueryTaskCompletionListener.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/QueryTaskCompletionListener.scala @@ -21,8 +21,8 @@ import scala.collection.JavaConverters._ import org.apache.hadoop.mapreduce.RecordReader import org.apache.spark.{Partition, TaskContext} +import org.apache.spark.sql.carbondata.execution.datasources.tasklisteners.CarbonQueryTaskCompletionListener import org.apache.spark.sql.profiler.{Profiler, QueryTaskEnd} -import org.apache.spark.util.TaskCompletionListener import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.memory.UnsafeMemoryManager @@ -34,7 +34,7 @@ class QueryTaskCompletionListener(freeMemory: Boolean, var reader: RecordReader[Void, Object], inputMetricsStats: InitInputMetrics, executionId: String, taskId: Int, queryStartTime: Long, queryStatisticsRecorder: QueryStatisticsRecorder, split: Partition, queryId: String) - extends TaskCompletionListener { + extends CarbonQueryTaskCompletionListener { override def onTaskCompletion(context: TaskContext): Unit = { if (reader != null) { try { http://git-wip-us.apache.org/repos/asf/carbondata/blob/9ae91cc5/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala ---------------------------------------------------------------------- diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala index a6965ac..53b1bb1 100644 --- a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala +++ 
b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.carbondata.execution.datasources import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path} @@ -29,6 +30,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.memory.MemoryMode import org.apache.spark.sql._ import org.apache.spark.sql.carbondata.execution.datasources.readsupport.SparkUnsafeRowReadSuport +import org.apache.spark.sql.carbondata.execution.datasources.tasklisteners.{CarbonLoadTaskCompletionListener, CarbonLoadTaskCompletionListenerImpl, CarbonQueryTaskCompletionListener, CarbonQueryTaskCompletionListenerImpl} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.JoinedRow import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection @@ -37,7 +39,7 @@ import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.sources.{DataSourceRegister, Filter} import org.apache.spark.sql.types._ import org.apache.spark.sql.util.SparkTypeConverter -import org.apache.spark.util.SerializableConfiguration +import org.apache.spark.util.{SerializableConfiguration, TaskCompletionListener} import org.apache.carbondata.common.annotations.{InterfaceAudience, InterfaceStability} import org.apache.carbondata.common.logging.LogServiceFactory @@ -174,6 +176,10 @@ class SparkCarbonFileFormat extends FileFormat private val recordWriter: RecordWriter[NullWritable, ObjectArrayWritable] = new CarbonTableOutputFormat().getRecordWriter(context) + Option(TaskContext.get()).foreach {c => + c.addTaskCompletionListener(CarbonLoadTaskCompletionListenerImpl(recordWriter, context)) + } + /** * Write sparks internal row to carbondata record writer */ @@ -388,6 +394,15 @@ class SparkCarbonFileFormat extends 
FileFormat val model = format.createQueryModel(split, hadoopAttemptContext) model.setConverter(new SparkDataTypeConverterImpl) model.setPreFetchData(false) + var isAdded = false + Option(TaskContext.get()).foreach { context => + val onCompleteCallbacksField = context.getClass.getDeclaredField("onCompleteCallbacks") + onCompleteCallbacksField.setAccessible(true) + val listeners = onCompleteCallbacksField.get(context) + .asInstanceOf[ArrayBuffer[TaskCompletionListener]] + isAdded = listeners.exists(p => p.isInstanceOf[CarbonLoadTaskCompletionListener]) + model.setFreeUnsafeMemory(!isAdded) + } val carbonReader = if (readVector) { val vectorizedReader = new VectorizedCarbonRecordReader(model, null, @@ -404,7 +419,11 @@ class SparkCarbonFileFormat extends FileFormat } val iter = new RecordReaderIterator(carbonReader) - Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close())) + Option(TaskContext.get()).foreach{context => + context.addTaskCompletionListener( + CarbonQueryTaskCompletionListenerImpl( + iter.asInstanceOf[RecordReaderIterator[InternalRow]], !isAdded)) + } if (carbonReader.isInstanceOf[VectorizedCarbonRecordReader] && readVector) { iter.asInstanceOf[Iterator[InternalRow]] http://git-wip-us.apache.org/repos/asf/carbondata/blob/9ae91cc5/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/tasklisteners/CarbonTaskCompletionListener.scala ---------------------------------------------------------------------- diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/tasklisteners/CarbonTaskCompletionListener.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/tasklisteners/CarbonTaskCompletionListener.scala new file mode 100644 index 0000000..9d889d4 --- /dev/null +++ 
b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/tasklisteners/CarbonTaskCompletionListener.scala @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.carbondata.execution.datasources.tasklisteners + +import org.apache.hadoop.io.NullWritable +import org.apache.hadoop.mapreduce.{RecordWriter, TaskAttemptContext} +import org.apache.spark.TaskContext +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources.RecordReaderIterator +import org.apache.spark.util.TaskCompletionListener + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.memory.UnsafeMemoryManager +import org.apache.carbondata.core.util.ThreadLocalTaskInfo +import org.apache.carbondata.hadoop.internal.ObjectArrayWritable + +/** + * Query completion listener + */ +trait CarbonQueryTaskCompletionListener extends TaskCompletionListener + +/** + * Load completion listener + */ +trait CarbonLoadTaskCompletionListener extends TaskCompletionListener + +case class CarbonQueryTaskCompletionListenerImpl(iter: RecordReaderIterator[InternalRow], + freeMemory: Boolean) extends 
CarbonQueryTaskCompletionListener { + override def onTaskCompletion(context: TaskContext): Unit = { + if (iter != null) { + try { + iter.close() + } catch { + case e: Exception => + LogServiceFactory.getLogService(this.getClass.getCanonicalName).error(e) + } + } + if (freeMemory) { + UnsafeMemoryManager.INSTANCE + .freeMemoryAll(ThreadLocalTaskInfo.getCarbonTaskInfo.getTaskId) + } + } +} + +case class CarbonLoadTaskCompletionListenerImpl(recordWriter: RecordWriter[NullWritable, + ObjectArrayWritable], + taskAttemptContext: TaskAttemptContext) extends CarbonLoadTaskCompletionListener { + + override def onTaskCompletion(context: TaskContext): Unit = { + try { + recordWriter.close(taskAttemptContext) + } finally { + UnsafeMemoryManager.INSTANCE + .freeMemoryAll(ThreadLocalTaskInfo.getCarbonTaskInfo.getTaskId) + } + } +}
