[HOTFIX] Remove search mode module
This closes #2904 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/10918484 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/10918484 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/10918484 Branch: refs/heads/master Commit: 10918484853953528f5756b1b6bdb4a3a2d3b068 Parents: 6f3b9d3 Author: Jacky Li <jacky.li...@qq.com> Authored: Fri Nov 9 11:45:50 2018 +0800 Committer: xuchuanyin <xuchuan...@hust.edu.cn> Committed: Tue Nov 13 19:59:28 2018 +0800 ---------------------------------------------------------------------- .../core/constants/CarbonCommonConstants.java | 65 ---- .../scan/executor/QueryExecutorFactory.java | 17 +- .../impl/SearchModeDetailQueryExecutor.java | 86 ----- .../SearchModeVectorDetailQueryExecutor.java | 91 ------ .../AbstractSearchModeResultIterator.java | 139 -------- .../iterator/SearchModeResultIterator.java | 53 --- .../SearchModeVectorResultIterator.java | 49 --- .../carbondata/core/util/CarbonProperties.java | 57 ---- .../carbondata/core/util/SessionParams.java | 1 - .../benchmark/ConcurrentQueryBenchmark.scala | 59 +--- .../carbondata/examples/S3UsingSDkExample.scala | 3 +- .../carbondata/examples/SearchModeExample.scala | 194 ----------- integration/spark-common-test/pom.xml | 10 - ...eneFineGrainDataMapWithSearchModeSuite.scala | 325 ------------------- .../detailquery/SearchModeTestCase.scala | 154 --------- .../carbondata/spark/rdd/CarbonScanRDD.scala | 15 +- integration/spark2/pom.xml | 5 - .../carbondata/store/SparkCarbonStore.scala | 63 ---- .../org/apache/spark/sql/CarbonSession.scala | 95 ------ .../execution/command/CarbonHiveCommands.scala | 12 +- .../bloom/BloomCoarseGrainDataMapSuite.scala | 15 - pom.xml | 7 - store/search/pom.xml | 112 ------- .../store/worker/SearchRequestHandler.java | 244 -------------- .../apache/carbondata/store/worker/Status.java | 28 -- .../scala/org/apache/spark/rpc/Master.scala | 291 
----------------- .../scala/org/apache/spark/rpc/RpcUtil.scala | 56 ---- .../scala/org/apache/spark/rpc/Scheduler.scala | 139 -------- .../scala/org/apache/spark/rpc/Worker.scala | 118 ------- .../org/apache/spark/search/Registry.scala | 51 --- .../org/apache/spark/search/Searcher.scala | 79 ----- .../carbondata/store/SearchServiceTest.java | 37 --- .../org/apache/spark/rpc/SchedulerSuite.scala | 154 --------- 33 files changed, 25 insertions(+), 2799 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/10918484/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java index fc26404..6edfd66 100644 --- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java @@ -1380,71 +1380,6 @@ public final class CarbonCommonConstants { public static final String CARBON_HEAP_MEMORY_POOLING_THRESHOLD_BYTES_DEFAULT = "1048576"; /** - * If set to true, will use CarbonReader to do distributed scan directly instead of using - * compute framework like spark, thus avoiding limitation of compute framework like SQL - * optimizer and task scheduling overhead. 
- */ - @InterfaceStability.Unstable - @CarbonProperty(dynamicConfigurable = true) - public static final String CARBON_SEARCH_MODE_ENABLE = "carbon.search.enabled"; - - public static final String CARBON_SEARCH_MODE_ENABLE_DEFAULT = "false"; - - /** - * It's timeout threshold of carbon search query - */ - @InterfaceStability.Unstable - @CarbonProperty - public static final String CARBON_SEARCH_QUERY_TIMEOUT = "carbon.search.query.timeout"; - - /** - * Default value is 10 seconds - */ - public static final String CARBON_SEARCH_QUERY_TIMEOUT_DEFAULT = "10s"; - - /** - * The size of thread pool used for reading files in Work for search mode. By default, - * it is number of cores in Worker - */ - @InterfaceStability.Unstable - @CarbonProperty - public static final String CARBON_SEARCH_MODE_SCAN_THREAD = "carbon.search.scan.thread"; - - /** - * In search mode, Master will listen on this port for worker registration. - * If Master failed to start service with this port, it will try to increment the port number - * and try to bind again, until it is success - */ - @InterfaceStability.Unstable - @CarbonProperty - public static final String CARBON_SEARCH_MODE_MASTER_PORT = "carbon.search.master.port"; - - public static final String CARBON_SEARCH_MODE_MASTER_PORT_DEFAULT = "10020"; - - /** - * In search mode, Worker will listen on this port for master request like searching. - * If Worker failed to start service with this port, it will try to increment the port number - * and try to bind again, until it is success - */ - @InterfaceStability.Unstable - @CarbonProperty - public static final String CARBON_SEARCH_MODE_WORKER_PORT = "carbon.search.worker.port"; - - public static final String CARBON_SEARCH_MODE_WORKER_PORT_DEFAULT = "10021"; - - /** - * If number of search request sent to Worker exceed this limit, Master will reschedule - * the request to another worker. In such case, locality will be lost in HDFS scenario, but - * it is fine for S3 scenario. 
- * - * If user does not set this value, by default it is 10 * number of cores in Worker - */ - @InterfaceStability.Unstable - @CarbonProperty - public static final String CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT = - "carbon.search.worker.workload.limit"; - - /** * When enabled complete row filters will be handled by carbon in case of vector. * If it is disabled then only page level pruning will be done by carbon and row level filtering * will be done by spark for vector. http://git-wip-us.apache.org/repos/asf/carbondata/blob/10918484/core/src/main/java/org/apache/carbondata/core/scan/executor/QueryExecutorFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/QueryExecutorFactory.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/QueryExecutorFactory.java index 2a9c7f4..310e586 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/executor/QueryExecutorFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/QueryExecutorFactory.java @@ -17,11 +17,8 @@ package org.apache.carbondata.core.scan.executor; import org.apache.carbondata.core.scan.executor.impl.DetailQueryExecutor; -import org.apache.carbondata.core.scan.executor.impl.SearchModeDetailQueryExecutor; -import org.apache.carbondata.core.scan.executor.impl.SearchModeVectorDetailQueryExecutor; import org.apache.carbondata.core.scan.executor.impl.VectorDetailQueryExecutor; import org.apache.carbondata.core.scan.model.QueryModel; -import org.apache.carbondata.core.util.CarbonProperties; import org.apache.hadoop.conf.Configuration; @@ -32,18 +29,10 @@ import org.apache.hadoop.conf.Configuration; public class QueryExecutorFactory { public static QueryExecutor getQueryExecutor(QueryModel queryModel, Configuration configuration) { - if (CarbonProperties.isSearchModeEnabled()) { - if (queryModel.isVectorReader()) { - return new 
SearchModeVectorDetailQueryExecutor(configuration); - } else { - return new SearchModeDetailQueryExecutor(configuration); - } + if (queryModel.isVectorReader()) { + return new VectorDetailQueryExecutor(configuration); } else { - if (queryModel.isVectorReader()) { - return new VectorDetailQueryExecutor(configuration); - } else { - return new DetailQueryExecutor(configuration); - } + return new DetailQueryExecutor(configuration); } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/10918484/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeDetailQueryExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeDetailQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeDetailQueryExecutor.java deleted file mode 100644 index fe91442..0000000 --- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeDetailQueryExecutor.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.carbondata.core.scan.executor.impl; - -import java.io.IOException; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -import org.apache.carbondata.common.CarbonIterator; -import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException; -import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo; -import org.apache.carbondata.core.scan.model.QueryModel; -import org.apache.carbondata.core.scan.result.iterator.SearchModeResultIterator; -import org.apache.carbondata.core.util.CarbonProperties; - -import org.apache.hadoop.conf.Configuration; -import org.apache.log4j.Logger; - -public class SearchModeDetailQueryExecutor extends AbstractQueryExecutor<Object> { - private static final Logger LOGGER = - LogServiceFactory.getLogService(SearchModeDetailQueryExecutor.class.getName()); - private static ExecutorService executorService = null; - - public SearchModeDetailQueryExecutor(Configuration configuration) { - super(configuration); - if (executorService == null) { - initThreadPool(); - } - } - - private static synchronized void initThreadPool() { - int defaultValue = Runtime.getRuntime().availableProcessors(); - int nThread; - try { - nThread = Integer.parseInt(CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD, - String.valueOf(defaultValue))); - } catch (NumberFormatException e) { - nThread = defaultValue; - LOGGER.warn("The carbon.search.mode.thread is invalid. 
Using the default value " + nThread); - } - if (nThread > 0) { - executorService = Executors.newFixedThreadPool(nThread); - } else { - executorService = Executors.newCachedThreadPool(); - } - } - - public static synchronized void shutdownThreadPool() { - if (executorService != null) { - executorService.shutdownNow(); - executorService = null; - } - } - - @Override - public CarbonIterator<Object> execute(QueryModel queryModel) - throws QueryExecutionException, IOException { - List<BlockExecutionInfo> blockExecutionInfoList = getBlockExecutionInfos(queryModel); - - this.queryIterator = new SearchModeResultIterator( - blockExecutionInfoList, - queryModel, - executorService - ); - return this.queryIterator; - } - -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/10918484/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java deleted file mode 100644 index dd5f364..0000000 --- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.carbondata.core.scan.executor.impl; - -import java.io.IOException; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -import org.apache.carbondata.common.CarbonIterator; -import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException; -import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo; -import org.apache.carbondata.core.scan.model.QueryModel; -import org.apache.carbondata.core.scan.result.iterator.SearchModeVectorResultIterator; -import org.apache.carbondata.core.util.CarbonProperties; - -import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD; - -import org.apache.hadoop.conf.Configuration; -import org.apache.log4j.Logger; - -/** - * Below class will be used to execute the detail query and returns columnar vectors. 
- */ -public class SearchModeVectorDetailQueryExecutor extends AbstractQueryExecutor<Object> { - private static final Logger LOGGER = - LogServiceFactory.getLogService(SearchModeVectorDetailQueryExecutor.class.getName()); - private static ExecutorService executorService = null; - - public SearchModeVectorDetailQueryExecutor(Configuration configuration) { - super(configuration); - if (executorService == null) { - initThreadPool(); - } - } - - private static synchronized void initThreadPool() { - int defaultValue = Runtime.getRuntime().availableProcessors(); - int nThread; - try { - nThread = Integer.parseInt(CarbonProperties.getInstance() - .getProperty(CARBON_SEARCH_MODE_SCAN_THREAD, String.valueOf(defaultValue))); - } catch (NumberFormatException e) { - nThread = defaultValue; - LOGGER.warn("The " + CARBON_SEARCH_MODE_SCAN_THREAD + " is invalid. " - + "Using the default value " + nThread); - } - if (nThread > 0) { - executorService = Executors.newFixedThreadPool(nThread); - } else { - executorService = Executors.newCachedThreadPool(); - } - } - - public static synchronized void shutdownThreadPool() { - // shutdown all threads immediately - if (executorService != null) { - executorService.shutdownNow(); - executorService = null; - } - } - - @Override - public CarbonIterator<Object> execute(QueryModel queryModel) - throws QueryExecutionException, IOException { - List<BlockExecutionInfo> blockExecutionInfoList = getBlockExecutionInfos(queryModel); - - this.queryIterator = new SearchModeVectorResultIterator( - blockExecutionInfoList, - queryModel, - executorService - ); - return this.queryIterator; - } - -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/10918484/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractSearchModeResultIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractSearchModeResultIterator.java 
b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractSearchModeResultIterator.java deleted file mode 100644 index c7f5c51..0000000 --- a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractSearchModeResultIterator.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.carbondata.core.scan.result.iterator; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; - -import org.apache.carbondata.core.datastore.impl.FileFactory; -import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo; -import org.apache.carbondata.core.scan.model.QueryModel; -import org.apache.carbondata.core.scan.processor.BlockScan; -import org.apache.carbondata.core.stats.QueryStatistic; -import org.apache.carbondata.core.stats.QueryStatisticsConstants; -import org.apache.carbondata.core.stats.QueryStatisticsModel; -import org.apache.carbondata.core.stats.QueryStatisticsRecorder; - -public abstract class AbstractSearchModeResultIterator - extends AbstractDetailQueryResultIterator<Object> { - - private FileFactory.FileType fileType; - private List<Future<BlockScan>> taskSubmitList; - protected BlockScan curBlockScan; - private int nextBlockScanIndex = 0; - - public AbstractSearchModeResultIterator(List<BlockExecutionInfo> infos, QueryModel queryModel, - ExecutorService execService) { - super(infos, queryModel, execService); - this.fileType = FileFactory.getFileType(queryModel.getAbsoluteTableIdentifier().getTablePath()); - scanAll(); - } - - private void scanAll() { - taskSubmitList = new ArrayList<>(blockExecutionInfos.size()); - for (final BlockExecutionInfo info: blockExecutionInfos) { - taskSubmitList.add(execService.submit(new Callable<BlockScan>() { - - @Override - public BlockScan call() throws Exception { - BlockScan blockScan = new BlockScan(info, FileFactory.getFileHolder(fileType), - buildQueryStatiticsModel(recorder)); - blockScan.scan(); - return blockScan; - } - })); - } - } - - @Override - public boolean hasNext() { - try { - while ((curBlockScan == null || !curBlockScan.hasNext()) && - nextBlockScanIndex < taskSubmitList.size()) { - 
curBlockScan = taskSubmitList.get(nextBlockScanIndex++).get(); - } - return curBlockScan != null && curBlockScan.hasNext(); - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException(e); - } - } - - private QueryStatisticsModel buildQueryStatiticsModel(QueryStatisticsRecorder recorder) { - QueryStatisticsModel queryStatisticsModel = new QueryStatisticsModel(); - queryStatisticsModel.setRecorder(recorder); - QueryStatistic queryStatisticTotalBlocklet = new QueryStatistic(); - queryStatisticsModel.getStatisticsTypeAndObjMap() - .put(QueryStatisticsConstants.TOTAL_BLOCKLET_NUM, queryStatisticTotalBlocklet); - queryStatisticsModel.getRecorder().recordStatistics(queryStatisticTotalBlocklet); - - QueryStatistic queryStatisticValidScanBlocklet = new QueryStatistic(); - queryStatisticsModel.getStatisticsTypeAndObjMap() - .put(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM, queryStatisticValidScanBlocklet); - queryStatisticsModel.getRecorder().recordStatistics(queryStatisticValidScanBlocklet); - - QueryStatistic totalNumberOfPages = new QueryStatistic(); - queryStatisticsModel.getStatisticsTypeAndObjMap() - .put(QueryStatisticsConstants.TOTAL_PAGE_SCANNED, totalNumberOfPages); - queryStatisticsModel.getRecorder().recordStatistics(totalNumberOfPages); - - QueryStatistic validPages = new QueryStatistic(); - queryStatisticsModel.getStatisticsTypeAndObjMap() - .put(QueryStatisticsConstants.VALID_PAGE_SCANNED, validPages); - queryStatisticsModel.getRecorder().recordStatistics(validPages); - - QueryStatistic scannedPages = new QueryStatistic(); - queryStatisticsModel.getStatisticsTypeAndObjMap() - .put(QueryStatisticsConstants.PAGE_SCANNED, scannedPages); - queryStatisticsModel.getRecorder().recordStatistics(scannedPages); - - QueryStatistic scanTime = new QueryStatistic(); - queryStatisticsModel.getStatisticsTypeAndObjMap() - .put(QueryStatisticsConstants.SCAN_BLOCKlET_TIME, scanTime); - queryStatisticsModel.getRecorder().recordStatistics(scanTime); 
- - QueryStatistic readTime = new QueryStatistic(); - queryStatisticsModel.getStatisticsTypeAndObjMap() - .put(QueryStatisticsConstants.READ_BLOCKlET_TIME, readTime); - queryStatisticsModel.getRecorder().recordStatistics(readTime); - - // dimension filling time - QueryStatistic keyColumnFilingTime = new QueryStatistic(); - queryStatisticsModel.getStatisticsTypeAndObjMap() - .put(QueryStatisticsConstants.KEY_COLUMN_FILLING_TIME, keyColumnFilingTime); - queryStatisticsModel.getRecorder().recordStatistics(keyColumnFilingTime); - // measure filling time - QueryStatistic measureFilingTime = new QueryStatistic(); - queryStatisticsModel.getStatisticsTypeAndObjMap() - .put(QueryStatisticsConstants.MEASURE_FILLING_TIME, measureFilingTime); - queryStatisticsModel.getRecorder().recordStatistics(measureFilingTime); - // page Io Time - QueryStatistic pageUncompressTime = new QueryStatistic(); - queryStatisticsModel.getStatisticsTypeAndObjMap() - .put(QueryStatisticsConstants.PAGE_UNCOMPRESS_TIME, pageUncompressTime); - queryStatisticsModel.getRecorder().recordStatistics(pageUncompressTime); - // result preparation time - QueryStatistic resultPreparationTime = new QueryStatistic(); - queryStatisticsModel.getStatisticsTypeAndObjMap() - .put(QueryStatisticsConstants.RESULT_PREP_TIME, resultPreparationTime); - queryStatisticsModel.getRecorder().recordStatistics(resultPreparationTime); - return queryStatisticsModel; - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/10918484/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/SearchModeResultIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/SearchModeResultIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/SearchModeResultIterator.java deleted file mode 100644 index c4a65b9..0000000 --- 
a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/SearchModeResultIterator.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.carbondata.core.scan.result.iterator; - -import java.util.List; -import java.util.concurrent.ExecutorService; - -import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo; -import org.apache.carbondata.core.scan.model.QueryModel; -import org.apache.carbondata.core.scan.result.RowBatch; - -public class SearchModeResultIterator extends AbstractSearchModeResultIterator { - - private final Object lock = new Object(); - - public SearchModeResultIterator(List<BlockExecutionInfo> infos, QueryModel queryModel, - ExecutorService execService) { - super(infos, queryModel, execService); - } - - @Override - public RowBatch next() { - return getBatchResult(); - } - - private RowBatch getBatchResult() { - RowBatch rowBatch = new RowBatch(); - synchronized (lock) { - if (curBlockScan.hasNext()) { - List<Object[]> collectedResult = curBlockScan.next(batchSize); - while (collectedResult.size() < batchSize && hasNext()) { - collectedResult.addAll(curBlockScan.next(batchSize - collectedResult.size())); - } - 
rowBatch.setRows(collectedResult); - } - } - return rowBatch; - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/10918484/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/SearchModeVectorResultIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/SearchModeVectorResultIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/SearchModeVectorResultIterator.java deleted file mode 100644 index bff5e36..0000000 --- a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/SearchModeVectorResultIterator.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.carbondata.core.scan.result.iterator; - -import java.util.List; -import java.util.concurrent.ExecutorService; - -import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo; -import org.apache.carbondata.core.scan.model.QueryModel; -import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch; - -public class SearchModeVectorResultIterator extends AbstractSearchModeResultIterator { - - private final Object lock = new Object(); - - public SearchModeVectorResultIterator(List<BlockExecutionInfo> infos, QueryModel queryModel, - ExecutorService execService) { - super(infos, queryModel, execService); - } - - @Override - public Object next() { - throw new UnsupportedOperationException("call processNextBatch instead"); - } - - @Override - public void processNextBatch(CarbonColumnarBatch columnarBatch) { - synchronized (lock) { - if (curBlockScan.hasNext()) { - curBlockScan.processNextBatch(columnarBatch); - } - } - } - -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/10918484/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java index fd42822..d2f812e 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java @@ -48,8 +48,6 @@ import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_ import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO_DEFAULT; import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO_MAX; import static 
org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO_MIN; -import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD; -import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT; import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_SORT_FILE_WRITE_BUFFER_SIZE; import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_TASK_DISTRIBUTION; import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_BLOCK; @@ -188,12 +186,6 @@ public final class CarbonProperties { case CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO: validateSchedulerMinRegisteredRatio(); break; - case CARBON_SEARCH_MODE_SCAN_THREAD: - validatePositiveInteger(CARBON_SEARCH_MODE_SCAN_THREAD); - break; - case CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT: - validatePositiveInteger(CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT); - break; case CARBON_LOAD_SORT_MEMORY_SPILL_PERCENTAGE: validateSortMemorySpillPercentage(); break; @@ -1412,55 +1404,6 @@ public final class CarbonProperties { } /** - * Return true if search mode is enabled - */ - public static boolean isSearchModeEnabled() { - String value = getInstance().getProperty( - CarbonCommonConstants.CARBON_SEARCH_MODE_ENABLE, - CarbonCommonConstants.CARBON_SEARCH_MODE_ENABLE_DEFAULT); - return Boolean.valueOf(value); - } - - public static void enableSearchMode(boolean enable) { - getInstance().addProperty( - CarbonCommonConstants.CARBON_SEARCH_MODE_ENABLE, String.valueOf(enable)); - } - - public static int getSearchMasterPort() { - try { - return Integer.parseInt( - getInstance().getProperty( - CarbonCommonConstants.CARBON_SEARCH_MODE_MASTER_PORT, - CarbonCommonConstants.CARBON_SEARCH_MODE_MASTER_PORT_DEFAULT)); - } catch (NumberFormatException e) { - return 
Integer.parseInt(CarbonCommonConstants.CARBON_SEARCH_MODE_MASTER_PORT_DEFAULT); - } - } - - public static int getSearchWorkerPort() { - try { - return Integer.parseInt( - getInstance().getProperty( - CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_PORT, - CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_PORT_DEFAULT)); - } catch (NumberFormatException e) { - return Integer.parseInt(CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_PORT_DEFAULT); - } - } - - public static int getMaxWorkloadForWorker(int workerCores) { - int defaultValue = workerCores * 10; - try { - return Integer.parseInt( - getInstance().getProperty( - CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT, - String.valueOf(defaultValue))); - } catch (NumberFormatException e) { - return defaultValue; - } - } - - /** * Return valid storage level for CARBON_INSERT_STORAGE_LEVEL * @return String */ http://git-wip-us.apache.org/repos/asf/carbondata/blob/10918484/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java index f87784e..f49747f 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java +++ b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java @@ -139,7 +139,6 @@ public class SessionParams implements Serializable, Cloneable { case CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE: case CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD: case CARBON_OPTIONS_SINGLE_PASS: - case CARBON_SEARCH_MODE_ENABLE: case ENABLE_VECTOR_READER: case ENABLE_UNSAFE_IN_QUERY_EXECUTION: case ENABLE_AUTO_LOAD_MERGE: http://git-wip-us.apache.org/repos/asf/carbondata/blob/10918484/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/ConcurrentQueryBenchmark.scala ---------------------------------------------------------------------- diff 
--git a/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/ConcurrentQueryBenchmark.scala b/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/ConcurrentQueryBenchmark.scala index 697d13f..d8a2176 100644 --- a/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/ConcurrentQueryBenchmark.scala +++ b/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/ConcurrentQueryBenchmark.scala @@ -25,7 +25,7 @@ import java.util.concurrent.{Callable, Executors, Future, TimeUnit} import scala.util.Random -import org.apache.spark.sql.{CarbonSession, DataFrame, Row, SaveMode, SparkSession} +import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession} import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonVersionConstants} import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} @@ -76,8 +76,6 @@ object ConcurrentQueryBenchmark { var generateFile = true // whether delete file var deleteFile = true - // open search mode, default value is false - var openSearchMode = false // carbon store location var storeLocation = "/tmp" @@ -239,24 +237,22 @@ object ConcurrentQueryBenchmark { } else { null } - if (!openSearchMode) { - val table1Time = time { - if (table1.endsWith("parquet")) { - if (generateFile) { - generateParquetTable(spark, df, storeLocation + "/" + table1) - } - spark.read.parquet(storeLocation + "/" + table1).createOrReplaceTempView(table1) - } else if (table1.endsWith("orc")) { - if (generateFile) { - generateOrcTable(spark, df, table1) - spark.read.orc(table1).createOrReplaceTempView(table1) - } - } else { - sys.error("invalid table: " + table1) + val table1Time = time { + if (table1.endsWith("parquet")) { + if (generateFile) { + generateParquetTable(spark, df, storeLocation + "/" + table1) } + spark.read.parquet(storeLocation + "/" + table1).createOrReplaceTempView(table1) + } else if (table1.endsWith("orc")) { + if (generateFile) { + generateOrcTable(spark, df, table1) + 
spark.read.orc(table1).createOrReplaceTempView(table1) + } + } else { + sys.error("invalid table: " + table1) } - println(s"$table1 completed, time: $table1Time sec") } + println(s"$table1 completed, time: $table1Time sec") val table2Time: Double = if (generateFile) { generateCarbonTable(spark, df, table2) @@ -423,26 +419,13 @@ object ConcurrentQueryBenchmark { */ def runTest(spark: SparkSession, table1: String, table2: String): Unit = { // run queries on parquet and carbon - if (!openSearchMode) { - runQueries(spark, table1) - } + runQueries(spark, table1) // do GC and sleep for some time before running next table System.gc() Thread.sleep(1000) System.gc() Thread.sleep(1000) runQueries(spark, table2) - if (openSearchMode) { - runQueries(spark, table2) - // start search mode (start all gRPC server) - // following queries will be run using gRPC - spark.asInstanceOf[CarbonSession].startSearchMode() - println("Open search mode:") - runQueries(spark, table2) - runQueries(spark, table2) - // stop gRPC servers - spark.asInstanceOf[CarbonSession].stopSearchMode() - } } /** @@ -516,16 +499,7 @@ object ConcurrentQueryBenchmark { } } if (arr.length > 8) { - openSearchMode = if (arr(8).equalsIgnoreCase("true")) { - true - } else if (arr(8).equalsIgnoreCase("false")) { - false - } else { - throw new Exception("error parameter, should be true or false") - } - } - if (arr.length > 9) { - storeLocation = arr(9) + storeLocation = arr(8) } } @@ -555,7 +529,6 @@ object ConcurrentQueryBenchmark { "\trunInLocal: " + runInLocal + "\tgenerateFile: " + generateFile + "\tdeleteFile: " + deleteFile + - "\topenSearchMode: " + openSearchMode + "\tstoreLocation: " + storeLocation val spark = if (runInLocal) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/10918484/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala ---------------------------------------------------------------------- diff --git 
a/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala index 666db1f..e0c2cdc 100644 --- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala +++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala @@ -59,8 +59,7 @@ object S3UsingSDKExample { } writer.close() } catch { - case ex: Exception => None - case e => None + case ex: Throwable => None } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/10918484/examples/spark2/src/main/scala/org/apache/carbondata/examples/SearchModeExample.scala ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/SearchModeExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/SearchModeExample.scala deleted file mode 100644 index aeb4c29..0000000 --- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/SearchModeExample.scala +++ /dev/null @@ -1,194 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.carbondata.examples - -import java.io.File -import java.util.concurrent.{Executors, ExecutorService} - -import org.apache.spark.sql.{CarbonSession, SparkSession} - -import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.util.CarbonProperties - -/** - * An example that demonstrate how to run queries in search mode, - * and compare the performance between search mode and SparkSQL - */ -// scalastyle:off -object SearchModeExample { - - def main(args: Array[String]) { - import org.apache.spark.sql.CarbonSession._ - val master = Option(System.getProperty("spark.master")) - .orElse(sys.env.get("MASTER")) - .orElse(Option("local[8]")) - - CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd HH:mm:ss") - .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy/MM/dd") - .addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE, "true") - .addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, "") - - val filePath = if (args.length > 0) { - args(0) - } else { - val rootPath = new File(this.getClass.getResource("/").getPath - + "../../../..").getCanonicalPath - s"$rootPath/examples/spark2/src/main/resources/data.csv" - } - val storePath = if (args.length > 1) { - args(1) - } else { - val rootPath = new File(this.getClass.getResource("/").getPath - + "../../../..").getCanonicalPath - s"$rootPath/examples/spark2/target/store" - } - - val spark = SparkSession - .builder() - .master(master.get) - .appName("SearchModeExample") - .config("spark.sql.crossJoin.enabled", "true") - .getOrCreateCarbonSession(storePath) - - spark.sparkContext.setLogLevel("ERROR") - exampleBody(spark, filePath) - println("Finished!") - spark.close() - } - - def exampleBody(spark: SparkSession, path: String): Unit = { - - spark.sql("DROP TABLE IF EXISTS carbonsession_table") - - // Create table - spark.sql( - s""" - | CREATE TABLE carbonsession_table( - | shortField SHORT, 
- | intField INT, - | bigintField LONG, - | doubleField DOUBLE, - | stringField STRING, - | timestampField TIMESTAMP, - | decimalField DECIMAL(18,2), - | dateField DATE, - | charField CHAR(5), - | floatField FLOAT - | ) - | STORED BY 'carbondata' - | TBLPROPERTIES('DICTIONARY_INCLUDE'='dateField, charField') - """.stripMargin) - - spark.sql( - s""" - | LOAD DATA LOCAL INPATH '$path' - | INTO TABLE carbonsession_table - | OPTIONS('HEADER'='true', 'COMPLEX_DELIMITER_LEVEL_1'='#') - """.stripMargin) - - val pool = Executors.newCachedThreadPool() - - // start search mode - spark.asInstanceOf[CarbonSession].startSearchMode() - runAsynchrousSQL(spark, pool, 1) - - println("search mode asynchronous query") - org.apache.spark.sql.catalyst.util.benchmark { - runAsynchrousSQL(spark, pool, 100) - } - - println("search mode synchronous query") - org.apache.spark.sql.catalyst.util.benchmark { - runSynchrousSQL(spark, 100) - } - - // stop search mode - spark.asInstanceOf[CarbonSession].stopSearchMode() - - println("sparksql asynchronous query") - org.apache.spark.sql.catalyst.util.benchmark { - runAsynchrousSQL(spark, pool, 100) - } - - println("sparksql synchronous query") - org.apache.spark.sql.catalyst.util.benchmark { - runSynchrousSQL(spark, 100) - } - - // start search mode again - spark.asInstanceOf[CarbonSession].startSearchMode() - - println("search mode asynchronous query") - org.apache.spark.sql.catalyst.util.benchmark { - runAsynchrousSQL(spark, pool, 100) - } - - println("search mode synchronous query") - org.apache.spark.sql.catalyst.util.benchmark { - runSynchrousSQL(spark, 100) - } - - // stop search mode - spark.asInstanceOf[CarbonSession].stopSearchMode() - - println("sparksql asynchronous query") - org.apache.spark.sql.catalyst.util.benchmark { - runAsynchrousSQL(spark, pool, 100) - } - - println("sparksql synchronous query") - org.apache.spark.sql.catalyst.util.benchmark { - runSynchrousSQL(spark, 100) - } - - spark.sql("DROP TABLE IF EXISTS 
carbonsession_table") - pool.shutdownNow() - } - - private def runAsynchrousSQL(spark: SparkSession, pool: ExecutorService, round: Int): Unit = { - val futures = (1 to round).map { i => - pool.submit(new Runnable { - override def run(): Unit = { - spark.sql( - s""" - SELECT charField, stringField, intField, dateField - FROM carbonsession_table - WHERE stringfield = 'spark' AND decimalField > $i % 37 - """.stripMargin - ).collect() - } - }) - } - - futures.foreach(_.get()) - } - - private def runSynchrousSQL(spark: SparkSession, round: Int): Unit = { - (1 to round).map { i => - spark.sql( - s""" - SELECT charField, stringField, intField, dateField - FROM carbonsession_table - WHERE stringfield = 'spark' AND decimalField > $i % 37 - """.stripMargin - ).collect() - } - } -} -// scalastyle:on \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/10918484/integration/spark-common-test/pom.xml ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/pom.xml b/integration/spark-common-test/pom.xml index ef3ffb1..5da0c67 100644 --- a/integration/spark-common-test/pom.xml +++ b/integration/spark-common-test/pom.xml @@ -44,7 +44,6 @@ <!--<build.directory.projectHive>../../integration/hive/target</build.directory.projectHive>--> <!--<build.directory.projectPresto>../../integration/presto/target</build.directory.projectPresto>--> <build.directory.projectStoreSdk>../../store/sdk/target</build.directory.projectStoreSdk> - <build.directory.projectStoreSearch>../../store/search/target</build.directory.projectStoreSearch> <build.directory.projectStreaming>../../streaming/target</build.directory.projectStreaming> <build.directory.projectBloom>../../datamap/bloom/target</build.directory.projectBloom> <build.directory.projectLucene>../../datamap/lucene/target</build.directory.projectLucene> @@ -61,7 +60,6 @@ 
<!--<classes.directory.projectHive>../../integration/hive/target/classes</classes.directory.projectHive>--> <!--<classes.directory.projectPresto>../../integration/presto/target/classes</classes.directory.projectPresto>--> <classes.directory.projectStoreSdk>../../store/sdk/target/classes</classes.directory.projectStoreSdk> - <classes.directory.projectStoreSearch>../../store/search/target/classes</classes.directory.projectStoreSearch> <classes.directory.projectStreaming>../../streaming/target/classes</classes.directory.projectStreaming> <classes.directory.projectBloom>../../datamap/bloom/target/classes</classes.directory.projectBloom> <classes.directory.projectLucene>../../datamap/lucene/target/classes</classes.directory.projectLucene> @@ -82,8 +80,6 @@ <!--<sources.directory.projectPresto>../../integration/presto/src/main/java</sources.directory.projectPresto>--> <!--<sources.directory.projectPresto>../../integration/presto/src/main/scala</sources.directory.projectPresto>--> <sources.directory.projectStoreSdk>../../store/sdk/src/main/java</sources.directory.projectStoreSdk> - <sources.directory.projectStoreSearch>../../store/search/src/main/java</sources.directory.projectStoreSearch> - <sources.directory.projectStoreSearch>../../store/search/src/main/scala</sources.directory.projectStoreSearch> <sources.directory.projectStreaming>../../streaming/src/main/java</sources.directory.projectStreaming> <sources.directory.projectStreaming>../../streaming/src/main/scala</sources.directory.projectStreaming> <sources.directory.projectBloom>../../datamap/bloom/src/main/java</sources.directory.projectBloom> @@ -101,7 +97,6 @@ <!--<generated-sources.directory.projectHive>../../integration/hive/target/generated-sources/annotations</generated-sources.directory.projectHive>--> <!--<generated-sources.directory.projectPresto>../../integration/presto/target/generated-sources/annotations</generated-sources.directory.projectPresto>--> 
<generated-sources.directory.projectStoreSdk>../../store/sdk/target/generated-sources/annotations</generated-sources.directory.projectStoreSdk> - <generated-sources.directory.projectStoreSearch>../../store/search/target/generated-sources/annotations</generated-sources.directory.projectStoreSearch> <generated-sources.directory.projectStreaming>../../streaming/target/generated-sources/annotations</generated-sources.directory.projectStreaming> <generated-sources.directory.projectBloom>../../datamap/bloom/target/generated-sources/annotations</generated-sources.directory.projectBloom> <generated-sources.directory.projectLucene>../../datamap/lucene/target/generated-sources/annotations</generated-sources.directory.projectLucene> @@ -340,9 +335,6 @@ <fileset dir="${build.directory.projectStoreSdk}" erroronmissingdir="false"> <include name="jacoco.exec" /> </fileset> - <fileset dir="${build.directory.projectStoreSearch}" erroronmissingdir="false"> - <include name="jacoco.exec" /> - </fileset> <fileset dir="${build.directory.projectStreaming}" erroronmissingdir="false"> <include name="jacoco.exec" /> </fileset> @@ -369,7 +361,6 @@ <!--<fileset dir="${classes.directory.projectHive}" erroronmissingdir="false" />--> <!--<fileset dir="${classes.directory.projectPresto}" erroronmissingdir="false" />--> <fileset dir="${classes.directory.projectStoreSdk}" erroronmissingdir="false" /> - <fileset dir="${classes.directory.projectStoreSearch}" erroronmissingdir="false" /> <fileset dir="${classes.directory.projectStreaming}" erroronmissingdir="false" /> <fileset dir="${classes.directory.projectBloom}" erroronmissingdir="false" /> <fileset dir="${classes.directory.projectLucene}" erroronmissingdir="false" /> @@ -386,7 +377,6 @@ <!--<fileset dir="${sources.directory.projectHive}" erroronmissingdir="false" />--> <!--<fileset dir="${sources.directory.projectPresto}" erroronmissingdir="false" />--> <fileset dir="${sources.directory.projectStoreSdk}" erroronmissingdir="false" /> - <fileset 
dir="${sources.directory.projectStoreSearch}" erroronmissingdir="false" /> <fileset dir="${sources.directory.projectStreaming}" erroronmissingdir="false" /> <fileset dir="${sources.directory.projectBloom}" erroronmissingdir="false" /> <fileset dir="${sources.directory.projectLucene}" erroronmissingdir="false" /> http://git-wip-us.apache.org/repos/asf/carbondata/blob/10918484/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapWithSearchModeSuite.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapWithSearchModeSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapWithSearchModeSuite.scala deleted file mode 100644 index 375495c..0000000 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapWithSearchModeSuite.scala +++ /dev/null @@ -1,325 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.carbondata.datamap.lucene - -import java.io.{File, PrintWriter} - -import scala.util.Random - -import org.apache.spark.sql.{CarbonEnv, CarbonSession, Row} -import org.apache.spark.sql.test.util.QueryTest -import org.scalatest.BeforeAndAfterAll - -import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.core.datamap.status.DataMapStatusManager - -/** - * Test lucene fine grain datamap with search mode - */ -class LuceneFineGrainDataMapWithSearchModeSuite extends QueryTest with BeforeAndAfterAll { - - val file2 = resourcesPath + "/datamap_input.csv" - - override protected def beforeAll(): Unit = { - //n should be about 5000000 of reset if size is default 1024 - val n = 500000 - sqlContext.sparkSession.asInstanceOf[CarbonSession].startSearchMode() - CarbonProperties - .getInstance().addProperty(CarbonCommonConstants.CARBON_SEARCH_QUERY_TIMEOUT, "100s") - CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, "true") - LuceneFineGrainDataMapSuite.createFile(file2, n) - sql("create database if not exists lucene") - sql("use lucene") - sql("DROP TABLE IF EXISTS datamap_test") - sql( - """ - | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT) - | STORED BY 'carbondata' - | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT') - """.stripMargin) - } - - test("test lucene fine grain data map with search mode") { - - sql( - s""" - | CREATE DATAMAP dm ON TABLE datamap_test - | USING 'lucene' - | DMProperties('INDEX_COLUMNS'='Name') - """.stripMargin) - - sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')") - checkAnswer(sql("SELECT * FROM datamap_test WHERE TEXT_MATCH('name:n10')"), - sql(s"select * from datamap_test where name='n10'")) - - sql("drop datamap dm on table datamap_test") - } - - // TODOï¼ optimize performance - ignore("test lucene 
fine grain data map with TEXT_MATCH 'AND' Filter") { - sql("drop datamap if exists dm on table datamap_test") - sql( - s""" - | CREATE DATAMAP dm ON TABLE datamap_test - | USING 'lucene' - | DMProperties('INDEX_COLUMNS'='name') - """.stripMargin) - sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')") - checkAnswer( - sql("SELECT count(*) FROM datamap_test WHERE TEXT_MATCH('name:n2*') " + - "AND age=28 and id=200149"), - sql("SELECT count(*) FROM datamap_test WHERE name like 'n2%' " + - "AND age=28 and id=200149")) - sql("drop datamap if exists dm on table datamap_test") - } - - // TODOï¼ optimize performance - ignore("test lucene fine grain data map with TEXT_MATCH 'AND' and 'OR' Filter ") { - sql("drop datamap if exists dm on table datamap_test") - sql( - s""" - | CREATE DATAMAP dm ON TABLE datamap_test - | USING 'lucene' - | DMProperties('INDEX_COLUMNS'='name , city') - """.stripMargin) - sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')") - checkAnswer(sql("SELECT * FROM datamap_test WHERE TEXT_MATCH('name:n1*') OR TEXT_MATCH ('city:c01*') " + - "AND TEXT_MATCH('city:C02*')"), - sql("select * from datamap_test where name like 'n1%' OR city like 'c01%' and city like" + - " 'c02%'")) - sql("drop datamap if exists dm on table datamap_test") - } - - test("test lucene fine grain data map with compaction-Major ") { - sql("DROP TABLE IF EXISTS datamap_test_table") - sql( - """ - | CREATE TABLE datamap_test_table(id INT, name STRING, city STRING, age INT) - | STORED BY 'carbondata' - | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT') - """.stripMargin) - sql( - s""" - | CREATE DATAMAP dm ON TABLE datamap_test_table - | USING 'lucene' - | DMProperties('INDEX_COLUMNS'='name , city') - """.stripMargin) - sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test_table OPTIONS('header'='false')") - checkAnswer(sql("SELECT * FROM datamap_test_table WHERE 
TEXT_MATCH('name:n10')"), - sql("select * from datamap_test_table where name='n10'")) - sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test_table OPTIONS('header'='false')") - sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test_table OPTIONS('header'='false')") - sql("alter table datamap_test_table compact 'major'") - checkAnswer(sql("SELECT COUNT(*) FROM datamap_test_table WHERE TEXT_MATCH('name:n10')"), - sql("select COUNT(*) from datamap_test_table where name='n10'")) - sql("drop datamap if exists dm on table datamap_test_table") - sql("DROP TABLE IF EXISTS datamap_test_table") - } - - ignore("test lucene fine grain datamap rebuild") { - sql("DROP TABLE IF EXISTS datamap_test5") - sql( - """ - | CREATE TABLE datamap_test5(id INT, name STRING, city STRING, age INT) - | STORED BY 'carbondata' - | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT') - """.stripMargin) - sql( - s""" - | CREATE DATAMAP dm ON TABLE datamap_test5 - | USING 'lucene' - | WITH DEFERRED REBUILD - | DMProperties('INDEX_COLUMNS'='city') - """.stripMargin) - sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test5 OPTIONS('header'='false')") - val map = DataMapStatusManager.readDataMapStatusMap() - assert(!map.get("dm").isEnabled) - sql("REBUILD DATAMAP dm ON TABLE datamap_test5") - checkAnswer(sql("SELECT * FROM datamap_test5 WHERE TEXT_MATCH('city:c020')"), - sql(s"SELECT * FROM datamap_test5 WHERE city='c020'")) - sql("DROP TABLE IF EXISTS datamap_test5") - } - - test("test lucene fine grain datamap rebuild with table block size") { - sql("DROP TABLE IF EXISTS datamap_test5") - sql( - """ - | CREATE TABLE datamap_test5(id INT, name STRING, city STRING, age INT) - | STORED BY 'carbondata' - | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'TABLE_BLOCKSIZE'='1') - """.stripMargin) - sql( - s""" - | CREATE DATAMAP dm ON TABLE datamap_test5 - | USING 'lucene' - | DMProperties('INDEX_COLUMNS'='Name , cIty') - """.stripMargin) - - sql(s"LOAD DATA LOCAL INPATH 
'$file2' INTO TABLE datamap_test5 OPTIONS('header'='false')") - sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test5 OPTIONS('header'='false')") - sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test5 OPTIONS('header'='false')") - - checkAnswer(sql("SELECT * FROM datamap_test5 WHERE TEXT_MATCH('city:c00')"), - sql(s"SELECT * FROM datamap_test5 WHERE city='c00'")) - checkAnswer(sql("SELECT * FROM datamap_test5 WHERE TEXT_MATCH('city:c020')"), - sql(s"SELECT * FROM datamap_test5 WHERE city='c020'")) - checkAnswer(sql("SELECT * FROM datamap_test5 WHERE TEXT_MATCH('city:c0100085')"), - sql(s"SELECT * FROM datamap_test5 WHERE city='c0100085'")) - checkAnswer(sql("SELECT * FROM datamap_test5 WHERE TEXT_MATCH('city:c09560')"), - sql(s"SELECT * FROM datamap_test5 WHERE city='c09560'")) - sql("DROP TABLE IF EXISTS datamap_test5") - } - - test("test lucene fine grain multiple data map on table") { - sql("DROP TABLE IF EXISTS datamap_test5") - sql( - """ - | CREATE TABLE datamap_test5(id INT, name STRING, city STRING, age INT) - | STORED BY 'carbondata' - | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT') - """.stripMargin) - sql( - s""" - | CREATE DATAMAP dm2 ON TABLE datamap_test5 - | USING 'lucene' - | DMProperties('INDEX_COLUMNS'='city') - """.stripMargin) - sql( - s""" - | CREATE DATAMAP dm1 ON TABLE datamap_test5 - | USING 'lucene' - | DMProperties('INDEX_COLUMNS'='Name') - """.stripMargin) - sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test5 OPTIONS('header'='false')") - checkAnswer(sql("SELECT * FROM datamap_test5 WHERE TEXT_MATCH('name:n10')"), - sql(s"select * from datamap_test5 where name='n10'")) - checkAnswer(sql("SELECT * FROM datamap_test5 WHERE TEXT_MATCH('city:c020')"), - sql(s"SELECT * FROM datamap_test5 WHERE city='c020'")) - sql("DROP TABLE IF EXISTS datamap_test5") - } - - // TODOï¼ support it in the future - ignore("test lucene datamap and validate the visible and invisible status of datamap ") { - val 
tableName = "datamap_test2" - val dataMapName1 = "ggdatamap1"; - sql(s"DROP TABLE IF EXISTS $tableName") - sql( - s""" - | CREATE TABLE $tableName(id INT, name STRING, city STRING, age INT) - | STORED BY 'org.apache.carbondata.format' - | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT') - """.stripMargin) - // register datamap writer - sql( - s""" - | CREATE DATAMAP ggdatamap1 ON TABLE $tableName - | USING 'lucene' - | DMPROPERTIES('index_columns'='name') - """.stripMargin) - sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE $tableName OPTIONS('header'='false')") - - val df1 = sql(s"EXPLAIN EXTENDED SELECT * FROM $tableName WHERE TEXT_MATCH('name:n502670')").collect() - sql(s"SELECT * FROM $tableName WHERE TEXT_MATCH('name:n502670')").show() - println(df1(0).getString(0)) - assertResult( - s"""== CarbonData Profiler == - |Table Scan on datamap_test2 - | - total blocklets: 1 - | - filter: TEXT_MATCH('name:n502670') - | - pruned by Main DataMap - | - skipped blocklets: 0 - | - pruned by FG DataMap - | - name: ggdatamap1 - | - provider: lucene - | - skipped blocklets: 1 - |""".stripMargin)(df1(0).getString(0)) - - sql(s"set ${CarbonCommonConstants.CARBON_DATAMAP_VISIBLE}default.$tableName.$dataMapName1 = false") - - val df2 = sql(s"EXPLAIN EXTENDED SELECT * FROM $tableName WHERE name='n502670'").collect() - println(df2(0).getString(0)) - assertResult( - s"""== CarbonData Profiler == - |Table Scan on $tableName - | - total blocklets: 1 - | - filter: (name <> null and name = n502670) - | - pruned by Main DataMap - | - skipped blocklets: 0 - |""".stripMargin)(df2(0).getString(0)) - - checkAnswer(sql(s"SELECT * FROM $tableName WHERE name='n502670' AND city='c2670'"), - sql(s"SELECT * FROM $tableName WHERE name='n502670' AND city='c2670'")) - sql(s"DROP TABLE IF EXISTS $tableName") - } - - ignore("test lucene fine grain datamap rebuild with table block size, rebuild") { - sql("DROP TABLE IF EXISTS datamap_test5") - sql( - """ - | CREATE TABLE 
datamap_test5(id INT, name STRING, city STRING, age INT) - | STORED BY 'carbondata' - | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'TABLE_BLOCKSIZE'='1') - """.stripMargin) - sql( - s""" - | CREATE DATAMAP dm ON TABLE datamap_test5 - | USING 'lucene' - | WITH DEFERRED REBUILD - | DMProperties('INDEX_COLUMNS'='Name , cIty') - """.stripMargin) - - sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test5 OPTIONS('header'='false')") - sql("REBUILD DATAMAP dm ON TABLE datamap_test5") - - sqlContext.sparkSession.asInstanceOf[CarbonSession].stopSearchMode() - sql("SELECT * FROM datamap_test5 WHERE TEXT_MATCH('city:c020')").show() - sqlContext.sparkSession.asInstanceOf[CarbonSession].startSearchMode() - sql("SELECT * FROM datamap_test5 WHERE TEXT_MATCH('city:c020')").show() - checkAnswer(sql("SELECT * FROM datamap_test5 WHERE TEXT_MATCH('city:c020')"), - sql(s"SELECT * FROM datamap_test5 WHERE city='c020'")) - sql("DROP TABLE IF EXISTS datamap_test5") - } - - override protected def afterAll(): Unit = { - CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, - CarbonCommonConstants.ENABLE_QUERY_STATISTICS_DEFAULT) - LuceneFineGrainDataMapSuite.deleteFile(file2) - sql("DROP TABLE IF EXISTS datamap_test") - sql("DROP TABLE IF EXISTS datamap_test5") - sql("use default") - sqlContext.sparkSession.asInstanceOf[CarbonSession].stopSearchMode() - } - - def createFile(fileName: String, line: Int = 10000, start: Int = 0) = { - val write = new PrintWriter(new File(fileName)) - for (i <- start until (start + line)) { - write.println(i + "," + "n" + i + "," + "c0" + i + "," + Random.nextInt(80)) - } - write.close() - } - - def deleteFile(fileName: String): Unit = { - val file = new File(fileName) - if (file.exists()) { - file.delete() - } - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/10918484/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala 
---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala deleted file mode 100644 index 19c0d31..0000000 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.carbondata.spark.testsuite.detailquery - -import org.apache.spark.sql.test.util.QueryTest -import org.apache.spark.sql.{CarbonSession, Row, SaveMode} -import org.scalatest.BeforeAndAfterAll - -import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.spark.util.DataGenerator - -/** - * Test Suite for search mode - */ - -class SearchModeTestCase extends QueryTest with BeforeAndAfterAll { - - val numRows = 500 * 1000 - override def beforeAll = { - sqlContext.sparkSession.asInstanceOf[CarbonSession].startSearchMode() - sql("DROP TABLE IF EXISTS main") - - val df = DataGenerator.generateDataFrame(sqlContext.sparkSession, numRows) - df.write - .format("carbondata") - .option("tableName", "main") - .option("table_blocksize", "5") - .mode(SaveMode.Overwrite) - .save() - } - - override def afterAll = { - sql("DROP TABLE IF EXISTS main") - sql("set carbon.search.enabled = false") - sqlContext.sparkSession.asInstanceOf[CarbonSession].stopSearchMode() - } - - private def sparkSql(sql: String): Seq[Row] = { - sqlContext.sparkSession.asInstanceOf[CarbonSession].sparkSql(sql).collect() - } - - private def checkSearchAnswer(query: String) = { - checkAnswer(sql(query), sparkSql(query)) - } - - test("SearchMode Query: row result") { - CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, "false") - checkSearchAnswer("select * from main where city = 'city3'") - CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, - CarbonCommonConstants.ENABLE_VECTOR_READER_DEFAULT) - } - - test("SearchMode Query: vector result") { - checkSearchAnswer("select * from main where city = 'city3'") - } - - test("equal filter") { - checkSearchAnswer("select id from main where id = '100'") - checkSearchAnswer("select id from main where planet = 'planet100'") - } - - test("greater and less than filter") { - 
checkSearchAnswer("select id from main where m2 < 4") - } - - test("IN filter") { - checkSearchAnswer("select id from main where id IN ('40', '50', '60')") - } - - test("expression filter") { - checkSearchAnswer("select id from main where length(id) < 2") - } - - test("filter with limit") { - checkSearchAnswer("select id from main where id = '3' limit 10") - checkSearchAnswer("select id from main where length(id) < 2 limit 10") - } - - test("aggregate query") { - checkSearchAnswer("select city, sum(m1) from main where m2 < 10 group by city") - } - - test("aggregate query with datamap and fallback to SparkSQL") { - sql("create datamap preagg on table main using 'preaggregate' as select city, count(*) from main group by city ") - checkSearchAnswer("select city, count(*) from main group by city") - sql("drop datamap preagg on table main").show() - } - - test("set search mode") { - sql("set carbon.search.enabled = true") - assert(sqlContext.sparkSession.asInstanceOf[CarbonSession].isSearchModeEnabled) - checkSearchAnswer("select id from main where id = '3' limit 10") - sql("set carbon.search.enabled = false") - assert(!sqlContext.sparkSession.asInstanceOf[CarbonSession].isSearchModeEnabled) - } - - test("test lucene datamap with search mode") { - sql("set carbon.search.enabled = true") - sql("DROP DATAMAP IF EXISTS dm ON TABLE main") - sql("CREATE DATAMAP dm ON TABLE main USING 'lucene' DMProperties('INDEX_COLUMNS'='id') ") - checkAnswer(sql("SELECT * FROM main WHERE TEXT_MATCH('id:100000')"), - sql(s"SELECT * FROM main WHERE id='100000'")) - sql("DROP DATAMAP if exists dm ON TABLE main") - sql("set carbon.search.enabled = false") - } - - test("test lucene datamap with search mode 2") { - sql("drop datamap if exists dm3 ON TABLE main") - sql("CREATE DATAMAP dm3 ON TABLE main USING 'lucene' DMProperties('INDEX_COLUMNS'='city') ") - checkAnswer(sql("SELECT * FROM main WHERE TEXT_MATCH('city:city6')"), - sql("SELECT * FROM main WHERE city='city6'")) - sql("DROP DATAMAP if 
exists dm3 ON TABLE main") - } - - test("test lucene datamap with search mode, two column") { - sql("drop datamap if exists dm3 ON TABLE main") - sql("CREATE DATAMAP dm3 ON TABLE main USING 'lucene' DMProperties('INDEX_COLUMNS'='city , id') ") - checkAnswer(sql("SELECT * FROM main WHERE TEXT_MATCH('city:city6')"), - sql("SELECT * FROM main WHERE city='city6'")) - checkAnswer(sql("SELECT * FROM main WHERE TEXT_MATCH('id:100000')"), - sql(s"SELECT * FROM main WHERE id='100000'")) - sql("DROP DATAMAP if exists dm3 ON TABLE main") - } - - test("start search mode twice") { - sqlContext.sparkSession.asInstanceOf[CarbonSession].startSearchMode() - assert(sqlContext.sparkSession.asInstanceOf[CarbonSession].isSearchModeEnabled) - checkSearchAnswer("select id from main where id = '3' limit 10") - sqlContext.sparkSession.asInstanceOf[CarbonSession].stopSearchMode() - assert(!sqlContext.sparkSession.asInstanceOf[CarbonSession].isSearchModeEnabled) - - // start twice - sqlContext.sparkSession.asInstanceOf[CarbonSession].startSearchMode() - assert(sqlContext.sparkSession.asInstanceOf[CarbonSession].isSearchModeEnabled) - checkSearchAnswer("select id from main where id = '3' limit 10") - sqlContext.sparkSession.asInstanceOf[CarbonSession].stopSearchMode() - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/10918484/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala index 542a454..28049b5 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala @@ -37,9 +37,9 @@ import org.apache.spark.sql.hive.DistributionUtil import 
org.apache.spark.sql.SparkSession import org.apache.spark.sql.carbondata.execution.datasources.tasklisteners.CarbonLoadTaskCompletionListener import org.apache.spark.sql.execution.SQLExecution -import org.apache.spark.sql.profiler.{GetPartition, Profiler, QueryTaskEnd} +import org.apache.spark.sql.profiler.{GetPartition, Profiler} import org.apache.spark.sql.util.SparkSQLUtil.sessionState -import org.apache.spark.util.{CarbonReflectionUtils, TaskCompletionListener} +import org.apache.spark.util.TaskCompletionListener import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.converter.SparkDataTypeConverterImpl @@ -52,7 +52,7 @@ import org.apache.carbondata.core.metadata.schema.table.TableInfo import org.apache.carbondata.core.scan.expression.Expression import org.apache.carbondata.core.scan.filter.FilterUtil import org.apache.carbondata.core.scan.model.QueryModel -import org.apache.carbondata.core.stats.{QueryStatistic, QueryStatisticsConstants, QueryStatisticsRecorder} +import org.apache.carbondata.core.stats.{QueryStatistic, QueryStatisticsConstants} import org.apache.carbondata.core.statusmanager.FileFormat import org.apache.carbondata.core.util._ import org.apache.carbondata.hadoop._ @@ -258,14 +258,7 @@ class CarbonScanRDD[T: ClassTag]( CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION, "false").toBoolean || carbonDistribution.equalsIgnoreCase(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_CUSTOM) - val enableSearchMode = CarbonProperties.getInstance().getProperty( - CarbonCommonConstants.CARBON_SEARCH_MODE_ENABLE, - CarbonCommonConstants.CARBON_SEARCH_MODE_ENABLE_DEFAULT).toBoolean - if (useCustomDistribution || enableSearchMode) { - if (enableSearchMode) { - // force to assign only one task contains multiple splits each node - parallelism = 0 - } + if (useCustomDistribution) { // create a list of block based on split val blockList = splits.asScala.map(_.asInstanceOf[Distributable]) 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/10918484/integration/spark2/pom.xml ---------------------------------------------------------------------- diff --git a/integration/spark2/pom.xml b/integration/spark2/pom.xml index f874906..69594eb 100644 --- a/integration/spark2/pom.xml +++ b/integration/spark2/pom.xml @@ -52,11 +52,6 @@ </dependency> <dependency> <groupId>org.apache.carbondata</groupId> - <artifactId>carbondata-search</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.carbondata</groupId> <artifactId>carbondata-lucene</artifactId> <version>${project.version}</version> </dependency> http://git-wip-us.apache.org/repos/asf/carbondata/blob/10918484/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala b/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala index e33b8e2..e626e63 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala @@ -18,22 +18,17 @@ package org.apache.carbondata.store import java.io.IOException -import java.net.InetAddress import scala.collection.JavaConverters._ import org.apache.spark.{CarbonInputMetrics, SparkConf} -import org.apache.spark.rpc.{Master, Worker} import org.apache.spark.sql.{CarbonEnv, SparkSession} import org.apache.spark.sql.CarbonSession._ import org.apache.carbondata.common.annotations.InterfaceAudience -import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.datastore.row.CarbonRow import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier -import org.apache.carbondata.core.metadata.schema.table.CarbonTable import 
org.apache.carbondata.core.scan.expression.Expression -import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.hadoop.CarbonProjection import org.apache.carbondata.spark.rdd.CarbonScanRDD @@ -44,8 +39,6 @@ import org.apache.carbondata.spark.rdd.CarbonScanRDD @InterfaceAudience.Internal class SparkCarbonStore extends MetaCachedCarbonStore { private var session: SparkSession = _ - private var master: Master = _ - private final val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName) /** * Initialize SparkCarbonStore @@ -111,60 +104,4 @@ class SparkCarbonStore extends MetaCachedCarbonStore { .asJava } - def startSearchMode(): Unit = { - LOG.info("Starting search mode master") - master = new Master(session.sparkContext.getConf) - master.startService() - startAllWorkers() - } - - def stopSearchMode(): Unit = { - LOG.info("Shutting down all workers...") - try { - master.stopAllWorkers() - LOG.info("All workers are shutted down") - } catch { - case e: Exception => - LOG.error(s"failed to shutdown worker: ${e.toString}") - } - LOG.info("Stopping master...") - master.stopService() - LOG.info("Master stopped") - master = null - } - - /** search mode */ - def search( - table: CarbonTable, - projectColumns: Array[String], - filter: Expression, - globalLimit: Long, - localLimit: Long): java.util.Iterator[CarbonRow] = { - if (master == null) { - throw new IllegalStateException("search mode is not started") - } - master.search(table, projectColumns, filter, globalLimit, localLimit) - .iterator - .asJava - } - - private def startAllWorkers(): Array[Int] = { - // TODO: how to ensure task is sent to every executor? 
- val numExecutors = session.sparkContext.getExecutorMemoryStatus.keySet.size - val masterIp = InetAddress.getLocalHost.getHostAddress - val rows = session.sparkContext.parallelize(1 to numExecutors * 10, numExecutors) - .mapPartitions { f => - // start worker - Worker.init(masterIp, CarbonProperties.getSearchMasterPort) - new Iterator[Int] { - override def hasNext: Boolean = false - - override def next(): Int = 1 - } - }.collect() - LOG.info(s"Tried to start $numExecutors workers, ${master.getWorkers.size} " + - s"workers are started successfully") - rows - } - } http://git-wip-us.apache.org/repos/asf/carbondata/blob/10918484/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala index 7eb6e88..8e40bf7 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala @@ -23,24 +23,18 @@ import scala.collection.JavaConverters._ import org.apache.hadoop.conf.Configuration import org.apache.spark.{SparkConf, SparkContext} -import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd} import org.apache.spark.sql.SparkSession.Builder import org.apache.spark.sql.catalyst.encoders.RowEncoder -import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.QueryExecution -import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.hive.execution.command.CarbonSetCommand import org.apache.spark.sql.internal.{SessionState, SharedState} -import org.apache.spark.sql.optimizer.CarbonFilters import org.apache.spark.sql.profiler.{Profiler, SQLStart} import 
org.apache.spark.util.{CarbonReflectionUtils, Utils} import org.apache.carbondata.common.annotations.InterfaceAudience -import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.{CarbonProperties, CarbonSessionInfo, ThreadLocalSessionInfo} -import org.apache.carbondata.store.SparkCarbonStore import org.apache.carbondata.streaming.CarbonStreamingQueryListener /** @@ -93,18 +87,7 @@ class CarbonSession(@transient val sc: SparkContext, withProfiler( sqlText, (qe, sse) => { - if (isSearchModeEnabled) { - try { - trySearchMode(qe, sse) - } catch { - case e: Exception => - log.error(String.format( - "Exception when executing search mode: %s", e.getMessage)) - throw e; - } - } else { new Dataset[Row](self, qe, RowEncoder(qe.analyzed.schema)) - } } ) } @@ -120,8 +103,6 @@ class CarbonSession(@transient val sc: SparkContext, message(0).getString(0).contains(dataMapName) } - def isSearchModeEnabled: Boolean = carbonStore != null - /** * Run SparkSQL directly */ @@ -163,82 +144,6 @@ class CarbonSession(@transient val sc: SparkContext, } } } - - /** - * If the query is a simple query with filter, we will try to use Search Mode, - * otherwise execute in SparkSQL - */ - private def trySearchMode(qe: QueryExecution, sse: SQLStart): DataFrame = { - val analyzed = qe.analyzed - val LOG = LogServiceFactory.getLogService(this.getClass.getName) - analyzed match { - case _@Project(columns, _@Filter(expr, s: SubqueryAlias)) - if s.child.isInstanceOf[LogicalRelation] && - s.child.asInstanceOf[LogicalRelation].relation - .isInstanceOf[CarbonDatasourceHadoopRelation] => - LOG.info(s"Search service started and supports filter: ${sse.sqlText}") - runSearch(analyzed, columns, expr, s.child.asInstanceOf[LogicalRelation]) - case gl@GlobalLimit(_, ll@LocalLimit(_, p@Project(columns, _@Filter(expr, s: SubqueryAlias)))) - if s.child.isInstanceOf[LogicalRelation] && - 
s.child.asInstanceOf[LogicalRelation].relation - .isInstanceOf[CarbonDatasourceHadoopRelation] => - val logicalRelation = s.child.asInstanceOf[LogicalRelation] - LOG.info(s"Search service started and supports limit: ${sse.sqlText}") - runSearch(analyzed, columns, expr, logicalRelation, gl.maxRows, ll.maxRows) - case _ => - LOG.info(s"Search service started, but don't support: ${sse.sqlText}," + - s" and will run it with SparkSQL") - new Dataset[Row](self, qe, RowEncoder(qe.analyzed.schema)) - } - } - - @transient private var carbonStore: SparkCarbonStore = _ - - def startSearchMode(): Unit = { - CarbonProperties.enableSearchMode(true) - CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, "false") - if (carbonStore == null) { - carbonStore = new SparkCarbonStore(this) - carbonStore.startSearchMode() - } - } - - def stopSearchMode(): Unit = { - CarbonProperties.enableSearchMode(false) - CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, "true") - if (carbonStore != null) { - try { - carbonStore.stopSearchMode() - carbonStore = null - } catch { - case e: RuntimeException => - LogServiceFactory.getLogService(this.getClass.getCanonicalName) - .error(s"Stop search mode failed: ${e.getMessage}") - } - } - } - - private def runSearch( - logicalPlan: LogicalPlan, - columns: Seq[NamedExpression], - expr: Expression, - relation: LogicalRelation, - maxRows: Option[Long] = None, - localMaxRows: Option[Long] = None): DataFrame = { - val rows = carbonStore.search( - relation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable, - columns.map(_.name).toArray, - if (expr != null) CarbonFilters.transformExpression(expr) else null, - maxRows.getOrElse(Long.MaxValue), - localMaxRows.getOrElse(Long.MaxValue)) - val output = new java.util.ArrayList[Row]() - while (rows.hasNext) { - val row = rows.next() - output.add(Row.fromSeq(row.getData)) - } - createDataFrame(output, logicalPlan.schema) - } - } 
object CarbonSession { http://git-wip-us.apache.org/repos/asf/carbondata/blob/10918484/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala index e26163f..9f97828 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.hive.execution.command -import org.apache.spark.sql.{CarbonEnv, CarbonSession, Row, SparkSession} +import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException import org.apache.spark.sql.catalyst.expressions.Attribute @@ -72,17 +72,7 @@ case class CarbonSetCommand(command: SetCommand) command.kv match { case Some((key, Some(value))) => CarbonSetCommand.validateAndSetValue(sessionParams, key, value) - - // handle search mode start/stop for ThriftServer usage - if (key.equalsIgnoreCase(CarbonCommonConstants.CARBON_SEARCH_MODE_ENABLE)) { - if (value.equalsIgnoreCase("true")) { - sparkSession.asInstanceOf[CarbonSession].startSearchMode() - } else { - sparkSession.asInstanceOf[CarbonSession].stopSearchMode() - } - } case _ => - } command.run(sparkSession) }