Repository: carbondata Updated Branches: refs/heads/master 0311c439a -> 638ed1fa7
[CARBONDATA-2297] Support SEARCH_MODE for basic filter query 1. Add a new spark schedule type. 2. Add a new Query Executor. This closes #2123 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/638ed1fa Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/638ed1fa Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/638ed1fa Branch: refs/heads/master Commit: 638ed1fa7094d4a139a9a1cb08b123dfde31dd87 Parents: 0311c43 Author: Manhua <[email protected]> Authored: Thu Mar 29 19:50:39 2018 +0800 Committer: Jacky Li <[email protected]> Committed: Thu Apr 5 23:20:24 2018 +0800 ---------------------------------------------------------------------- .../core/constants/CarbonCommonConstants.java | 13 ++ .../executor/impl/AbstractQueryExecutor.java | 8 +- .../scan/executor/impl/DetailQueryExecutor.java | 2 + .../SearchModeVectorDetailQueryExecutor.java | 67 ++++++++++ .../impl/VectorDetailQueryExecutor.java | 2 + .../core/scan/processor/BlockScan.java | 98 ++++++++++++++ .../AbstractDetailQueryResultIterator.java | 6 +- .../iterator/SearchModeResultIterator.java | 134 +++++++++++++++++++ .../carbondata/core/util/SessionParams.java | 2 + .../detailquery/SearchModeTestCase.scala | 53 ++++++++ .../carbondata/spark/rdd/CarbonScanRDD.scala | 11 +- .../VectorizedCarbonRecordReader.java | 11 +- 12 files changed, 398 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/638ed1fa/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java index 001ee72..aca317a 100644 ---
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java @@ -1629,6 +1629,19 @@ public final class CarbonCommonConstants { */ public static final String CARBON_SYSTEM_FOLDER_LOCATION = "carbon.system.folder.location"; + @CarbonProperty + public static final String CARBON_SEARCH_MODE_ENABLE = "carbon.search.mode.enable"; + + public static final String CARBON_SEARCH_MODE_ENABLE_DEFAULT = "false"; + + /** + * Num of threads used in query executor when using search mode. + */ + @CarbonProperty + public static final String CARBON_SEARCH_MODE_THREAD = "carbon.search.mode.thread"; + + public static final String CARBON_SEARCH_MODE_THREAD_DEFAULT = "3"; + private CarbonCommonConstants() { } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/638ed1fa/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java index 676976a..f0f5bce 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java @@ -26,7 +26,7 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; import org.apache.carbondata.common.CarbonIterator; import org.apache.carbondata.common.logging.LogService; @@ -100,6 +100,10 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> { queryProperties = new QueryExecutorProperties(); } + public void setExecutorService(ExecutorService executorService) { + // add 
executor service for query execution + queryProperties.executorService = executorService; + } /** * Below method will be used to fill the executor properties based on query * model it will parse the query model and get the detail and fill it in @@ -113,8 +117,6 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> { queryModel.getQueryId()); LOGGER.info("Query will be executed on table: " + queryModel.getAbsoluteTableIdentifier() .getCarbonTableIdentifier().getTableName()); - // add executor service for query execution - queryProperties.executorService = Executors.newCachedThreadPool(); // Initializing statistics list to record the query statistics // creating copy on write to handle concurrent scenario queryProperties.queryStatisticsRecorder = http://git-wip-us.apache.org/repos/asf/carbondata/blob/638ed1fa/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/DetailQueryExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/DetailQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/DetailQueryExecutor.java index 93d696b..46ef43d 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/DetailQueryExecutor.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/DetailQueryExecutor.java @@ -18,6 +18,7 @@ package org.apache.carbondata.core.scan.executor.impl; import java.io.IOException; import java.util.List; +import java.util.concurrent.Executors; import org.apache.carbondata.common.CarbonIterator; import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException; @@ -36,6 +37,7 @@ public class DetailQueryExecutor extends AbstractQueryExecutor<RowBatch> { @Override public CarbonIterator<RowBatch> execute(QueryModel queryModel) throws QueryExecutionException, IOException { + this.setExecutorService(Executors.newCachedThreadPool()); 
List<BlockExecutionInfo> blockExecutionInfoList = getBlockExecutionInfos(queryModel); this.queryIterator = new DetailQueryResultIterator( blockExecutionInfoList, http://git-wip-us.apache.org/repos/asf/carbondata/blob/638ed1fa/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java new file mode 100644 index 0000000..a729966 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.carbondata.core.scan.executor.impl; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.apache.carbondata.common.CarbonIterator; +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException; +import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo; +import org.apache.carbondata.core.scan.model.QueryModel; +import org.apache.carbondata.core.scan.result.iterator.SearchModeResultIterator; +import org.apache.carbondata.core.util.CarbonProperties; + +/** + * Below class will be used to execute the detail query and returns columnar vectors. + */ +public class SearchModeVectorDetailQueryExecutor extends AbstractQueryExecutor<Object> { + private static final LogService LOGGER = + LogServiceFactory.getLogService(SearchModeVectorDetailQueryExecutor.class.getName()); + private static ExecutorService executorService; + + static { + int nThread; + try { + nThread = Integer.parseInt(CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.CARBON_SEARCH_MODE_THREAD, + CarbonCommonConstants.CARBON_SEARCH_MODE_THREAD_DEFAULT)); + } catch (NumberFormatException e) { + nThread = Integer.parseInt(CarbonCommonConstants.CARBON_SEARCH_MODE_THREAD_DEFAULT); + LOGGER.warn("The carbon.search.mode.thread is invalid. 
Using the default value " + nThread); + } + executorService = Executors.newFixedThreadPool(nThread); + } + + @Override + public CarbonIterator<Object> execute(QueryModel queryModel) + throws QueryExecutionException, IOException { + List<BlockExecutionInfo> blockExecutionInfoList = getBlockExecutionInfos(queryModel); + this.queryIterator = new SearchModeResultIterator( + blockExecutionInfoList, + queryModel, + executorService + ); + return this.queryIterator; + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/638ed1fa/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/VectorDetailQueryExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/VectorDetailQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/VectorDetailQueryExecutor.java index ad1a558..7787e4c 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/VectorDetailQueryExecutor.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/VectorDetailQueryExecutor.java @@ -18,6 +18,7 @@ package org.apache.carbondata.core.scan.executor.impl; import java.io.IOException; import java.util.List; +import java.util.concurrent.Executors; import org.apache.carbondata.common.CarbonIterator; import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException; @@ -33,6 +34,7 @@ public class VectorDetailQueryExecutor extends AbstractQueryExecutor<Object> { @Override public CarbonIterator<Object> execute(QueryModel queryModel) throws QueryExecutionException, IOException { + this.setExecutorService(Executors.newCachedThreadPool()); List<BlockExecutionInfo> blockExecutionInfoList = getBlockExecutionInfos(queryModel); this.queryIterator = new VectorDetailQueryResultIterator( blockExecutionInfoList, 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/638ed1fa/core/src/main/java/org/apache/carbondata/core/scan/processor/BlockScan.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/processor/BlockScan.java b/core/src/main/java/org/apache/carbondata/core/scan/processor/BlockScan.java new file mode 100644 index 0000000..eb41071 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/scan/processor/BlockScan.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.carbondata.core.scan.processor; + +import java.util.LinkedList; +import java.util.List; + +import org.apache.carbondata.core.datastore.DataRefNode; +import org.apache.carbondata.core.datastore.FileReader; +import org.apache.carbondata.core.scan.collector.ResultCollectorFactory; +import org.apache.carbondata.core.scan.collector.ScannedResultCollector; +import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo; +import org.apache.carbondata.core.scan.result.BlockletScannedResult; +import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch; +import org.apache.carbondata.core.scan.scanner.BlockletScanner; +import org.apache.carbondata.core.scan.scanner.impl.BlockletFilterScanner; +import org.apache.carbondata.core.scan.scanner.impl.BlockletFullScanner; +import org.apache.carbondata.core.stats.QueryStatisticsModel; + +public class BlockScan { + private BlockExecutionInfo blockExecutionInfo; + private FileReader fileReader; + private BlockletScanner blockletScanner; + private BlockletIterator blockletIterator; + private ScannedResultCollector scannerResultAggregator; + + private List<BlockletScannedResult> scannedResults = new LinkedList<>(); + private int nextResultIndex = 0; + private BlockletScannedResult curResult; + + public BlockScan(BlockExecutionInfo blockExecutionInfo, FileReader fileReader, + QueryStatisticsModel queryStatisticsModel) { + this.blockExecutionInfo = blockExecutionInfo; + this.fileReader = fileReader; + this.blockletIterator = new BlockletIterator(blockExecutionInfo.getFirstDataBlock(), + blockExecutionInfo.getNumberOfBlockToScan()); + if (blockExecutionInfo.getFilterExecuterTree() != null) { + blockletScanner = new BlockletFilterScanner(blockExecutionInfo, queryStatisticsModel); + } else { + blockletScanner = new BlockletFullScanner(blockExecutionInfo, queryStatisticsModel); + } + this.scannerResultAggregator = + ResultCollectorFactory.getScannedResultCollector(blockExecutionInfo); + } + + 
public void scan() throws Exception { + BlockletScannedResult blockletScannedResult = null; + while (blockletIterator.hasNext()) { + DataRefNode dataBlock = blockletIterator.next(); + if (dataBlock.getColumnsMaxValue() == null || blockletScanner.isScanRequired(dataBlock)) { + RawBlockletColumnChunks rawBlockletColumnChunks = RawBlockletColumnChunks.newInstance( + blockExecutionInfo.getTotalNumberDimensionToRead(), + blockExecutionInfo.getTotalNumberOfMeasureToRead(), fileReader, dataBlock); + blockletScanner.readBlocklet(rawBlockletColumnChunks); + blockletScannedResult = blockletScanner.scanBlocklet(rawBlockletColumnChunks); + if (blockletScannedResult != null && blockletScannedResult.hasNext()) { + scannedResults.add(blockletScannedResult); + } + } + } + fileReader.finish(); + } + + public boolean hasNext() { + if (curResult != null && curResult.hasNext()) { + return true; + } else { + if (null != curResult) { + curResult.freeMemory(); + } + if (nextResultIndex < scannedResults.size()) { + curResult = scannedResults.get(nextResultIndex++); + return true; + } else { + return false; + } + } + } + + public void processNextBatch(CarbonColumnarBatch columnarBatch) { + this.scannerResultAggregator.collectResultInColumnarBatch(curResult, columnarBatch); + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/638ed1fa/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java index 4e628fe..e8a61fc 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java 
@@ -62,11 +62,11 @@ public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterato private static final Map<DeleteDeltaInfo, Object> deleteDeltaToLockObjectMap = new ConcurrentHashMap<>(); - private ExecutorService execService; + protected ExecutorService execService; /** * execution info of the block */ - private List<BlockExecutionInfo> blockExecutionInfos; + protected List<BlockExecutionInfo> blockExecutionInfos; /** * file reader which will be used to execute the query @@ -78,7 +78,7 @@ public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterato /** * QueryStatisticsRecorder */ - private QueryStatisticsRecorder recorder; + protected QueryStatisticsRecorder recorder; /** * number of cores which can be used */ http://git-wip-us.apache.org/repos/asf/carbondata/blob/638ed1fa/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/SearchModeResultIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/SearchModeResultIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/SearchModeResultIterator.java new file mode 100644 index 0000000..ae46242 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/SearchModeResultIterator.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.scan.result.iterator; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; + +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo; +import org.apache.carbondata.core.scan.model.QueryModel; +import org.apache.carbondata.core.scan.processor.BlockScan; +import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch; +import org.apache.carbondata.core.stats.QueryStatistic; +import org.apache.carbondata.core.stats.QueryStatisticsConstants; +import org.apache.carbondata.core.stats.QueryStatisticsModel; +import org.apache.carbondata.core.stats.QueryStatisticsRecorder; + +public class SearchModeResultIterator extends AbstractDetailQueryResultIterator<Object> { + + private final Object lock = new Object(); + + private FileFactory.FileType fileType; + private List<Future<BlockScan>> taskSubmitList; + private BlockScan curBlockScan; + private int nextBlockScanIndex = 0; + + public SearchModeResultIterator(List<BlockExecutionInfo> infos, QueryModel queryModel, + ExecutorService execService) { + super(infos, queryModel, execService); + this.fileType = FileFactory.getFileType(queryModel.getAbsoluteTableIdentifier().getTablePath()); + scanAll(); + } + + private void scanAll() { + taskSubmitList = new ArrayList<>(blockExecutionInfos.size()); + for (final 
BlockExecutionInfo info: blockExecutionInfos) { + taskSubmitList.add(execService.submit(new Callable<BlockScan>() { + + @Override + public BlockScan call() throws Exception { + BlockScan blockScan = new BlockScan(info, FileFactory.getFileHolder(fileType), + buildQueryStatiticsModel(recorder)); + blockScan.scan(); + return blockScan; + } + })); + } + } + + @Override + public boolean hasNext() { + try { + while ((curBlockScan == null || !curBlockScan.hasNext()) && + nextBlockScanIndex < taskSubmitList.size()) { + curBlockScan = taskSubmitList.get(nextBlockScanIndex++).get(); + } + return curBlockScan != null && curBlockScan.hasNext(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + + @Override + public Object next() { + throw new UnsupportedOperationException("call processNextBatch instead"); + } + + @Override + public void processNextBatch(CarbonColumnarBatch columnarBatch) { + synchronized (lock) { + if (curBlockScan.hasNext()) { + curBlockScan.processNextBatch(columnarBatch); + } + } + } + + private QueryStatisticsModel buildQueryStatiticsModel(QueryStatisticsRecorder recorder) { + QueryStatisticsModel queryStatisticsModel = new QueryStatisticsModel(); + queryStatisticsModel.setRecorder(recorder); + QueryStatistic queryStatisticTotalBlocklet = new QueryStatistic(); + queryStatisticsModel.getStatisticsTypeAndObjMap() + .put(QueryStatisticsConstants.TOTAL_BLOCKLET_NUM, queryStatisticTotalBlocklet); + queryStatisticsModel.getRecorder().recordStatistics(queryStatisticTotalBlocklet); + + QueryStatistic queryStatisticValidScanBlocklet = new QueryStatistic(); + queryStatisticsModel.getStatisticsTypeAndObjMap() + .put(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM, queryStatisticValidScanBlocklet); + queryStatisticsModel.getRecorder().recordStatistics(queryStatisticValidScanBlocklet); + + QueryStatistic totalNumberOfPages = new QueryStatistic(); + queryStatisticsModel.getStatisticsTypeAndObjMap() + 
.put(QueryStatisticsConstants.TOTAL_PAGE_SCANNED, totalNumberOfPages); + queryStatisticsModel.getRecorder().recordStatistics(totalNumberOfPages); + + QueryStatistic validPages = new QueryStatistic(); + queryStatisticsModel.getStatisticsTypeAndObjMap() + .put(QueryStatisticsConstants.VALID_PAGE_SCANNED, validPages); + queryStatisticsModel.getRecorder().recordStatistics(validPages); + + QueryStatistic scannedPages = new QueryStatistic(); + queryStatisticsModel.getStatisticsTypeAndObjMap() + .put(QueryStatisticsConstants.PAGE_SCANNED, scannedPages); + queryStatisticsModel.getRecorder().recordStatistics(scannedPages); + + QueryStatistic scanTime = new QueryStatistic(); + queryStatisticsModel.getStatisticsTypeAndObjMap() + .put(QueryStatisticsConstants.SCAN_BLOCKlET_TIME, scanTime); + queryStatisticsModel.getRecorder().recordStatistics(scanTime); + + QueryStatistic readTime = new QueryStatistic(); + queryStatisticsModel.getStatisticsTypeAndObjMap() + .put(QueryStatisticsConstants.READ_BLOCKlET_TIME, readTime); + queryStatisticsModel.getRecorder().recordStatistics(readTime); + return queryStatisticsModel; + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/638ed1fa/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java index 63af23a..58dc218 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java +++ b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java @@ -31,6 +31,7 @@ import org.apache.carbondata.core.constants.CarbonLoadOptionConstants; import org.apache.carbondata.core.exception.InvalidConfigurationException; import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION; +import static 
org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_SEARCH_MODE_ENABLE; import static org.apache.carbondata.core.constants.CarbonCommonConstants.ENABLE_UNSAFE_SORT; import static org.apache.carbondata.core.constants.CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION; import static org.apache.carbondata.core.constants.CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE; @@ -147,6 +148,7 @@ public class SessionParams implements Serializable, Cloneable { case CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE: case CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD: case CARBON_OPTIONS_SINGLE_PASS: + case CARBON_SEARCH_MODE_ENABLE: isValid = CarbonUtil.validateBoolean(value); if (!isValid) { throw new InvalidConfigurationException("Invalid value " + value + " for key " + key); http://git-wip-us.apache.org/repos/asf/carbondata/blob/638ed1fa/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala new file mode 100644 index 0000000..0b8f92b --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.testsuite.detailquery + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.CarbonProperties +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +/** + * Test Class for detailed query on multiple datatypes + */ + +class SearchModeTestCase extends QueryTest with BeforeAndAfterAll { + + override def beforeAll { + sql("CREATE TABLE alldatatypestable (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int) STORED BY 'org.apache.carbondata.format'") + sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/data.csv' INTO TABLE alldatatypestable OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""") + + sql("CREATE TABLE alldatatypestable_hive (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int)row format delimited fields terminated by ','") + sql(s"""LOAD DATA local inpath '$resourcesPath/datawithoutheader.csv' INTO TABLE alldatatypestable_hive""") + + } + + test("select empno,empname,utilization from alldatatypestable where empname = 'ayushi'") { + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SEARCH_MODE_ENABLE, 
"true") + checkAnswer( + sql("select empno,empname,utilization from alldatatypestable where empname = 'ayushi'"), + sql("select empno,empname,utilization from alldatatypestable_hive where empname = 'ayushi'")) + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SEARCH_MODE_ENABLE, + CarbonCommonConstants.CARBON_SEARCH_MODE_ENABLE_DEFAULT) + } + + override def afterAll { + sql("drop table alldatatypestable") + sql("drop table alldatatypestable_hive") + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/638ed1fa/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala index d34d009..256e43d 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala @@ -206,7 +206,7 @@ class CarbonScanRDD( var statistic = new QueryStatistic() val statisticRecorder = CarbonTimeStatisticsFactory.createDriverRecorder() - val parallelism = sparkContext.defaultParallelism + var parallelism = sparkContext.defaultParallelism val result = new ArrayList[Partition](parallelism) var noOfBlocks = 0 var noOfNodes = 0 @@ -241,7 +241,14 @@ class CarbonScanRDD( CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION, "false").toBoolean || carbonDistribution.equalsIgnoreCase(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_CUSTOM) - if (useCustomDistribution) { + val enableSearchMode = CarbonProperties.getInstance().getProperty( + CarbonCommonConstants.CARBON_SEARCH_MODE_ENABLE, + CarbonCommonConstants.CARBON_SEARCH_MODE_ENABLE_DEFAULT).toBoolean + if (useCustomDistribution || enableSearchMode) { + if 
(enableSearchMode) { + // force to assign only one task contains multiple splits each node + parallelism = 0 + } // create a list of block based on split val blockList = splits.asScala.map(_.asInstanceOf[Distributable]) http://git-wip-us.apache.org/repos/asf/carbondata/blob/638ed1fa/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java index 903bf44..eb71daa 100644 --- a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java +++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java @@ -26,6 +26,7 @@ import java.util.Map; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.cache.dictionary.Dictionary; +import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.block.TableBlockInfo; import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator; import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory; @@ -35,12 +36,14 @@ import org.apache.carbondata.core.metadata.encoder.Encoding; import org.apache.carbondata.core.scan.executor.QueryExecutor; import org.apache.carbondata.core.scan.executor.QueryExecutorFactory; import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException; +import org.apache.carbondata.core.scan.executor.impl.SearchModeVectorDetailQueryExecutor; import org.apache.carbondata.core.scan.model.ProjectionDimension; import 
org.apache.carbondata.core.scan.model.ProjectionMeasure; import org.apache.carbondata.core.scan.model.QueryModel; import org.apache.carbondata.core.scan.result.iterator.AbstractDetailQueryResultIterator; import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector; import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch; +import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.hadoop.AbstractRecordReader; import org.apache.carbondata.hadoop.CarbonInputSplit; @@ -131,7 +134,13 @@ class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> { queryModel.setTableBlockInfos(tableBlockInfoList); queryModel.setVectorReader(true); try { - queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel); + if (CarbonProperties.getInstance().getProperty( + CarbonCommonConstants.CARBON_SEARCH_MODE_ENABLE, + CarbonCommonConstants.CARBON_SEARCH_MODE_ENABLE_DEFAULT).equals("true")) { + queryExecutor = new SearchModeVectorDetailQueryExecutor(); + } else { + queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel); + } iterator = (AbstractDetailQueryResultIterator) queryExecutor.execute(queryModel); } catch (QueryExecutionException e) { Throwable ext = e;
