Repository: carbondata Updated Branches: refs/heads/master d327cb2bd -> 6d37b0572
[CARBONDATA-1309] During Global Sort the DataWriterProcessorStepImpl is not shut down even when the data load is finished. Global Sort does not close the DataWriterProcessorStepImpl; because of this, the thread started to print the number of records processed by the step at 10-second intervals is never shut down. This closes #1188 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/6d37b057 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/6d37b057 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/6d37b057 Branch: refs/heads/master Commit: 6d37b05721a940167ef9ab5849bc5277e0502a91 Parents: d327cb2 Author: mohammadshahidkhan <[email protected]> Authored: Thu Jul 20 15:17:53 2017 +0530 Committer: Jacky Li <[email protected]> Committed: Thu Aug 3 00:22:58 2017 +0800 ---------------------------------------------------------------------- .../spark/load/DataLoadProcessorStepOnSpark.scala | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/6d37b057/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala index 3f1637b..4d1267a 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala @@ -151,7 +151,7 @@ object DataLoadProcessorStepOnSpark { var model: CarbonLoadModel = null var tableName: String = null var 
rowConverter: RowConverterImpl = null - + var dataWriter: DataWriterProcessorStepImpl = null try { model = modelBroadcast.value.getCopyWithTaskNo(index.toString) val storeLocation = Array(getTempStoreLocation(index)) @@ -166,7 +166,7 @@ object DataLoadProcessorStepOnSpark { rowConverter.initialize() conf.setCardinalityFinder(rowConverter) - val dataWriter = new DataWriterProcessorStepImpl(conf) + dataWriter = new DataWriterProcessorStepImpl(conf) val dataHandlerModel = dataWriter.getDataHandlerModel(0) var dataHandler: CarbonFactHandler = null @@ -198,6 +198,11 @@ object DataLoadProcessorStepOnSpark { if (rowConverter != null) { rowConverter.finish() } + // close the dataWriter once the write in done success or fail. if not closed then thread to + // to prints the rows processed in each step for every 10 seconds will never exit. + if(null != dataWriter) { + dataWriter.close() + } // clean up the folders and files created locally for data load operation CarbonLoaderUtil.deleteLocalDataLoadFolderLocation(model, false) }
