Repository: incubator-carbondata Updated Branches: refs/heads/master 0391d4223 -> b04a579d5
fixed limit query issue Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/5a3d5bb8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/5a3d5bb8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/5a3d5bb8 Branch: refs/heads/master Commit: 5a3d5bb84c9f7689428f60d776e90523f6582319 Parents: 0391d42 Author: kumarvishal <kumarvishal.1...@gmail.com> Authored: Tue Sep 20 19:19:31 2016 +0530 Committer: Venkata Ramana G <ramana.gollam...@huawei.com> Committed: Wed Sep 21 03:43:21 2016 +0530 ---------------------------------------------------------------------- .../carbondata/scan/executor/QueryExecutor.java | 7 ++++ .../scan/executor/QueryExecutorFactory.java | 3 +- .../executor/impl/AbstractQueryExecutor.java | 14 ++++++++ .../scan/executor/impl/DetailQueryExecutor.java | 3 +- .../executor/impl/QueryExecutorProperties.java | 5 +++ .../carbondata/scan/model/QueryModel.java | 4 +-- .../AbstractDetailQueryResultIterator.java | 30 +++++++--------- .../iterator/DetailQueryResultIterator.java | 15 +++----- .../carbondata/hadoop/CarbonRecordReader.java | 14 ++++++-- .../spark/merger/CarbonCompactionExecutor.java | 36 +++++++++++++------- .../carbondata/spark/rdd/CarbonMergerRDD.scala | 11 ++++-- .../carbondata/spark/rdd/CarbonScanRDD.scala | 8 +++-- 12 files changed, 97 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5a3d5bb8/core/src/main/java/org/apache/carbondata/scan/executor/QueryExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/scan/executor/QueryExecutor.java b/core/src/main/java/org/apache/carbondata/scan/executor/QueryExecutor.java index 53bf8ca..1f67c84 100644 --- a/core/src/main/java/org/apache/carbondata/scan/executor/QueryExecutor.java +++ b/core/src/main/java/org/apache/carbondata/scan/executor/QueryExecutor.java @@ -37,4 +37,11 @@ public interface QueryExecutor<E> { * @throws QueryExecutionException if any failure while executing the query */ CarbonIterator<E> execute(QueryModel queryModel) throws QueryExecutionException; + + /** + * Below method will be used for cleanup + * + * @throws QueryExecutionException + */ + void finish() throws QueryExecutionException; } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5a3d5bb8/core/src/main/java/org/apache/carbondata/scan/executor/QueryExecutorFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/scan/executor/QueryExecutorFactory.java b/core/src/main/java/org/apache/carbondata/scan/executor/QueryExecutorFactory.java index ab75231..7ed6e60 100644 --- a/core/src/main/java/org/apache/carbondata/scan/executor/QueryExecutorFactory.java +++ b/core/src/main/java/org/apache/carbondata/scan/executor/QueryExecutorFactory.java @@ -19,7 +19,6 @@ package org.apache.carbondata.scan.executor; import org.apache.carbondata.scan.executor.impl.DetailQueryExecutor; -import org.apache.carbondata.scan.model.QueryModel; /** * Factory class to get the query executor from RDD @@ -27,7 +26,7 @@ import org.apache.carbondata.scan.model.QueryModel; */ public class QueryExecutorFactory { - public static QueryExecutor getQueryExecutor(QueryModel queryModel) { + public static QueryExecutor getQueryExecutor() { return new DetailQueryExecutor(); } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5a3d5bb8/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java index c31824f..b015e49 100644 --- a/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java +++ b/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java @@ -19,6 +19,7 @@ package org.apache.carbondata.scan.executor.impl; import java.util.*; +import java.util.concurrent.Executors; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; @@ -85,6 +86,8 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> { queryModel.getQueryId()); LOGGER.info("Query will be executed on table: " + queryModel.getAbsoluteTableIdentifier() .getCarbonTableIdentifier().getTableName()); + // add executor service for query execution + queryProperties.executorService = Executors.newFixedThreadPool(1); // Initializing statistics list to record the query statistics // creating copy on write to handle concurrent scenario queryProperties.queryStatisticsRecorder = new QueryStatisticsRecorder(queryModel.getQueryId()); @@ -426,4 +429,15 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> { .toPrimitive(parentBlockIndexList.toArray(new Integer[parentBlockIndexList.size()])); } + /** + * Below method will be used to finish the execution + * + * @throws QueryExecutionException + */ + @Override public void finish() throws QueryExecutionException { + if (null != queryProperties.executorService) { + queryProperties.executorService.shutdownNow(); + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5a3d5bb8/core/src/main/java/org/apache/carbondata/scan/executor/impl/DetailQueryExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/scan/executor/impl/DetailQueryExecutor.java b/core/src/main/java/org/apache/carbondata/scan/executor/impl/DetailQueryExecutor.java index 716cdc7..f2f4b58 100644 --- a/core/src/main/java/org/apache/carbondata/scan/executor/impl/DetailQueryExecutor.java +++ b/core/src/main/java/org/apache/carbondata/scan/executor/impl/DetailQueryExecutor.java @@ -36,7 +36,8 @@ public class DetailQueryExecutor extends AbstractQueryExecutor { @Override public CarbonIterator<Object[]> execute(QueryModel queryModel) throws QueryExecutionException { List<BlockExecutionInfo> blockExecutionInfoList = getBlockExecutionInfos(queryModel); - return new DetailQueryResultIterator(blockExecutionInfoList, queryModel); + return new DetailQueryResultIterator(blockExecutionInfoList, queryModel, + queryProperties.executorService); } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5a3d5bb8/core/src/main/java/org/apache/carbondata/scan/executor/impl/QueryExecutorProperties.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/scan/executor/impl/QueryExecutorProperties.java b/core/src/main/java/org/apache/carbondata/scan/executor/impl/QueryExecutorProperties.java index 2f21a96..7663738 100644 --- a/core/src/main/java/org/apache/carbondata/scan/executor/impl/QueryExecutorProperties.java +++ b/core/src/main/java/org/apache/carbondata/scan/executor/impl/QueryExecutorProperties.java @@ -21,6 +21,7 @@ package org.apache.carbondata.scan.executor.impl; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutorService; import org.apache.carbondata.core.cache.dictionary.Dictionary; import org.apache.carbondata.core.carbon.datastore.block.AbstractIndex; @@ -83,6 +84,10 @@ public class QueryExecutorProperties { */ public QueryStatisticsRecorder queryStatisticsRecorder; /** + * executor service to execute the query + */ + public ExecutorService executorService; + /** * list of blocks in which query will be executed */ protected List<AbstractIndex> dataBlocks; http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5a3d5bb8/core/src/main/java/org/apache/carbondata/scan/model/QueryModel.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/scan/model/QueryModel.java b/core/src/main/java/org/apache/carbondata/scan/model/QueryModel.java index 1c819a2..10cbd25 100644 --- a/core/src/main/java/org/apache/carbondata/scan/model/QueryModel.java +++ b/core/src/main/java/org/apache/carbondata/scan/model/QueryModel.java @@ -199,8 +199,8 @@ public class QueryModel implements Serializable { } - public static void processFilterExpression( - Expression filterExpression, List<CarbonDimension> dimensions, List<CarbonMeasure> measures) { + public static void processFilterExpression(Expression filterExpression, + List<CarbonDimension> dimensions, List<CarbonMeasure> measures) { if (null != filterExpression) { if (null != filterExpression.getChildren() && filterExpression.getChildren().size() == 0) { if (filterExpression instanceof ConditionalExpression) { http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5a3d5bb8/core/src/main/java/org/apache/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java b/core/src/main/java/org/apache/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java index 02505e8..22a4412 100644 --- a/core/src/main/java/org/apache/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java +++ b/core/src/main/java/org/apache/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java @@ -19,6 +19,7 @@ package org.apache.carbondata.scan.result.iterator; import java.util.List; +import java.util.concurrent.ExecutorService; import org.apache.carbondata.common.CarbonIterator; import org.apache.carbondata.common.logging.LogService; @@ -51,41 +52,36 @@ public abstract class AbstractDetailQueryResultIterator extends CarbonIterator { private static final LogService LOGGER = LogServiceFactory.getLogService(AbstractDetailQueryResultIterator.class.getName()); + protected ExecutorService execService; /** * execution info of the block */ protected List<BlockExecutionInfo> blockExecutionInfos; - - /** - * number of cores which can be used - */ - private int batchSize; - /** * file reader which will be used to execute the query */ protected FileHolder fileReader; - protected AbstractDataBlockIterator dataBlockIterator; - protected boolean nextBatch = false; - /** * total time scan the blocks */ protected long totalScanTime; - /** * is the statistic recorded */ protected boolean isStatisticsRecorded; - /** - * QueryStatisticsRecorder + * QueryStatisticsRecorder */ protected QueryStatisticsRecorder recorder; + /** + * number of cores which can be used + */ + private int batchSize; - public AbstractDetailQueryResultIterator(List<BlockExecutionInfo> infos, QueryModel queryModel) { + public AbstractDetailQueryResultIterator(List<BlockExecutionInfo> infos, QueryModel queryModel, + ExecutorService execService) { String batchSizeString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants.DETAIL_QUERY_BATCH_SIZE); if (null != batchSizeString) { @@ -102,16 +98,17 @@ public abstract class AbstractDetailQueryResultIterator extends CarbonIterator { this.blockExecutionInfos = infos; this.fileReader = FileFactory.getFileHolder( FileFactory.getFileType(queryModel.getAbsoluteTableIdentifier().getStorePath())); + this.execService = execService; intialiseInfos(); } private void intialiseInfos() { - totalScanTime=System.currentTimeMillis(); + totalScanTime = System.currentTimeMillis(); for (BlockExecutionInfo blockInfo : blockExecutionInfos) { DataRefNodeFinder finder = new BTreeDataRefNodeFinder(blockInfo.getEachColumnValueSize()); DataRefNode startDataBlock = finder .findFirstDataBlock(blockInfo.getDataBlock().getDataRefNode(), blockInfo.getStartKey()); - while(startDataBlock.nodeNumber()!= blockInfo.getStartBlockletIndex()) { + while (startDataBlock.nodeNumber() != blockInfo.getStartBlockletIndex()) { startDataBlock = startDataBlock.getNextDataRefNode(); } @@ -154,7 +151,7 @@ public abstract class AbstractDetailQueryResultIterator extends CarbonIterator { } private DataBlockIteratorImpl getDataBlockIterator() { - if(blockExecutionInfos.size() > 0) { + if (blockExecutionInfos.size() > 0) { BlockExecutionInfo executionInfo = blockExecutionInfos.get(0); blockExecutionInfos.remove(executionInfo); return new DataBlockIteratorImpl(executionInfo, fileReader, batchSize); @@ -162,5 +159,4 @@ public abstract class AbstractDetailQueryResultIterator extends CarbonIterator { return null; } - } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5a3d5bb8/core/src/main/java/org/apache/carbondata/scan/result/iterator/DetailQueryResultIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/scan/result/iterator/DetailQueryResultIterator.java b/core/src/main/java/org/apache/carbondata/scan/result/iterator/DetailQueryResultIterator.java index 0013c0a..a958195 100644 --- a/core/src/main/java/org/apache/carbondata/scan/result/iterator/DetailQueryResultIterator.java +++ b/core/src/main/java/org/apache/carbondata/scan/result/iterator/DetailQueryResultIterator.java @@ -21,9 +21,7 @@ package org.apache.carbondata.scan.result.iterator; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; import org.apache.carbondata.scan.executor.exception.QueryExecutionException; import org.apache.carbondata.scan.executor.infos.BlockExecutionInfo; @@ -37,14 +35,12 @@ import org.apache.carbondata.scan.result.BatchResult; */ public class DetailQueryResultIterator extends AbstractDetailQueryResultIterator { - private ExecutorService execService = Executors.newFixedThreadPool(1); - - private Future<BatchResult> future; - private final Object lock = new Object(); + private Future<BatchResult> future; - public DetailQueryResultIterator(List<BlockExecutionInfo> infos, QueryModel queryModel) { - super(infos, queryModel); + public DetailQueryResultIterator(List<BlockExecutionInfo> infos, QueryModel queryModel, + ExecutorService execService) { + super(infos, queryModel, execService); } @Override public BatchResult next() { @@ -60,13 +56,10 @@ public class DetailQueryResultIterator extends AbstractDetailQueryResultIterator nextBatch = true; future = execute(); } else { - execService.shutdown(); - execService.awaitTermination(1, TimeUnit.HOURS); fileReader.finish(); } totalScanTime += System.currentTimeMillis() - startTime; } catch (Exception ex) { - execService.shutdown(); fileReader.finish(); throw new RuntimeException(ex); } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5a3d5bb8/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java index fd0a438..443922c 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java @@ -29,6 +29,7 @@ import org.apache.carbondata.core.carbon.datastore.block.BlockletInfos; import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo; import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport; +import org.apache.carbondata.scan.executor.QueryExecutor; import org.apache.carbondata.scan.executor.QueryExecutorFactory; import org.apache.carbondata.scan.executor.exception.QueryExecutionException; import org.apache.carbondata.scan.model.QueryModel; @@ -50,9 +51,12 @@ public class CarbonRecordReader<T> extends RecordReader<Void, T> { private CarbonIterator<Object[]> carbonIterator; + private QueryExecutor queryExecutor; + public CarbonRecordReader(QueryModel queryModel, CarbonReadSupport<T> readSupport) { this.queryModel = queryModel; this.readSupport = readSupport; + this.queryExecutor = QueryExecutorFactory.getQueryExecutor(); } @Override public void initialize(InputSplit split, TaskAttemptContext context) @@ -69,9 +73,8 @@ public class CarbonRecordReader<T> extends RecordReader<Void, T> { readSupport .intialize(queryModel.getProjectionColumns(), queryModel.getAbsoluteTableIdentifier()); try { - carbonIterator = new ChunkRowIterator( - (CarbonIterator<BatchResult>) QueryExecutorFactory.getQueryExecutor(queryModel) - .execute(queryModel)); + carbonIterator = + new ChunkRowIterator((CarbonIterator<BatchResult>) queryExecutor.execute(queryModel)); } catch (QueryExecutionException e) { throw new InterruptedException(e.getMessage()); } @@ -105,5 +108,10 @@ public class CarbonRecordReader<T> extends RecordReader<Void, T> { } // close read support readSupport.close(); + try { + queryExecutor.finish(); + } catch (QueryExecutionException e) { + throw new IOException(e); + } } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5a3d5bb8/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/CarbonCompactionExecutor.java ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/CarbonCompactionExecutor.java b/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/CarbonCompactionExecutor.java index ac6b697..6a2c839 100644 --- a/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/CarbonCompactionExecutor.java +++ b/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/CarbonCompactionExecutor.java @@ -52,22 +52,21 @@ import org.apache.carbondata.scan.result.iterator.RawResultIterator; */ public class CarbonCompactionExecutor { + private static final LogService LOGGER = + LogServiceFactory.getLogService(CarbonCompactionExecutor.class.getName()); private final Map<String, List<DataFileFooter>> dataFileMetadataSegMapping; - private QueryExecutor queryExecutor; private final SegmentProperties destinationSegProperties; private final String databaseName; private final String factTableName; private final Map<String, TaskBlockInfo> segmentMapping; private final String storePath; + private QueryExecutor queryExecutor; private CarbonTable carbonTable; - - private static final LogService LOGGER = - LogServiceFactory.getLogService(CarbonCompactionExecutor.class.getName()); - private QueryModel queryModel; /** * Constructor + * * @param segmentMapping * @param segmentProperties * @param databaseName @@ -97,6 +96,7 @@ public class CarbonCompactionExecutor { /** * For processing of the table blocks. + * * @return List of Carbon iterators */ public List<RawResultIterator> processTableBlocks() throws QueryExecutionException { @@ -113,10 +113,8 @@ public class CarbonCompactionExecutor { int[] colCardinality = listMetadata.get(0).getSegmentInfo().getColumnCardinality(); - SegmentProperties sourceSegProperties = new SegmentProperties( - listMetadata.get(0).getColumnInTable(), - colCardinality - ); + SegmentProperties sourceSegProperties = + new SegmentProperties(listMetadata.get(0).getColumnInTable(), colCardinality); // for each segment get taskblock info TaskBlockInfo taskBlockInfo = taskMap.getValue(); @@ -128,7 +126,7 @@ public class CarbonCompactionExecutor { Collections.sort(list); LOGGER.info("for task -" + task + "-block size is -" + list.size()); queryModel.setTableBlockInfos(list); - resultList.add(new RawResultIterator( executeBlockList(list),sourceSegProperties, + resultList.add(new RawResultIterator(executeBlockList(list), sourceSegProperties, destinationSegProperties)); } @@ -147,7 +145,7 @@ public class CarbonCompactionExecutor { throws QueryExecutionException { queryModel.setTableBlockInfos(blockList); - this.queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel); + this.queryExecutor = QueryExecutorFactory.getQueryExecutor(); CarbonIterator<BatchResult> iter = null; try { iter = queryExecutor.execute(queryModel); @@ -160,10 +158,23 @@ public class CarbonCompactionExecutor { } /** + * Below method will be used + * for cleanup + */ + public void finish() { + try { + queryExecutor.finish(); + } catch (QueryExecutionException e) { + LOGGER.error(e, "Problem while finish: "); + } + clearDictionaryFromQueryModel(); + } + + /** * This method will clear the dictionary access count after its usage is complete so * that column can be deleted form LRU cache whenever memory reaches threshold */ - public void clearDictionaryFromQueryModel() { + private void clearDictionaryFromQueryModel() { if (null != queryModel) { Map<String, Dictionary> columnToDictionaryMapping = queryModel.getColumnToDictionaryMapping(); if (null != columnToDictionaryMapping) { @@ -176,6 +187,7 @@ public class CarbonCompactionExecutor { /** * Preparing of the query model. + * * @param blockList * @return */ http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5a3d5bb8/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala index f7f6949..1f016ba 100644 --- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala +++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala @@ -98,6 +98,7 @@ class CarbonMergerRDD[K, V]( LOGGER.info("Temp storeLocation taken is " + storeLocation) var mergeStatus = false var mergeNumber = "" + var exec: CarbonCompactionExecutor = null try { var dataloadStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE val carbonSparkPartition = theSplit.asInstanceOf[CarbonSparkPartition] @@ -121,7 +122,7 @@ class CarbonMergerRDD[K, V]( carbonLoadModel.setStorePath(hdfsStoreLocation) - val exec = new CarbonCompactionExecutor(segmentMapping, segmentProperties, databaseName, + exec = new CarbonCompactionExecutor(segmentMapping, segmentProperties, databaseName, factTableName, hdfsStoreLocation, carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable, dataFileMetadataSegMapping ) @@ -132,7 +133,9 @@ class CarbonMergerRDD[K, V]( result2 = exec.processTableBlocks() } catch { case e: Throwable => - exec.clearDictionaryFromQueryModel + if (null != exec) { + exec.finish + } LOGGER.error(e) if (null != e.getMessage) { sys.error("Exception occurred in query execution :: " + e.getMessage) @@ -140,7 +143,6 @@ class CarbonMergerRDD[K, V]( sys.error("Exception occurred in query execution.Please check logs.") } } - mergeNumber = mergedLoadName .substring(mergedLoadName.lastIndexOf(CarbonCommonConstants.LOAD_FOLDER) + CarbonCommonConstants.LOAD_FOLDER.length(), mergedLoadName.length() @@ -184,6 +186,9 @@ class CarbonMergerRDD[K, V]( case e: Exception => LOGGER.error(e) } + if (null != exec) { + exec.finish + } } var finished = false http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5a3d5bb8/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala index e676687..e8915c4 100644 --- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala +++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala @@ -38,6 +38,7 @@ import org.apache.carbondata.core.carbon.querystatistics.{QueryStatistic, QueryS import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit} import org.apache.carbondata.lcm.status.SegmentStatusManager +import org.apache.carbondata.scan.executor.QueryExecutor import org.apache.carbondata.scan.executor.QueryExecutorFactory import org.apache.carbondata.scan.expression.Expression import org.apache.carbondata.scan.model.QueryModel @@ -47,6 +48,8 @@ import org.apache.carbondata.spark.RawValue import org.apache.carbondata.spark.load.CarbonLoaderUtil import org.apache.carbondata.spark.util.QueryPlanUtil + + class CarbonSparkPartition(rddId: Int, val idx: Int, val locations: Array[String], val tableBlockInfos: util.List[TableBlockInfo]) @@ -186,10 +189,12 @@ class CarbonScanRDD[V: ClassTag]( val iter = new Iterator[V] { var rowIterator: CarbonIterator[Array[Any]] = _ var queryStartTime: Long = 0 + val queryExecutor = QueryExecutorFactory.getQueryExecutor() try { context.addTaskCompletionListener(context => { clearDictionaryCache(queryModel.getColumnToDictionaryMapping) logStatistics() + queryExecutor.finish }) val carbonSparkPartition = thepartition.asInstanceOf[CarbonSparkPartition] if(!carbonSparkPartition.tableBlockInfos.isEmpty) { @@ -197,7 +202,6 @@ class CarbonScanRDD[V: ClassTag]( // fill table block info queryModel.setTableBlockInfos(carbonSparkPartition.tableBlockInfos) queryStartTime = System.currentTimeMillis - val carbonPropertiesFilePath = System.getProperty("carbon.properties.filepath", null) logInfo("*************************" + carbonPropertiesFilePath) if (null == carbonPropertiesFilePath) { @@ -206,7 +210,7 @@ class CarbonScanRDD[V: ClassTag]( } // execute query rowIterator = new ChunkRowIterator( - QueryExecutorFactory.getQueryExecutor(queryModel).execute(queryModel). + queryExecutor.execute(queryModel). asInstanceOf[CarbonIterator[BatchResult]]).asInstanceOf[CarbonIterator[Array[Any]]] }