Repository: carbondata
Updated Branches:
  refs/heads/master 242c08be4 -> 2f85381f8
[CARBONDATA-2377][CarbonSearch] Support message throttling in search mode

This closes #2205


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/2f85381f
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/2f85381f
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/2f85381f

Branch: refs/heads/master
Commit: 2f85381f8bf945fde219c0d24e9649bc015c2ecc
Parents: 242c08b
Author: Jacky Li <[email protected]>
Authored: Sun Apr 22 12:35:10 2018 +0800
Committer: QiangCai <[email protected]>
Committed: Fri Apr 27 18:19:13 2018 +0800

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |  21 ++-
 .../impl/SearchModeDetailQueryExecutor.java     |  12 +-
 .../SearchModeVectorDetailQueryExecutor.java    |   7 +-
 .../carbondata/core/scan/model/QueryModel.java  |   8 +
 .../carbondata/core/util/CarbonProperties.java  |  51 ++++++
 .../detailquery/SearchModeTestCase.scala        |   1 +
 .../carbondata/store/SparkCarbonStore.scala     |  34 ++--
 .../org/apache/spark/sql/CarbonSession.scala    |   2 +
 store/search/pom.xml                            |  34 ++++
 .../store/worker/SearchRequestHandler.java      |  24 ++-
 .../scala/org/apache/spark/rpc/Master.scala     | 121 +++++++--------
 .../scala/org/apache/spark/rpc/Scheduler.scala  | 139 +++++++++++++++++
 .../scala/org/apache/spark/rpc/Worker.scala     |   2 +-
 .../org/apache/spark/search/Searcher.scala      |   2 +-
 .../org/apache/spark/rpc/SchedulerSuite.scala   | 154 +++++++++++++++++++
 15 files changed, 523 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f85381f/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 54db6e8..c4b0507 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1649,20 +1649,18 @@ public final class CarbonCommonConstants {
   public static final String CARBON_SEARCH_MODE_ENABLE_DEFAULT = "false";
 
   /**
-   * Thread size of static ExecutorService in each Node when using search mode.
-   * Default value is -1, it means that Executors.newCachedThreadPool() will be used to
-   * maximize utilization. If thread numbers has to be limited, set it a positive Integer
-   * will call Executors.newFixedThreadPool(int nThreads) instead
+   * The size of the thread pool used for reading files in the Worker for search mode.
+   * By default, it is the number of cores in the Worker.
    */
   @CarbonProperty
   @InterfaceStability.Unstable
   public static final String CARBON_SEARCH_MODE_SCAN_THREAD = "carbon.search.scan.thread";
 
-  public static final String CARBON_SEARCH_MODE_SCAN_THREAD_DEFAULT = "-1";
-
   /**
    * In search mode, Master will listen on this port for worker registration
    */
+  @CarbonProperty
+  @InterfaceStability.Unstable
   public static final String CARBON_SEARCH_MODE_MASTER_PORT = "carbon.search.master.port";
 
   public static final String CARBON_SEARCH_MODE_MASTER_PORT_DEFAULT = "10020";
@@ -1678,6 +1676,17 @@ public final class CarbonCommonConstants {
 
   public static final String CARBON_SEARCH_MODE_WORKER_PORT_DEFAULT = "10021";
 
+  /**
+   * If the number of search requests sent to a Worker exceeds this limit, Master will reschedule
+   * the request to another worker. In that case, locality will be lost in the HDFS scenario,
+   * but that is acceptable in the S3 scenario.
+   *
+   * If the user does not set this value, by default it is 10 * number of cores in the Worker
+   */
+  @CarbonProperty
+  @InterfaceStability.Unstable
+  public static final String CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT =
+      "carbon.search.worker.workload.limit";
 
   /*
    * whether to enable prefetch for rowbatch to enhance row reconstruction during compaction
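
For illustration only (not part of this commit): a minimal sketch of how these
properties could be tuned from application code before starting search mode.
The concrete values 48 and 100 are hypothetical, not recommendations.

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties

object SearchModeTuningSketch {
  def main(args: Array[String]): Unit = {
    val props = CarbonProperties.getInstance()
    // Cap the Worker-side scan thread pool (default: number of cores in the Worker).
    props.addProperty(CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD, "48")
    // Allow at most 100 outstanding requests per Worker before the Master
    // reschedules to another Worker (default: 10 * number of cores).
    props.addProperty(
      CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT, "100")
  }
}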
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f85381f/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeDetailQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeDetailQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeDetailQueryExecutor.java
index 484cafd..04669ab 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeDetailQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeDetailQueryExecutor.java
@@ -42,13 +42,14 @@ public class SearchModeDetailQueryExecutor extends AbstractQueryExecutor<Object>
   }
 
   private static synchronized void initThreadPool() {
+    int defaultValue = Runtime.getRuntime().availableProcessors();
     int nThread;
     try {
       nThread = Integer.parseInt(CarbonProperties.getInstance()
           .getProperty(CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD,
-              CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD_DEFAULT));
+              String.valueOf(defaultValue)));
     } catch (NumberFormatException e) {
-      nThread = Integer.parseInt(CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD_DEFAULT);
+      nThread = defaultValue;
       LOGGER.warn("The carbon.search.mode.thread is invalid. Using the default value " + nThread);
     }
     if (nThread > 0) {
@@ -58,6 +59,13 @@ public class SearchModeDetailQueryExecutor extends AbstractQueryExecutor<Object>
     }
   }
 
+  public static synchronized void shutdownThreadPool() {
+    if (executorService != null) {
+      executorService.shutdownNow();
+      executorService = null;
+    }
+  }
+
   @Override
   public CarbonIterator<Object> execute(QueryModel queryModel)
       throws QueryExecutionException, IOException {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f85381f/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java
index 02e8dc1..6c9396b 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java
@@ -24,7 +24,6 @@ import java.util.concurrent.Executors;
 
 import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
 import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
 import org.apache.carbondata.core.scan.model.QueryModel;
@@ -46,13 +45,13 @@ public class SearchModeVectorDetailQueryExecutor extends AbstractQueryExecutor<O
   }
 
   private static synchronized void initThreadPool() {
+    int defaultValue = Runtime.getRuntime().availableProcessors();
     int nThread;
     try {
       nThread = Integer.parseInt(CarbonProperties.getInstance()
-          .getProperty(CARBON_SEARCH_MODE_SCAN_THREAD,
-              CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD_DEFAULT));
+          .getProperty(CARBON_SEARCH_MODE_SCAN_THREAD, String.valueOf(defaultValue)));
     } catch (NumberFormatException e) {
-      nThread = Integer.parseInt(CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD_DEFAULT);
+      nThread = defaultValue;
       LOGGER.warn("The " + CARBON_SEARCH_MODE_SCAN_THREAD + " is invalid. "
           + "Using the default value " + nThread);
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f85381f/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
index 409bc2a..de11d11 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
@@ -369,4 +369,12 @@ public class QueryModel {
   public void setRequiredRowId(boolean requiredRowId) {
     this.requiredRowId = requiredRowId;
   }
+
+  @Override
+  public String toString() {
+    return String.format("scan on table %s.%s, %d projection columns with filter (%s)",
+        table.getDatabaseName(), table.getTableName(),
+        projection.getDimensions().size() + projection.getMeasures().size(),
+        filterExpressionResolverTree.getFilterExpression().toString());
+  }
 }
Ignoring it", value, propertyName)); + throw new IllegalArgumentException(); + } + } catch (NumberFormatException e) { + getInstance().removeProperty(propertyName); + LOGGER.warn(String.format("The value \"%s\" configured for key \"%s\" " + + "is invalid. Ignoring it", value, propertyName)); + throw e; + } + } + /** * This method validates the loaded properties and loads default * values in case of wrong values. @@ -825,6 +855,15 @@ public final class CarbonProperties { return this; } + /** + * Remove the specified key in property + */ + public CarbonProperties removeProperty(String key) { + carbonProperties.remove(key); + addedProperty.remove(key); + return this; + } + private ColumnarFormatVersion getDefaultFormatVersion() { return ColumnarFormatVersion.valueOf(CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION); } @@ -1501,4 +1540,16 @@ public final class CarbonProperties { return Integer.parseInt(CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_PORT_DEFAULT); } } + + public static int getMaxWorkloadForWorker(int workerCores) { + int defaultValue = workerCores * 10; + try { + return Integer.parseInt( + getInstance().getProperty( + CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT, + String.valueOf(defaultValue))); + } catch (NumberFormatException e) { + return defaultValue; + } + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f85381f/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala index 0e0628e..6921c82 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala @@ -32,6 +32,7 @@ class SearchModeTestCase extends QueryTest with BeforeAndAfterAll { val numRows = 500 * 1000 override def beforeAll = { + sqlContext.sparkContext.setLogLevel("INFO") sqlContext.sparkSession.asInstanceOf[CarbonSession].startSearchMode() sql("DROP TABLE IF EXISTS main") http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f85381f/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala b/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala index 279e7b0..c0d0d09 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql.CarbonSession._ import org.apache.spark.sql.SparkSession import org.apache.carbondata.common.annotations.InterfaceAudience +import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.datastore.row.CarbonRow import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.scan.expression.Expression @@ -43,6 +44,7 @@ import org.apache.carbondata.spark.rdd.CarbonScanRDD class SparkCarbonStore extends 
MetaCachedCarbonStore { private var session: SparkSession = _ private var master: Master = _ + private final val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName) /** * Initialize SparkCarbonStore @@ -114,8 +116,17 @@ class SparkCarbonStore extends MetaCachedCarbonStore { } def stopSearchMode(): Unit = { - master.stopAllWorkers() + LOG.info("Shutting down all workers...") + try { + master.stopAllWorkers() + LOG.info("All workers are shutted down") + } catch { + case e: Exception => + LOG.error(s"failed to shutdown worker: ${e.toString}") + } + LOG.info("Stopping master...") master.stopService() + LOG.info("Master stopped") master = null } @@ -138,14 +149,19 @@ class SparkCarbonStore extends MetaCachedCarbonStore { // TODO: how to ensure task is sent to every executor? val numExecutors = session.sparkContext.getExecutorMemoryStatus.keySet.size val masterIp = InetAddress.getLocalHost.getHostAddress - session.sparkContext.parallelize(1 to numExecutors * 10, numExecutors).mapPartitions { f => - // start worker - Worker.init(masterIp, CarbonProperties.getSearchMasterPort) - new Iterator[Int] { - override def hasNext: Boolean = false - override def next(): Int = 1 - } - }.collect() + val rows = session.sparkContext.parallelize(1 to numExecutors * 10, numExecutors) + .mapPartitions { f => + // start worker + Worker.init(masterIp, CarbonProperties.getSearchMasterPort) + new Iterator[Int] { + override def hasNext: Boolean = false + + override def next(): Int = 1 + } + }.collect() + LOG.info(s"Tried to start $numExecutors workers, ${master.getWorkers.size} " + + s"workers are started successfully") + rows } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f85381f/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala index 7da231a..81daece 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala @@ -181,6 +181,7 @@ class CarbonSession(@transient val sc: SparkContext, def startSearchMode(): Unit = { CarbonProperties.enableSearchMode(true) + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, "false") if (carbonStore == null) { carbonStore = new SparkCarbonStore(this) carbonStore.startSearchMode() @@ -189,6 +190,7 @@ class CarbonSession(@transient val sc: SparkContext, def stopSearchMode(): Unit = { CarbonProperties.enableSearchMode(false) + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, "true") if (carbonStore != null) { try { carbonStore.stopSearchMode() http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f85381f/store/search/pom.xml ---------------------------------------------------------------------- diff --git a/store/search/pom.xml b/store/search/pom.xml index 00184ca..9d833f2 100644 --- a/store/search/pom.xml +++ b/store/search/pom.xml @@ -34,9 +34,15 @@ <artifactId>junit</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.scalatest</groupId> + <artifactId>scalatest_${scala.binary.version}</artifactId> + <scope>test</scope> + </dependency> </dependencies> <build> + <testSourceDirectory>src/test/scala</testSourceDirectory> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> @@ -73,6 +79,34 @@ 
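
A minimal usage sketch of the session-level API touched above, modeled on
SearchModeTestCase; the table and query are hypothetical.

import org.apache.spark.sql.{CarbonSession, SparkSession}

object SearchModeUsageSketch {
  def run(spark: SparkSession): Unit = {
    val carbon = spark.asInstanceOf[CarbonSession]
    // Starts the Master and one search Worker per executor; as of this commit
    // it also disables the vector reader while search mode is on.
    carbon.startSearchMode()
    try {
      // Point queries are served by the search workers over RPC.
      carbon.sql("SELECT * FROM main WHERE id = 100").show()
    } finally {
      // Shuts down all workers and stops the Master RPC service.
      carbon.stopSearchMode()
    }
  }
}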
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f85381f/store/search/pom.xml
----------------------------------------------------------------------
diff --git a/store/search/pom.xml b/store/search/pom.xml
index 00184ca..9d833f2 100644
--- a/store/search/pom.xml
+++ b/store/search/pom.xml
@@ -34,9 +34,15 @@
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.scalatest</groupId>
+      <artifactId>scalatest_${scala.binary.version}</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
+    <testSourceDirectory>src/test/scala</testSourceDirectory>
     <plugins>
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
@@ -73,6 +79,34 @@
           </execution>
         </executions>
       </plugin>
+      <plugin>
+        <groupId>org.scalatest</groupId>
+        <artifactId>scalatest-maven-plugin</artifactId>
+        <version>1.0</version>
+        <!-- Note config is repeated in surefire config -->
+        <configuration>
+          <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
+          <junitxml>.</junitxml>
+          <filereports>CarbonTestSuite.txt</filereports>
+          <argLine> ${argLine} -ea -Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m
+          </argLine>
+          <stderr />
+          <environmentVariables>
+          </environmentVariables>
+          <systemProperties>
+            <java.awt.headless>true</java.awt.headless>
+            <spark.carbon.hive.schema.store>${carbon.hive.based.metastore}</spark.carbon.hive.schema.store>
+          </systemProperties>
+        </configuration>
+        <executions>
+          <execution>
+            <id>test</id>
+            <goals>
+              <goal>test</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
     </plugins>
   </build>
 </project>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f85381f/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
----------------------------------------------------------------------
diff --git a/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java b/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
index 7708d8b..8e31395 100644
--- a/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
+++ b/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
@@ -35,6 +35,8 @@ import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.TableInfo;
 import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope;
+import org.apache.carbondata.core.scan.executor.impl.SearchModeDetailQueryExecutor;
+import org.apache.carbondata.core.scan.executor.impl.SearchModeVectorDetailQueryExecutor;
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.model.QueryModel;
 import org.apache.carbondata.core.scan.model.QueryModelBuilder;
@@ -61,15 +63,22 @@ public class SearchRequestHandler {
 
   public SearchResult handleSearch(SearchRequest request) {
     try {
+      LOG.info(String.format("[SearchId:%d] receive search request", request.searchId()));
       List<CarbonRow> rows = handleRequest(request);
+      LOG.info(String.format("[SearchId:%d] sending success response", request.searchId()));
       return createSuccessResponse(request, rows);
     } catch (IOException | InterruptedException e) {
       LOG.error(e);
+      LOG.info(String.format("[SearchId:%d] sending failure response", request.searchId()));
       return createFailureResponse(request, e);
     }
   }
 
   public ShutdownResponse handleShutdown(ShutdownRequest request) {
+    LOG.info("Shutting down worker...");
+    SearchModeDetailQueryExecutor.shutdownThreadPool();
+    SearchModeVectorDetailQueryExecutor.shutdownThreadPool();
+    LOG.info("Worker shut down");
     return new ShutdownResponse(Status.SUCCESS.ordinal(), "");
   }
 
@@ -92,8 +101,11 @@ public class SearchRequestHandler {
     long limit = request.limit();
     long rowCount = 0;
 
+    LOG.info(String.format("[SearchId:%d] %s, number of block: %d",
+        request.searchId(), queryModel.toString(), mbSplit.getAllSplits().size()));
+
     // If there is FGDataMap, prune the split by applying FGDataMap
-    queryModel = tryPruneByFGDataMap(table, queryModel, mbSplit);
+    queryModel = tryPruneByFGDataMap(request.searchId(), table, queryModel, mbSplit);
 
     // In search mode, reader will read multiple blocks by using a thread pool
     CarbonRecordReader<CarbonRow> reader =
@@ -114,6 +126,8 @@ public class SearchRequestHandler {
     } finally {
       reader.close();
     }
+    LOG.info(String.format("[SearchId:%d] scan completed, return %d rows",
+        request.searchId(), rows.size()));
     return rows;
   }
 
@@ -121,7 +135,7 @@ public class SearchRequestHandler {
    * If there is FGDataMap defined for this table and filter condition in the query,
    * prune the splits by the DataMap and set the pruned split into the QueryModel and return
    */
-  private QueryModel tryPruneByFGDataMap(
+  private QueryModel tryPruneByFGDataMap(int queryId,
       CarbonTable table, QueryModel queryModel, CarbonMultiBlockSplit mbSplit) throws IOException {
     DataMapExprWrapper wrapper =
         DataMapChooser.get().choose(table, queryModel.getFilterExpressionResolverTree());
@@ -146,6 +160,8 @@ public class SearchRequestHandler {
           blockToRead.add(block);
         }
       }
+      LOG.info(String.format("[SearchId:%d] pruned using FG DataMap, pruned blocks: %d",
+          queryId, blockToRead.size()));
       queryModel.setTableBlockInfos(blockToRead);
     }
     return queryModel;
@@ -167,7 +183,7 @@ public class SearchRequestHandler {
    * create a failure response
    */
   private SearchResult createFailureResponse(SearchRequest request, Throwable throwable) {
-    return new SearchResult(request.queryId(), Status.FAILURE.ordinal(), throwable.getMessage(),
+    return new SearchResult(request.searchId(), Status.FAILURE.ordinal(), throwable.getMessage(),
         new Object[0][]);
   }
 
@@ -181,7 +197,7 @@ public class SearchRequestHandler {
     while (itor.hasNext()) {
       output[i++] = itor.next().getData();
     }
-    return new SearchResult(request.queryId(), Status.SUCCESS.ordinal(), "", output);
+    return new SearchResult(request.searchId(), Status.SUCCESS.ordinal(), "", output);
   }
 }
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f85381f/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
----------------------------------------------------------------------
diff --git a/store/search/src/main/scala/org/apache/spark/rpc/Master.scala b/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
index df08ac4..bc44fb6 100644
--- a/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
+++ b/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
@@ -19,10 +19,9 @@ package org.apache.spark.rpc
 
 import java.io.IOException
 import java.net.InetAddress
-import java.util.{List => JList, Map => JMap, Objects, Random, Set => JSet, UUID}
+import java.util.{List => JList, Map => JMap, Objects, Random, UUID}
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.Future
 import scala.concurrent.duration.Duration
@@ -58,12 +57,13 @@ class Master(sparkConf: SparkConf, port: Int) {
   private val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
 
   // worker host address map to EndpointRef
-  private val workers = mutable.Map[String, RpcEndpointRef]()
-
   private val random = new Random
 
   private var rpcEnv: RpcEnv = _
 
+  private val scheduler: Scheduler = new Scheduler
+
   def this(sparkConf: SparkConf) = {
     this(sparkConf, CarbonProperties.getSearchMasterPort)
   }
@@ -94,15 +94,15 @@ class Master(sparkConf: SparkConf, port: Int) {
   }
 
   def stopAllWorkers(): Unit = {
-    val futures = workers.mapValues { ref =>
-      ref.ask[ShutdownResponse](ShutdownRequest("user"))
+    val futures = scheduler.getAllWorkers.toSeq.map { case (address, schedulable) =>
+      (address, schedulable.ref.ask[ShutdownResponse](ShutdownRequest("user")))
     }
-    futures.foreach { case (hostname, future) =>
+    futures.foreach { case (address, future) =>
       ThreadUtils.awaitResult(future, Duration.apply("10s"))
       future.value match {
         case Some(result) =>
           result match {
-            case Success(response) => workers.remove(hostname)
+            case Success(response) => scheduler.removeWorker(address)
             case Failure(throwable) => throw new IOException(throwable.getMessage)
           }
         case None => throw new ExecutionTimeoutException
@@ -115,29 +115,18 @@ class Master(sparkConf: SparkConf, port: Int) {
     LOG.info(s"Receive Register request from worker ${request.hostAddress}:${request.port} " +
       s"with ${request.cores} cores")
     val workerId = UUID.randomUUID().toString
-    val workerHostAddress = request.hostAddress
+    val workerAddress = request.hostAddress
     val workerPort = request.port
     LOG.info(s"connecting to worker ${request.hostAddress}:${request.port}, workerId $workerId")
 
-    val endPointRef: RpcEndpointRef = rpcEnv.setupEndpointRef(
-      RpcAddress(workerHostAddress, workerPort), "search-service")
-
-    workers.put(workerHostAddress, endPointRef)
-    LOG.info(s"worker ${request.hostAddress}:${request.port} added")
+    val endPointRef =
+      rpcEnv.setupEndpointRef(RpcAddress(workerAddress, workerPort), "search-service")
+    scheduler.addWorker(workerAddress,
+      new Schedulable(workerId, workerAddress, workerPort, request.cores, endPointRef))
+    LOG.info(s"worker ${request.hostAddress}:${request.port} registered")
     RegisterWorkerResponse(workerId)
   }
 
-  private def getEndpoint(workerIP: String) = {
-    try {
-      workers(workerIP)
-    } catch {
-      case e: NoSuchElementException =>
-        // no local worker available, choose one worker randomly
-        val index = new Random().nextInt(workers.size)
-        workers.toSeq(index)._2
-    }
-  }
-
   /**
    * Execute search by firing RPC call to worker, return the result rows
    * @param table table to search
@@ -154,57 +143,65 @@ class Master(sparkConf: SparkConf, port: Int) {
     if (globalLimit < 0 || localLimit < 0) {
       throw new IllegalArgumentException("limit should be positive")
     }
-    if (workers.isEmpty) {
-      throw new IOException("No worker is available")
-    }
 
     val queryId = random.nextInt
+    var rowCount = 0
+    val output = new ArrayBuffer[CarbonRow]
+
+    def onSuccess(result: SearchResult): Unit = {
+      // in case of RPC success, collect all rows in response message
+      if (result.queryId != queryId) {
+        throw new IOException(
+          s"queryId in response does not match request: ${result.queryId} != $queryId")
+      }
+      if (result.status != Status.SUCCESS.ordinal()) {
+        throw new IOException(s"failure in worker: ${ result.message }")
+      }
+
+      val itor = result.rows.iterator
+      while (itor.hasNext && rowCount < globalLimit) {
+        output += new CarbonRow(itor.next())
+        rowCount = rowCount + 1
+      }
+      LOG.info(s"[SearchId:$queryId] accumulated result size $rowCount")
+    }
+    def onFailure(e: Throwable) = throw new IOException(s"exception in worker: ${ e.getMessage }")
+    def onTimedout() = throw new ExecutionTimeoutException()
 
     // prune data and get a mapping of worker hostname to list of blocks,
     // then add these blocks to the SearchRequest and fire the RPC call
     val nodeBlockMapping: JMap[String, JList[Distributable]] = pruneBlock(table, columns, filter)
 
-    val futures = nodeBlockMapping.asScala.map { case (hostname, blocks) =>
+    val tuple = nodeBlockMapping.asScala.map { case (splitAddress, blocks) =>
       // Build a SearchRequest
       val split = new SerializableWritable[CarbonMultiBlockSplit](
-        new CarbonMultiBlockSplit(blocks, hostname))
+        new CarbonMultiBlockSplit(blocks, splitAddress))
       val request = SearchRequest(queryId, split, table.getTableInfo, columns, filter, localLimit)
 
-      // fire RPC to worker asynchronously
-      getEndpoint(hostname).ask[SearchResult](request)
+      // Find an Endpoint and send the request to it
+      // This RPC is non-blocking, so we do not need to wait before sending to the next worker
+      scheduler.sendRequestAsync[SearchResult](splitAddress, request)
     }
 
-    // get all results from RPC response and return to caller
-    var rowCount = 0
-    val output = new ArrayBuffer[CarbonRow]
-
-    // Loop to get the result of each Worker
-    futures.foreach { future: Future[SearchResult] =>
+    // loop to get the result of each Worker
+    tuple.foreach { case (worker: Schedulable, future: Future[SearchResult]) =>
       // if we have enough data already, we do not need to collect more result
       if (rowCount < globalLimit) {
-        // wait on worker for 10s
+        // wait for worker for 10s
         ThreadUtils.awaitResult(future, Duration.apply("10s"))
-        future.value match {
-          case Some(response: Try[SearchResult]) =>
-            response match {
-              case Success(result) =>
-                if (result.queryId != queryId) {
-                  throw new IOException(
-                    s"queryId in response does not match request: ${ result.queryId } != $queryId")
-                }
-                if (result.status != Status.SUCCESS.ordinal()) {
-                  throw new IOException(s"failure in worker: ${ result.message }")
-                }
-
-                val itor = result.rows.iterator
-                while (itor.hasNext && rowCount < globalLimit) {
-                  output += new CarbonRow(itor.next())
-                  rowCount = rowCount + 1
-                }
-
-              case Failure(e) =>
-                throw new IOException(s"exception in worker: ${ e.getMessage }")
-            }
-          case None =>
-            throw new ExecutionTimeoutException()
+        LOG.info(s"[SearchId:$queryId] receive search response from worker " +
+          s"${worker.address}:${worker.port}")
+        try {
+          future.value match {
+            case Some(response: Try[SearchResult]) =>
+              response match {
+                case Success(result) => onSuccess(result)
+                case Failure(e) => onFailure(e)
+              }
+            case None => onTimedout()
+          }
+        } finally {
+          worker.workload.decrementAndGet()
         }
       }
     }
@@ -230,12 +227,12 @@ class Master(sparkConf: SparkConf, port: Int) {
     CarbonLoaderUtil.nodeBlockMapping(
       distributables.asJava,
       -1,
-      workers.keySet.toList.asJava,
+      getWorkers.asJava,
       CarbonLoaderUtil.BlockAssignmentStrategy.BLOCK_NUM_FIRST)
   }
 
   /** return hostname of all workers */
-  def getWorkers: JSet[String] = workers.keySet.asJava
+  def getWorkers: Seq[String] = scheduler.getAllWorkers.map(_._1).toSeq
 }
 
 // Exception if execution timed out in search mode
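
A condensed sketch (not the committed code) of the throttled send/await cycle
that Master.search now performs per pruned split: Scheduler increments the
chosen Worker's workload counter when the request is sent, and the caller
releases it once the response is consumed. It assumes SearchResult is the
message type declared alongside Searcher in org.apache.spark.search.

package org.apache.spark.rpc

import scala.concurrent.Future
import scala.concurrent.duration.Duration

import org.apache.spark.search.SearchResult
import org.apache.spark.util.ThreadUtils

object ThrottledScanSketch {
  def scanOneSplit(scheduler: Scheduler, splitAddress: String, request: Any): Unit = {
    val (worker: Schedulable, future: Future[SearchResult]) =
      scheduler.sendRequestAsync[SearchResult](splitAddress, request)
    try {
      // Wait at most 10s for the Worker's response, as Master.search does.
      ThreadUtils.awaitResult(future, Duration.apply("10s"))
    } finally {
      // Always free the workload slot, even on failure or timeout.
      worker.workload.decrementAndGet()
    }
  }
}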
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f85381f/store/search/src/main/scala/org/apache/spark/rpc/Scheduler.scala
----------------------------------------------------------------------
diff --git a/store/search/src/main/scala/org/apache/spark/rpc/Scheduler.scala b/store/search/src/main/scala/org/apache/spark/rpc/Scheduler.scala
new file mode 100644
index 0000000..26208d0
--- /dev/null
+++ b/store/search/src/main/scala/org/apache/spark/rpc/Scheduler.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc
+
+import java.io.IOException
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.mutable
+import scala.concurrent.Future
+import scala.reflect.ClassTag
+import scala.util.Random
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.util.CarbonProperties
+
+/**
+ * [[org.apache.spark.rpc.Master]] uses Scheduler to pick a Worker to send a request to
+ */
+private[rpc] class Scheduler {
+  // mapping of worker IP address to worker instance
+  private val workers = mutable.Map[String, Schedulable]()
+  private val random = new Random()
+
+  private val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * Pick a Worker according to the address and the workload of the Worker,
+   * invoke the RPC and return the Future result
+   */
+  def sendRequestAsync[T: ClassTag](
+      splitAddress: String,
+      request: Any): (Schedulable, Future[T]) = {
+    require(splitAddress != null)
+    if (workers.isEmpty) {
+      throw new IOException("No worker is available")
+    }
+    var worker = pickWorker(splitAddress)
+
+    // check whether the worker exceeds its max workload; if it does, pick the next worker
+    val maxWorkload = CarbonProperties.getMaxWorkloadForWorker(worker.cores)
+    var numTry = workers.size
+    do {
+      if (worker.workload.get() >= maxWorkload) {
+        LOG.info(s"worker ${worker.address}:${worker.port} reached limit, re-selecting worker...")
+        worker = pickNextWorker(worker)
+        numTry = numTry - 1
+      } else {
+        numTry = -1
+      }
+    } while (numTry > 0)
+    if (numTry == 0) {
+      // tried many times and still not able to find a Worker
+      throw new WorkerTooBusyException(
+        s"All workers are busy, number of workers: ${workers.size}, workload limit: $maxWorkload")
+    }
+    LOG.info(s"sending search request to worker ${worker.address}:${worker.port}")
+    val future = worker.ref.ask(request)
+    worker.workload.incrementAndGet()
+    (worker, future)
+  }
+
+  private def pickWorker[T: ClassTag](splitAddress: String) = {
+    try {
+      workers(splitAddress)
+    } catch {
+      case e: NoSuchElementException =>
+        // no local worker available, choose one worker randomly
+        pickRandomWorker()
+    }
+  }
+
+  /** pick a worker randomly */
+  private def pickRandomWorker() = {
+    val index = random.nextInt(workers.size)
+    workers.toSeq(index)._2
+  }
+
+  /** pick the next worker after the input worker in [[Scheduler.workers]] */
+  private def pickNextWorker(worker: Schedulable) = {
+    val index = workers.zipWithIndex.find { case ((address, w), index) =>
+      w == worker
+    }.get._2
+    if (index == workers.size - 1) {
+      workers.toSeq.head._2
+    } else {
+      workers.toSeq(index + 1)._2
+    }
+  }
+
+  /** A new searcher is trying to register; add it to the map and connect to this searcher */
+  def addWorker(address: String, schedulable: Schedulable): Unit = {
+    require(schedulable != null)
+    require(address.equals(schedulable.address))
+    workers(address) = schedulable
+  }
+
+  def removeWorker(address: String): Unit = {
+    workers.remove(address)
+  }
+
+  def getAllWorkers: Iterator[(String, Schedulable)] = workers.iterator
+}
+
+/**
+ * Represents a Worker to which [[Scheduler]] can send
+ * a search request
+ * @param id Worker ID, a UUID string
+ * @param cores number of cores in the Worker
+ * @param ref RPC endpoint reference
+ * @param workload number of outstanding requests sent to the Worker
+ */
+private[rpc] class Schedulable(
+    val id: String,
+    val address: String,
+    val port: Int,
+    val cores: Int,
+    val ref: RpcEndpointRef,
+    var workload: AtomicInteger) {
+  def this(id: String, address: String, port: Int, cores: Int, ref: RpcEndpointRef) = {
+    this(id, address, port, cores, ref, new AtomicInteger())
+  }
+}
+
+class WorkerTooBusyException(message: String) extends RuntimeException(message)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f85381f/store/search/src/main/scala/org/apache/spark/rpc/Worker.scala
----------------------------------------------------------------------
diff --git a/store/search/src/main/scala/org/apache/spark/rpc/Worker.scala b/store/search/src/main/scala/org/apache/spark/rpc/Worker.scala
index 39be35f..0f2138a 100644
--- a/store/search/src/main/scala/org/apache/spark/rpc/Worker.scala
+++ b/store/search/src/main/scala/org/apache/spark/rpc/Worker.scala
@@ -35,7 +35,7 @@ import org.apache.carbondata.core.util.CarbonProperties
 @InterfaceAudience.Internal
 object Worker {
   private val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-  private var hostAddress = InetAddress.getLocalHost.getHostAddress
+  private val hostAddress = InetAddress.getLocalHost.getHostAddress
   private var port: Int = _
 
   def init(masterHostAddress: String, masterPort: Int): Unit = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f85381f/store/search/src/main/scala/org/apache/spark/search/Searcher.scala
----------------------------------------------------------------------
diff --git a/store/search/src/main/scala/org/apache/spark/search/Searcher.scala b/store/search/src/main/scala/org/apache/spark/search/Searcher.scala
index 4ed796e..e467fd3 100644
--- a/store/search/src/main/scala/org/apache/spark/search/Searcher.scala
+++ b/store/search/src/main/scala/org/apache/spark/search/Searcher.scala
@@ -54,7 +54,7 @@ class Searcher(override val rpcEnv: RpcEnv) extends RpcEndpoint {
 
 // Search request sent from master to worker
 case class SearchRequest(
-    queryId: Int,
+    searchId: Int,
     split: SerializableWritable[CarbonMultiBlockSplit],
     tableInfo: TableInfo,
     projectColumns: Array[String],
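
For reference, a small sketch of the bookkeeping unit the Scheduler manages;
the address and port are made up, and the null RpcEndpointRef is a stand-in
for the reference obtained via rpcEnv.setupEndpointRef during registration.

package org.apache.spark.rpc

object SchedulableSketch {
  def main(args: Array[String]): Unit = {
    val worker = new Schedulable("id1", "1.1.1.1", 10021, 4, null)
    // workload counts outstanding requests: Scheduler.sendRequestAsync
    // increments it, and Master.search decrements it after the response.
    assert(worker.workload.get() == 0)
    worker.workload.incrementAndGet()
    worker.workload.decrementAndGet()
  }
}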
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f85381f/store/search/src/test/scala/org/apache/spark/rpc/SchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/store/search/src/test/scala/org/apache/spark/rpc/SchedulerSuite.scala b/store/search/src/test/scala/org/apache/spark/rpc/SchedulerSuite.scala
new file mode 100644
index 0000000..8780dc0
--- /dev/null
+++ b/store/search/src/test/scala/org/apache/spark/rpc/SchedulerSuite.scala
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc
+
+import scala.concurrent.Future
+import scala.reflect.ClassTag
+
+import org.apache.spark.SparkConf
+import org.scalatest.{BeforeAndAfterEach, FunSuite}
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+class SchedulerSuite extends FunSuite with BeforeAndAfterEach {
+
+  var scheduler: Scheduler = _
+  var w1: Schedulable = _
+  var w2: Schedulable = _
+  var w3: Schedulable = _
+
+  override def beforeEach(): Unit = {
+    scheduler = new Scheduler()
+    w1 = new Schedulable("id1", "1.1.1.1", 1000, 4, new DummyRef())
+    w2 = new Schedulable("id2", "1.1.1.2", 1000, 4, new DummyRef())
+    w3 = new Schedulable("id3", "1.1.1.3", 1000, 4, new DummyRef())
+
+    scheduler.addWorker("1.1.1.1", w1)
+    scheduler.addWorker("1.1.1.2", w2)
+    scheduler.addWorker("1.1.1.3", w3)
+  }
+
+  test("test addWorker, removeWorker, getAllWorkers") {
+    assertResult(Set("1.1.1.1", "1.1.1.2", "1.1.1.3"))(scheduler.getAllWorkers.toMap.keySet)
+
+    scheduler.removeWorker("1.1.1.2")
+    assertResult(Set("1.1.1.1", "1.1.1.3"))(scheduler.getAllWorkers.toMap.keySet)
+
+    val w4 = new Schedulable("id4", "1.1.1.4", 1000, 4, new DummyRef())
+    scheduler.addWorker("1.1.1.4", w4)
+    assertResult(Set("1.1.1.1", "1.1.1.3", "1.1.1.4"))(scheduler.getAllWorkers.toMap.keySet)
+    assertResult("id4")(scheduler.getAllWorkers.toMap.get("1.1.1.4").get.id)
+  }
+
+  test("test normal schedule") {
+    val (r1, _) = scheduler.sendRequestAsync("1.1.1.1", null)
+    assertResult(w1.id)(r1.id)
+    val (r2, _) = scheduler.sendRequestAsync("1.1.1.2", null)
+    assertResult(w2.id)(r2.id)
+    val (r3, _) = scheduler.sendRequestAsync("1.1.1.3", null)
+    assertResult(w3.id)(r3.id)
+    val (r4, _) = scheduler.sendRequestAsync("1.1.1.1", null)
+    assertResult(w1.id)(r4.id)
+    val (r5, _) = scheduler.sendRequestAsync("1.1.1.2", null)
+    assertResult(w2.id)(r5.id)
+    val (r6, _) = scheduler.sendRequestAsync("1.1.1.3", null)
+    assertResult(w3.id)(r6.id)
+  }
+
+  test("test worker unavailable") {
+    val (r1, _) = scheduler.sendRequestAsync("1.1.1.5", null)
+    assert(scheduler.getAllWorkers.map(_._2.id).contains(r1.id))
+  }
+
+  test("test reschedule when target worker is overloaded") {
+    // by default, maxWorkload is number of cores * 10, so it is 40 in this test suite
+    (1 to 40).foreach { i =>
+      val (r2, _) = scheduler.sendRequestAsync("1.1.1.2", null)
+      val (r3, _) = scheduler.sendRequestAsync("1.1.1.3", null)
+    }
+    val (r, _) = scheduler.sendRequestAsync("1.1.1.3", null)
+    // it must be worker1 since worker3 exceeds the max workload
+    assertResult(w1.id)(r.id)
+  }
+
+  test("test all workers are overloaded") {
+    // by default, maxWorkload is number of cores * 10, so it is 40 in this test suite
+    (1 to 40).foreach { i =>
+      val (r1, _) = scheduler.sendRequestAsync("1.1.1.1", null)
+      val (r2, _) = scheduler.sendRequestAsync("1.1.1.2", null)
+      val (r3, _) = scheduler.sendRequestAsync("1.1.1.3", null)
+    }
+
+    val e = intercept[WorkerTooBusyException] {
+      scheduler.sendRequestAsync("1.1.1.3", null)
+    }
+  }
+
+  test("test user configured overload param") {
+    val original = CarbonProperties.getInstance().getProperty(
+      CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT)
+
+    CarbonProperties.getInstance().addProperty(
+      CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT, "3")
+
+    (1 to 3).foreach { i =>
+      val (r1, _) = scheduler.sendRequestAsync("1.1.1.1", null)
+      val (r2, _) = scheduler.sendRequestAsync("1.1.1.2", null)
+      val (r3, _) = scheduler.sendRequestAsync("1.1.1.3", null)
+    }
+
+    val e = intercept[WorkerTooBusyException] {
+      scheduler.sendRequestAsync("1.1.1.3", null)
+    }
+
+    if (original != null) {
+      CarbonProperties.getInstance().addProperty(
+        CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT, original)
+    }
+  }
+
+  test("test invalid property") {
+    intercept[IllegalArgumentException] {
+      CarbonProperties.getInstance().addProperty(
+        CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT, "-3")
+    }
+    var value = CarbonProperties.getInstance().getProperty(
+      CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT)
+    assertResult(null)(value)
+
+    intercept[NumberFormatException] {
+      CarbonProperties.getInstance().addProperty(
+        CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT, "3s")
+    }
+    value = CarbonProperties.getInstance().getProperty(
+      CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT)
+    assertResult(null)(value)
+  }
+}
+
+class DummyRef extends RpcEndpointRef(new SparkConf) {
+  override def address: RpcAddress = null
+
+  override def name: String = ""
+
+  override def send(message: Any): Unit = { }
+
+  override def ask[T](message: Any, timeout: RpcTimeout)
+      (implicit evidence$1: ClassTag[T]): Future[T] = null
+}
\ No newline at end of file
