http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9b40bf9/store/core/src/main/scala/org/apache/carbondata/store/Master.scala ---------------------------------------------------------------------- diff --git a/store/core/src/main/scala/org/apache/carbondata/store/Master.scala b/store/core/src/main/scala/org/apache/carbondata/store/Master.scala new file mode 100644 index 0000000..2109251 --- /dev/null +++ b/store/core/src/main/scala/org/apache/carbondata/store/Master.scala @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.store + +import java.io.IOException +import java.net.{BindException, InetAddress} +import java.util.{List => JList, Map => JMap, Objects, Random, UUID} +import java.util.concurrent.{ExecutionException, Future, TimeoutException, TimeUnit} +import java.util.concurrent.atomic.AtomicBoolean + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.ipc.RPC +import org.apache.hadoop.mapred.JobConf +import org.apache.hadoop.mapreduce.Job + +import org.apache.carbondata.common.annotations.InterfaceAudience +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.block.Distributable +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.datastore.row.CarbonRow +import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.scan.expression.Expression +import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.hadoop.CarbonMultiBlockSplit +import org.apache.carbondata.hadoop.api.CarbonInputFormat +import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil +import org.apache.carbondata.processing.util.CarbonLoaderUtil +import org.apache.carbondata.store.rpc.{RegistryService, ServiceFactory} +import org.apache.carbondata.store.rpc.impl.{RegistryServiceImpl, Status} +import org.apache.carbondata.store.rpc.model._ + +/** + * Master of CarbonSearch. + * It provides a Registry service for worker to register. + * And it provides search API to fire RPC call to workers. 
+ */ [email protected] +private[store] class Master { + private val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + + // worker host address map to EndpointRef + + private val random = new Random + + private var registryServer: RPC.Server = _ + + private val scheduler: Scheduler = new Scheduler + + def buildServer(serverHost: String, serverPort: Int): RPC.Server = { + val hadoopConf = FileFactory.getConfiguration + val builder = new RPC.Builder(hadoopConf) + builder + .setBindAddress(serverHost) + .setPort(serverPort) + .setProtocol(classOf[RegistryService]) + .setInstance(new RegistryServiceImpl(this)) + .build + } + + /** start service and listen on port passed in constructor */ + def startService(): Unit = { + if (registryServer == null) { + LOG.info("Start search mode master thread") + val isStarted: AtomicBoolean = new AtomicBoolean(false) + new Thread(new Runnable { + override def run(): Unit = { + val hostAddress = InetAddress.getLocalHost.getHostAddress + var port = CarbonProperties.getSearchMasterPort + var exception: BindException = null + var numTry = 100 // we will try to create service at worse case 100 times + do { + try { + LOG.info(s"building registry-service on $hostAddress:$port") + registryServer = buildServer(hostAddress, port) + numTry = 0 + } catch { + case e: BindException => + // port is occupied, increase the port number and try again + exception = e + LOG.error(s"start registry-service failed: ${e.getMessage}") + port = port + 1 + numTry = numTry - 1 + } + } while (numTry > 0) + if (registryServer == null) { + // we have tried many times, but still failed to find an available port + throw exception + } + if (isStarted.compareAndSet(false, false)) { + synchronized { + isStarted.compareAndSet(false, true) + } + } + LOG.info("starting registry-service") + registryServer.start() + LOG.info("registry-service started") + } + }).start() + var count = 0 + val countThreshold = 5000 + while (isStarted.compareAndSet(false, false) && count < countThreshold) { + LOG.info(s"Waiting search mode master to start, retrying $count times") + Thread.sleep(10) + count = count + 1 + } + if (count >= countThreshold) { + LOG.error(s"Search mode try $countThreshold times to start master but failed") + throw new RuntimeException( + s"Search mode try $countThreshold times to start master but failed") + } else { + LOG.info("Search mode master started") + } + } else { + LOG.info("Search mode master has already started") + } + } + + def stopService(): Unit = { + if (registryServer != null) { + registryServer.stop() + registryServer.join() + registryServer = null + } + } + + def stopAllWorkers(): Unit = { + scheduler.getAllWorkers.toSeq.foreach { case (address, schedulable) => + val response = try { + schedulable.service.shutdown(new ShutdownRequest("user")) + } catch { + case throwable: Throwable => + throw new IOException(throwable) + } + scheduler.removeWorker(address) + } + } + + /** A new searcher is trying to register, add it to the map and connect to this searcher */ + def addWorker(request: RegisterWorkerRequest): RegisterWorkerResponse = { + LOG.info(s"Receive Register request from worker ${request.getHostAddress}:${request.getPort} " + + s"with ${request.getCores} cores") + val workerId = UUID.randomUUID().toString + val workerAddress = request.getHostAddress + val workerPort = request.getPort + LOG.info(s"connecting to worker ${request.getHostAddress}:${request.getPort}, " + + s"workerId $workerId") + + val searchService = 
ServiceFactory.createSearchService(workerAddress, workerPort) + scheduler.addWorker(workerAddress, + new Schedulable(workerId, workerAddress, workerPort, request.getCores, searchService)) + LOG.info(s"worker ${request.getHostAddress}:${request.getPort} registered") + new RegisterWorkerResponse(workerId) + } + + /** + * Execute search by firing RPC call to worker, return the result rows + * @param table table to search + * @param columns projection column names + * @param filter filter expression + * @param globalLimit max number of rows required in Master + * @param localLimit max number of rows required in Worker + * @return + */ + def search(table: CarbonTable, columns: Array[String], filter: Expression, + globalLimit: Long, localLimit: Long): Array[CarbonRow] = { + Objects.requireNonNull(table) + Objects.requireNonNull(columns) + if (globalLimit < 0 || localLimit < 0) { + throw new IllegalArgumentException("limit should be positive") + } + + val queryId = random.nextInt + var rowCount = 0 + val output = new ArrayBuffer[CarbonRow] + + def onSuccess(result: QueryResponse): Unit = { + // in case of RPC success, collect all rows in response message + if (result.getQueryId != queryId) { + throw new IOException( + s"queryId in response does not match request: ${result.getQueryId} != $queryId") + } + if (result.getStatus != Status.SUCCESS.ordinal()) { + throw new IOException(s"failure in worker: ${ result.getMessage }") + } + + val itor = result.getRows.iterator + while (itor.hasNext && rowCount < globalLimit) { + output += new CarbonRow(itor.next()) + rowCount = rowCount + 1 + } + LOG.info(s"[QueryId:$queryId] accumulated result size $rowCount") + } + def onFaiure(e: Throwable) = throw new IOException(s"exception in worker: ${ e.getMessage }") + def onTimedout() = throw new ExecutionTimeoutException() + + // prune data and get a mapping of worker hostname to list of blocks, + // then add these blocks to the QueryRequest and fire the RPC call + val nodeBlockMapping: JMap[String, JList[Distributable]] = pruneBlock(table, columns, filter) + val tuple = nodeBlockMapping.asScala.map { case (splitAddress, blocks) => + // Build a QueryRequest + val split = new CarbonMultiBlockSplit(blocks, splitAddress) + val request = + new QueryRequest(queryId, split, table.getTableInfo, columns, filter, localLimit) + + // Find an Endpoind and send the request to it + // This RPC is non-blocking so that we do not need to wait before send to next worker + scheduler.sendRequestAsync(splitAddress, request) + } + + // loop to get the result of each Worker + tuple.foreach { case (worker: Schedulable, future: Future[QueryResponse]) => + + // if we have enough data already, we do not need to collect more result + if (rowCount < globalLimit) { + // wait for worker + val response = try { + future.get(CarbonProperties.getInstance().getQueryTimeout.toLong, TimeUnit.SECONDS) + } catch { + case e: ExecutionException => onFaiure(e) + case t: TimeoutException => onTimedout() + } finally { + worker.workload.decrementAndGet() + } + LOG.info(s"[QueryId:$queryId] receive search response from worker " + + s"${worker.address}:${worker.port}") + onSuccess(response) + } + } + output.toArray + } + + /** + * Prune data by using CarbonInputFormat.getSplit + * Return a mapping of host address to list of block + */ + private def pruneBlock( + table: CarbonTable, + columns: Array[String], + filter: Expression): JMap[String, JList[Distributable]] = { + val jobConf = new JobConf(new Configuration) + val job = new Job(jobConf) + val format = 
CarbonInputFormatUtil.createCarbonTableInputFormat( + job, table, columns, filter, null, null) + + // We will do FG pruning in reader side, so don't do it here + CarbonInputFormat.setFgDataMapPruning(job.getConfiguration, false) + val splits = format.getSplits(job) + val distributables = splits.asScala.map { split => + split.asInstanceOf[Distributable] + } + CarbonLoaderUtil.nodeBlockMapping( + distributables.asJava, + -1, + getWorkers.asJava, + CarbonLoaderUtil.BlockAssignmentStrategy.BLOCK_NUM_FIRST, + null) + } + + /** return hostname of all workers */ + def getWorkers: Seq[String] = scheduler.getAllWorkers.map(_._1).toSeq +} + +// Exception if execution timed out in search mode +class ExecutionTimeoutException extends RuntimeException
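Editorial note: Master.startService() above and Worker.startService() later in this patch use the same bind-with-retry pattern: build the RPC server on a configured port and, if a BindException signals the port is occupied, advance to the next port, giving up after 100 attempts and rethrowing the last BindException. Below is a minimal stand-alone sketch of that pattern using plain java.net.ServerSocket instead of the Hadoop RPC builder; the helper name bindWithRetry, the backlog value of 50 and the default of 100 tries are illustrative assumptions, not part of the patch.

    import java.net.{BindException, InetAddress, ServerSocket}

    object BindRetryExample {
      // Try ports startPort, startPort + 1, ... until one binds or maxTries attempts are used up.
      def bindWithRetry(host: String, startPort: Int, maxTries: Int = 100): ServerSocket = {
        var port = startPort
        var numTry = maxTries
        var lastError: BindException = null
        var server: ServerSocket = null
        do {
          try {
            // this constructor binds immediately and throws BindException if the port is in use
            server = new ServerSocket(port, 50, InetAddress.getByName(host))
            numTry = 0 // bound successfully, leave the loop
          } catch {
            case e: BindException =>
              lastError = e // remember the failure, then try the next port
              port += 1
              numTry -= 1
          }
        } while (numTry > 0)
        if (server == null) {
          throw lastError // every attempted port was occupied
        }
        server
      }

      def main(args: Array[String]): Unit = {
        val server = bindWithRetry("127.0.0.1", 10020)
        println(s"bound to ${server.getLocalSocketAddress}")
        server.close()
      }
    }

The same loop structure appears twice in the patch (registry-service in Master, search-service in Worker); a shared helper along these lines would remove that duplication.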
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9b40bf9/store/core/src/main/scala/org/apache/carbondata/store/Scheduler.scala ---------------------------------------------------------------------- diff --git a/store/core/src/main/scala/org/apache/carbondata/store/Scheduler.scala b/store/core/src/main/scala/org/apache/carbondata/store/Scheduler.scala new file mode 100644 index 0000000..fb3ef86 --- /dev/null +++ b/store/core/src/main/scala/org/apache/carbondata/store/Scheduler.scala @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.store + +import java.io.IOException +import java.util.concurrent.{Callable, Executors, Future} +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.mutable +import scala.reflect.ClassTag +import scala.util.Random + +import org.apache.carbondata.common.annotations.InterfaceAudience +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.store.rpc.QueryService +import org.apache.carbondata.store.rpc.model.{QueryRequest, QueryResponse} + +/** + * [[Master]] uses Scheduler to pick a Worker to send request + */ [email protected] +private[store] class Scheduler { + // mapping of worker IP address to worker instance + private val workers = mutable.Map[String, Schedulable]() + private val random = new Random() + private val executors = Executors.newCachedThreadPool() + private val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + + /** + * Pick a Worker according to the address and workload of the Worker + * Invoke the RPC and return Future result + */ + def sendRequestAsync( + splitAddress: String, + request: QueryRequest): (Schedulable, Future[QueryResponse]) = { + require(splitAddress != null) + if (workers.isEmpty) { + throw new IOException("No worker is available") + } + var worker: Schedulable = pickWorker(splitAddress) + + // check whether worker exceed max workload, if exceeded, pick next worker + val maxWorkload = CarbonProperties.getMaxWorkloadForWorker(worker.cores) + var numTry = workers.size + do { + if (worker.workload.get() >= maxWorkload) { + LOG.info(s"worker ${worker.address}:${worker.port} reach limit, re-select worker...") + worker = pickNextWorker(worker) + numTry = numTry - 1 + } else { + numTry = -1 + } + } while (numTry > 0) + if (numTry == 0) { + // tried so many times and still not able to find Worker + throw new WorkerTooBusyException( + s"All workers are busy, number of workers: ${workers.size}, workload limit: $maxWorkload") + } + LOG.info(s"sending search request to worker ${worker.address}:${worker.port}") + val future = executors.submit( + new Callable[QueryResponse] { + override def call(): 
QueryResponse = worker.service.query(request) + } + ) + worker.workload.incrementAndGet() + (worker, future) + } + + private def pickWorker[T: ClassTag](splitAddress: String) = { + try { + workers(splitAddress) + } catch { + case e: NoSuchElementException => + // no local worker available, choose one worker randomly + pickRandomWorker() + } + } + + /** pick a worker randomly */ + private def pickRandomWorker() = { + val index = random.nextInt(workers.size) + workers.toSeq(index)._2 + } + + /** pick the next worker of the input worker in the [[Scheduler.workers]] */ + private def pickNextWorker(worker: Schedulable) = { + val index = workers.zipWithIndex.find { case ((address, w), index) => + w == worker + }.get._2 + if (index == workers.size - 1) { + workers.toSeq.head._2 + } else { + workers.toSeq(index + 1)._2 + } + } + + /** A new searcher is trying to register, add it to the map and connect to this searcher */ + def addWorker(address: String, schedulable: Schedulable): Unit = { + require(schedulable != null) + require(address.equals(schedulable.address)) + workers(address) = schedulable + } + + def removeWorker(address: String): Unit = { + workers.remove(address) + } + + def getAllWorkers: Iterator[(String, Schedulable)] = workers.iterator +} + +/** + * Represent a Worker which [[Scheduler]] can send + * Search request on it + * @param id Worker ID, a UUID string + * @param cores, number of cores in Worker + * @param service RPC service reference + * @param workload number of outstanding request sent to Worker + */ +private[store] class Schedulable( + val id: String, + val address: String, + val port: Int, + val cores: Int, + val service: QueryService, + var workload: AtomicInteger) { + def this(id: String, address: String, port: Int, cores: Int, service: QueryService) = { + this(id, address, port, cores, service, new AtomicInteger()) + } +} + +class WorkerTooBusyException(message: String) extends RuntimeException(message) http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9b40bf9/store/core/src/main/scala/org/apache/carbondata/store/Worker.scala ---------------------------------------------------------------------- diff --git a/store/core/src/main/scala/org/apache/carbondata/store/Worker.scala b/store/core/src/main/scala/org/apache/carbondata/store/Worker.scala new file mode 100644 index 0000000..2ded00b --- /dev/null +++ b/store/core/src/main/scala/org/apache/carbondata/store/Worker.scala @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.store + +import java.io.IOException +import java.net.{BindException, InetAddress} + +import org.apache.hadoop.ipc.RPC + +import org.apache.carbondata.common.annotations.InterfaceAudience +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.store.rpc.{QueryService, RegistryService, ServiceFactory} +import org.apache.carbondata.store.rpc.impl.QueryServiceImpl +import org.apache.carbondata.store.rpc.model.RegisterWorkerRequest + [email protected] +private[store] object Worker { + private val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + private val hostAddress = InetAddress.getLocalHost.getHostAddress + private var port: Int = _ + private var registry: RegistryService = _ + + def init(masterHostAddress: String, masterPort: Int): Unit = { + LOG.info(s"initializing worker...") + startService() + LOG.info(s"registering to master $masterHostAddress:$masterPort") + val workerId = registerToMaster(masterHostAddress, masterPort) + LOG.info(s"worker registered to master, workerId: $workerId") + } + + def buildServer(serverHost: String, serverPort: Int): RPC.Server = { + val hadoopConf = FileFactory.getConfiguration + val builder = new RPC.Builder(hadoopConf) + builder + .setNumHandlers(Runtime.getRuntime.availableProcessors) + .setBindAddress(serverHost) + .setPort(serverPort) + .setProtocol(classOf[QueryService]) + .setInstance(new QueryServiceImpl) + .build + } + + /** + * Start to listen on port [[CarbonProperties.getSearchWorkerPort]] + */ + private def startService(): Unit = { + new Thread(new Runnable { + override def run(): Unit = { + port = CarbonProperties.getSearchWorkerPort + var searchServer: RPC.Server = null + var exception: BindException = null + var numTry = 100 // we will try to create service at worse case 100 times + do { + try { + LOG.info(s"building search-service on $hostAddress:$port") + searchServer = buildServer(hostAddress, port) + numTry = 0 + } catch { + case e: BindException => + // port is occupied, increase the port number and try again + exception = e + LOG.error(s"start search-service failed: ${e.getMessage}") + port = port + 1 + numTry = numTry - 1 + } + } while (numTry > 0) + if (searchServer == null) { + // we have tried many times, but still failed to find an available port + throw exception + } + LOG.info("starting search-service") + searchServer.start() + LOG.info("search-service started") + } + }).start() + } + + private def registerToMaster(registryHostAddress: String, registryPort: Int): String = { + LOG.info(s"trying to register to master $registryHostAddress:$registryPort") + if (registry == null) { + registry = ServiceFactory.createRegistryService(registryHostAddress, registryPort) + } + val cores = Runtime.getRuntime.availableProcessors() + val request = new RegisterWorkerRequest(hostAddress, port, cores) + val response = try { + registry.registerWorker(request) + } catch { + case throwable: Throwable => + LOG.error(s"worker failed to registered: $throwable") + throw new IOException(throwable) + } + + LOG.info("worker registered") + response.getWorkerId + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9b40bf9/store/core/src/test/java/org/apache/carbondata/store/LocalCarbonStoreTest.java ---------------------------------------------------------------------- diff --git 
a/store/core/src/test/java/org/apache/carbondata/store/LocalCarbonStoreTest.java b/store/core/src/test/java/org/apache/carbondata/store/LocalCarbonStoreTest.java new file mode 100644 index 0000000..c885a26 --- /dev/null +++ b/store/core/src/test/java/org/apache/carbondata/store/LocalCarbonStoreTest.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.store; + +import java.io.File; +import java.io.IOException; +import java.util.Iterator; + +import org.apache.carbondata.core.datastore.row.CarbonRow; +import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.sdk.file.Field; +import org.apache.carbondata.sdk.file.Schema; +import org.apache.carbondata.sdk.file.TestUtil; + +import org.apache.commons.io.FileUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class LocalCarbonStoreTest { + @Before + public void cleanFile() { + assert (TestUtil.cleanMdtFile()); + } + + @After + public void verifyDMFile() { + assert (!TestUtil.verifyMdtFile()); + } + + // TODO: complete this testcase + // Currently result rows are empty, because SDK is not writing table status file + // so that reader does not find any segment. + // Complete this testcase after flat folder reader is done. + @Test + public void testWriteAndReadFiles() throws IOException { + String path = "./testWriteFiles"; + FileUtils.deleteDirectory(new File(path)); + + Field[] fields = new Field[2]; + fields[0] = new Field("name", DataTypes.STRING); + fields[1] = new Field("age", DataTypes.INT); + + TestUtil.writeFilesAndVerify(100, new Schema(fields), path, true); + + CarbonStore store = new LocalCarbonStore(); + Iterator<CarbonRow> rows = store.scan(path, new String[]{"name, age"}, null); + + while (rows.hasNext()) { + CarbonRow row = rows.next(); + System.out.println(row.toString()); + } + + FileUtils.deleteDirectory(new File(path)); + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9b40bf9/store/core/src/test/java/org/apache/carbondata/store/TestUtil.java ---------------------------------------------------------------------- diff --git a/store/core/src/test/java/org/apache/carbondata/store/TestUtil.java b/store/core/src/test/java/org/apache/carbondata/store/TestUtil.java new file mode 100644 index 0000000..9b9aa9e --- /dev/null +++ b/store/core/src/test/java/org/apache/carbondata/store/TestUtil.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.sdk.file; + +import java.io.File; +import java.io.FileFilter; +import java.io.IOException; + +import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.util.CarbonProperties; +import org.apache.carbondata.core.util.path.CarbonTablePath; + +import org.junit.Assert; + +public class TestUtil { + + static void writeFilesAndVerify(Schema schema, String path) { + writeFilesAndVerify(schema, path, null); + } + + static void writeFilesAndVerify(Schema schema, String path, String[] sortColumns) { + writeFilesAndVerify(100, schema, path, sortColumns, false, -1, -1, true); + } + + public static void writeFilesAndVerify(int rows, Schema schema, String path, boolean persistSchema) { + writeFilesAndVerify(rows, schema, path, null, persistSchema, -1, -1, true); + } + + public static void writeFilesAndVerify(Schema schema, String path, boolean persistSchema, + boolean isTransactionalTable) { + writeFilesAndVerify(100, schema, path, null, persistSchema, -1, -1, isTransactionalTable); + } + + /** + * write file and verify + * + * @param rows number of rows + * @param schema schema + * @param path table store path + * @param persistSchema whether persist schema + * @param isTransactionalTable whether is transactional table + */ + public static void writeFilesAndVerify(int rows, Schema schema, String path, boolean persistSchema, + boolean isTransactionalTable) { + writeFilesAndVerify(rows, schema, path, null, persistSchema, -1, -1, isTransactionalTable); + } + + /** + * Invoke CarbonWriter API to write carbon files and assert the file is rewritten + * @param rows number of rows to write + * @param schema schema of the file + * @param path local write path + * @param sortColumns sort columns + * @param persistSchema true if want to persist schema file + * @param blockletSize blockletSize in the file, -1 for default size + * @param blockSize blockSize in the file, -1 for default size + * @param isTransactionalTable set to true if this is written for Transactional Table. 
+ */ + static void writeFilesAndVerify(int rows, Schema schema, String path, String[] sortColumns, + boolean persistSchema, int blockletSize, int blockSize, boolean isTransactionalTable) { + try { + CarbonWriterBuilder builder = CarbonWriter.builder() + .isTransactionalTable(isTransactionalTable) + .outputPath(path); + if (sortColumns != null) { + builder = builder.sortBy(sortColumns); + } + if (persistSchema) { + builder = builder.persistSchemaFile(true); + } + if (blockletSize != -1) { + builder = builder.withBlockletSize(blockletSize); + } + if (blockSize != -1) { + builder = builder.withBlockSize(blockSize); + } + + CarbonWriter writer = builder.buildWriterForCSVInput(schema); + + for (int i = 0; i < rows; i++) { + writer.write(new String[]{"robot" + (i % 10), String.valueOf(i), String.valueOf((double) i / 2)}); + } + writer.close(); + } catch (IOException e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } catch (InvalidLoadOptionException l) { + l.printStackTrace(); + Assert.fail(l.getMessage()); + } + + File segmentFolder = null; + if (isTransactionalTable) { + segmentFolder = new File(CarbonTablePath.getSegmentPath(path, "null")); + Assert.assertTrue(segmentFolder.exists()); + } else { + segmentFolder = new File(path); + Assert.assertTrue(segmentFolder.exists()); + } + + File[] dataFiles = segmentFolder.listFiles(new FileFilter() { + @Override public boolean accept(File pathname) { + return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT); + } + }); + Assert.assertNotNull(dataFiles); + Assert.assertTrue(dataFiles.length > 0); + } + + /** + * verify whether the file exists + * if delete the file success or file not exists, then return true; otherwise return false + * + * @return boolean + */ + public static boolean cleanMdtFile() { + String fileName = CarbonProperties.getInstance().getSystemFolderLocation() + + CarbonCommonConstants.FILE_SEPARATOR + "datamap.mdtfile"; + try { + if (FileFactory.isFileExist(fileName)) { + File file = new File(fileName); + file.delete(); + return true; + } else { + return true; + } + } catch (IOException e) { + e.printStackTrace(); + return false; + } + } + + /** + * verify whether the mdt file exists + * if the file exists, then return true; otherwise return false + * + * @return boolean + */ + public static boolean verifyMdtFile() { + String fileName = CarbonProperties.getInstance().getSystemFolderLocation() + + CarbonCommonConstants.FILE_SEPARATOR + "datamap.mdtfile"; + try { + if (FileFactory.isFileExist(fileName)) { + return true; + } + return false; + } catch (IOException e) { + throw new RuntimeException("IO exception:", e); + } + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9b40bf9/store/core/src/test/scala/org/apache/carbondata/store/SchedulerSuite.scala ---------------------------------------------------------------------- diff --git a/store/core/src/test/scala/org/apache/carbondata/store/SchedulerSuite.scala b/store/core/src/test/scala/org/apache/carbondata/store/SchedulerSuite.scala new file mode 100644 index 0000000..95e7335 --- /dev/null +++ b/store/core/src/test/scala/org/apache/carbondata/store/SchedulerSuite.scala @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.store + +import org.apache.hadoop.ipc.ProtocolSignature +import org.scalatest.{BeforeAndAfterEach, FunSuite} + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.store.rpc.QueryService +import org.apache.carbondata.store.rpc.model.{QueryRequest, QueryResponse, ShutdownRequest, ShutdownResponse} + +class SchedulerSuite extends FunSuite with BeforeAndAfterEach { + + var scheduler: Scheduler = _ + var w1: Schedulable = _ + var w2: Schedulable = _ + var w3: Schedulable = _ + + override def beforeEach(): Unit = { + scheduler = new Scheduler() + w1 = new Schedulable("id1", "1.1.1.1", 1000, 4, new DummyRef()) + w2 = new Schedulable("id2", "1.1.1.2", 1000, 4, new DummyRef()) + w3 = new Schedulable("id3", "1.1.1.3", 1000, 4, new DummyRef()) + + scheduler.addWorker("1.1.1.1", w1) + scheduler.addWorker("1.1.1.2", w2) + scheduler.addWorker("1.1.1.3", w3) + } + + test("test addWorker, removeWorker, getAllWorkers") { + assertResult(Set("1.1.1.1", "1.1.1.2", "1.1.1.3"))(scheduler.getAllWorkers.toMap.keySet) + + scheduler.removeWorker("1.1.1.2") + assertResult(Set("1.1.1.1", "1.1.1.3"))(scheduler.getAllWorkers.toMap.keySet) + + val w4 = new Schedulable("id4", "1.1.1.4", 1000, 4, new DummyRef()) + scheduler.addWorker("1.1.1.4", w4) + assertResult(Set("1.1.1.1", "1.1.1.3", "1.1.1.4"))(scheduler.getAllWorkers.toMap.keySet) + assertResult("id4")(scheduler.getAllWorkers.toMap.get("1.1.1.4").get.id) + } + + test("test normal schedule") { + val (r1, _) = scheduler.sendRequestAsync("1.1.1.1", null) + assertResult(w1.id)(r1.id) + val (r2, _) = scheduler.sendRequestAsync("1.1.1.2", null) + assertResult(w2.id)(r2.id) + val (r3, _) = scheduler.sendRequestAsync("1.1.1.3", null) + assertResult(w3.id)(r3.id) + val (r4, _) = scheduler.sendRequestAsync("1.1.1.1", null) + assertResult(w1.id)(r4.id) + val (r5, _) = scheduler.sendRequestAsync("1.1.1.2", null) + assertResult(w2.id)(r5.id) + val (r6, _) = scheduler.sendRequestAsync("1.1.1.3", null) + assertResult(w3.id)(r6.id) + } + + test("test worker unavailable") { + val (r1, _) = scheduler.sendRequestAsync("1.1.1.5", null) + assert(scheduler.getAllWorkers.map(_._2.id).contains(r1.id)) + } + + test("test reschedule when target worker is overload") { + // by default, maxWorkload is number of core * 10, so it is 40 in this test suite + (1 to 40).foreach { i => + val (r2, _) = scheduler.sendRequestAsync("1.1.1.2", null) + val (r3, _) = scheduler.sendRequestAsync("1.1.1.3", null) + } + val (r, _) = scheduler.sendRequestAsync("1.1.1.3", null) + // it must be worker1 since worker3 exceed max workload + assertResult(w1.id)(r.id) + } + + test("test all workers are overload") { + // by default, maxWorkload is number of core * 10, so it is 40 in this test suite + (1 to 40).foreach { i => + val (r1, _) = scheduler.sendRequestAsync("1.1.1.1", null) + val (r2, _) = 
scheduler.sendRequestAsync("1.1.1.2", null) + val (r3, _) = scheduler.sendRequestAsync("1.1.1.3", null) + } + + val e = intercept[WorkerTooBusyException] { + scheduler.sendRequestAsync("1.1.1.3", null) + } + } + + test("test user configured overload param") { + val original = CarbonProperties.getInstance().getProperty( + CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT) + + CarbonProperties.getInstance().addProperty( + CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT, "3") + + (1 to 3).foreach { i => + val (r1, _) = scheduler.sendRequestAsync("1.1.1.1", null) + val (r2, _) = scheduler.sendRequestAsync("1.1.1.2", null) + val (r3, _) = scheduler.sendRequestAsync("1.1.1.3", null) + } + + val e = intercept[WorkerTooBusyException] { + scheduler.sendRequestAsync("1.1.1.3", null) + } + + if (original != null) { + CarbonProperties.getInstance().addProperty( + CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT, original) + } + } + + test("test invalid property") { + intercept[IllegalArgumentException] { + CarbonProperties.getInstance().addProperty( + CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT, "-3") + } + var value = CarbonProperties.getInstance().getProperty( + CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT) + assertResult(null)(value) + + intercept[NumberFormatException] { + CarbonProperties.getInstance().addProperty( + CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT, "3s") + } + value = CarbonProperties.getInstance().getProperty( + CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT) + assertResult(null)(value) + } +} + +class DummyRef extends QueryService { + override def query(request: QueryRequest): QueryResponse = ??? + + override def shutdown(request: ShutdownRequest): ShutdownResponse = ??? + + override def getProtocolVersion(protocol: String, + clientVersion: Long): Long = ??? + + override def getProtocolSignature(protocol: String, + clientVersion: Long, + clientMethodsHash: Int): ProtocolSignature = ??? +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9b40bf9/store/sdk/src/main/java/org/apache/carbondata/store/CarbonRowReadSupport.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/store/CarbonRowReadSupport.java b/store/sdk/src/main/java/org/apache/carbondata/store/CarbonRowReadSupport.java deleted file mode 100644 index bafbb9f..0000000 --- a/store/sdk/src/main/java/org/apache/carbondata/store/CarbonRowReadSupport.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.carbondata.store; - -import java.io.IOException; - -import org.apache.carbondata.common.annotations.InterfaceAudience; -import org.apache.carbondata.core.datastore.row.CarbonRow; -import org.apache.carbondata.core.metadata.schema.table.CarbonTable; -import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; -import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport; -import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport; - -/** - * ReadSupport that convert row object to CarbonRow - */ [email protected] -public class CarbonRowReadSupport implements CarbonReadSupport<CarbonRow> { - private CarbonReadSupport<Object[]> delegate; - - public CarbonRowReadSupport() { - this.delegate = new DictionaryDecodeReadSupport<>(); - } - - @Override public void initialize(CarbonColumn[] carbonColumns, CarbonTable carbonTable) - throws IOException { - delegate.initialize(carbonColumns, carbonTable); - } - - @Override public CarbonRow readRow(Object[] data) { - Object[] converted = delegate.readRow(data); - return new CarbonRow(converted); - } - - @Override public void close() { - delegate.close(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9b40bf9/store/sdk/src/main/java/org/apache/carbondata/store/CarbonStore.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/store/CarbonStore.java b/store/sdk/src/main/java/org/apache/carbondata/store/CarbonStore.java deleted file mode 100644 index c6b2fb8..0000000 --- a/store/sdk/src/main/java/org/apache/carbondata/store/CarbonStore.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.carbondata.store; - -import java.io.Closeable; -import java.io.IOException; -import java.util.Iterator; - -import org.apache.carbondata.common.annotations.InterfaceAudience; -import org.apache.carbondata.common.annotations.InterfaceStability; -import org.apache.carbondata.core.datastore.row.CarbonRow; -import org.apache.carbondata.core.scan.expression.Expression; - -/** - * User can use {@link CarbonStore} to query data - */ [email protected] [email protected] -public interface CarbonStore extends Closeable { - - /** - * Scan query on the data in the table path - * @param path table path - * @param projectColumns column names to read - * @return rows - * @throws IOException if unable to read files in table path - */ - Iterator<CarbonRow> scan( - String path, - String[] projectColumns) throws IOException; - - /** - * Scan query with filter, on the data in the table path - * @param path table path - * @param projectColumns column names to read - * @param filter filter condition, can be null - * @return rows that satisfy filter condition - * @throws IOException if unable to read files in table path - */ - Iterator<CarbonRow> scan( - String path, - String[] projectColumns, - Expression filter) throws IOException; - - /** - * SQL query, table should be created before calling this function - * @param sqlString SQL statement - * @return rows - * @throws IOException if unable to read files in table path - */ - Iterator<CarbonRow> sql(String sqlString) throws IOException; - -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9b40bf9/store/sdk/src/main/java/org/apache/carbondata/store/LocalCarbonStore.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/store/LocalCarbonStore.java b/store/sdk/src/main/java/org/apache/carbondata/store/LocalCarbonStore.java deleted file mode 100644 index daa1447..0000000 --- a/store/sdk/src/main/java/org/apache/carbondata/store/LocalCarbonStore.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.carbondata.store; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Objects; - -import org.apache.carbondata.common.annotations.InterfaceAudience; -import org.apache.carbondata.common.logging.LogService; -import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.datastore.row.CarbonRow; -import org.apache.carbondata.core.metadata.schema.table.CarbonTable; -import org.apache.carbondata.core.scan.expression.Expression; -import org.apache.carbondata.hadoop.CarbonProjection; -import org.apache.carbondata.hadoop.api.CarbonInputFormat; -import org.apache.carbondata.hadoop.api.CarbonTableInputFormat; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.JobID; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptID; -import org.apache.hadoop.mapreduce.task.JobContextImpl; -import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; - -/** - * A CarbonStore implementation that works locally, without other compute framework dependency. - * It can be used to read data in local disk. - * - * Note that this class is experimental, it is not intended to be used in production. - */ [email protected] -class LocalCarbonStore extends MetaCachedCarbonStore { - - private static final LogService LOGGER = - LogServiceFactory.getLogService(LocalCarbonStore.class.getName()); - - @Override - public Iterator<CarbonRow> scan(String path, String[] projectColumns) throws IOException { - return scan(path, projectColumns, null); - } - - @Override public Iterator<CarbonRow> scan(String path, String[] projectColumns, Expression filter) - throws IOException { - Objects.requireNonNull(path); - Objects.requireNonNull(projectColumns); - - CarbonTable table = getTable(path); - if (table.isStreamingSink() || table.isHivePartitionTable()) { - throw new UnsupportedOperationException("streaming and partition table is not supported"); - } - // TODO: use InputFormat to prune data and read data - - final CarbonTableInputFormat format = new CarbonTableInputFormat(); - final Job job = new Job(new Configuration()); - CarbonInputFormat.setTableInfo(job.getConfiguration(), table.getTableInfo()); - CarbonInputFormat.setTablePath(job.getConfiguration(), table.getTablePath()); - CarbonInputFormat.setTableName(job.getConfiguration(), table.getTableName()); - CarbonInputFormat.setDatabaseName(job.getConfiguration(), table.getDatabaseName()); - CarbonInputFormat.setCarbonReadSupport(job.getConfiguration(), CarbonRowReadSupport.class); - CarbonInputFormat - .setColumnProjection(job.getConfiguration(), new CarbonProjection(projectColumns)); - if (filter != null) { - CarbonInputFormat.setFilterPredicates(job.getConfiguration(), filter); - } - - final List<InputSplit> splits = - format.getSplits(new JobContextImpl(job.getConfiguration(), new JobID())); - - List<RecordReader<Void, Object>> readers = new ArrayList<>(splits.size()); - - List<CarbonRow> rows = new ArrayList<>(); - - try { - for (InputSplit split : splits) { - TaskAttemptContextImpl attempt = - new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID()); - RecordReader reader = format.createRecordReader(split, attempt); - reader.initialize(split, attempt); - readers.add(reader); - } - - for (RecordReader<Void, Object> reader : readers) { - while 
(reader.nextKeyValue()) { - rows.add((CarbonRow) reader.getCurrentValue()); - } - try { - reader.close(); - } catch (IOException e) { - LOGGER.error(e); - } - } - } catch (InterruptedException e) { - throw new IOException(e); - } finally { - for (RecordReader<Void, Object> reader : readers) { - try { - reader.close(); - } catch (IOException e) { - LOGGER.error(e); - } - } - } - return rows.iterator(); - } - - @Override - public Iterator<CarbonRow> sql(String sqlString) throws IOException { - throw new UnsupportedOperationException(); - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9b40bf9/store/sdk/src/main/java/org/apache/carbondata/store/MetaCachedCarbonStore.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/store/MetaCachedCarbonStore.java b/store/sdk/src/main/java/org/apache/carbondata/store/MetaCachedCarbonStore.java deleted file mode 100644 index e43f750..0000000 --- a/store/sdk/src/main/java/org/apache/carbondata/store/MetaCachedCarbonStore.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.carbondata.store; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - -import org.apache.carbondata.common.annotations.InterfaceAudience; -import org.apache.carbondata.core.metadata.converter.SchemaConverter; -import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl; -import org.apache.carbondata.core.metadata.schema.table.CarbonTable; -import org.apache.carbondata.core.metadata.schema.table.TableInfo; -import org.apache.carbondata.core.util.CarbonUtil; -import org.apache.carbondata.core.util.path.CarbonTablePath; - -/** - * A CarbonStore base class that caches CarbonTable object - */ [email protected] -abstract class MetaCachedCarbonStore implements CarbonStore { - - // mapping of table path to CarbonTable object - private Map<String, CarbonTable> cache = new HashMap<>(); - - CarbonTable getTable(String path) throws IOException { - if (cache.containsKey(path)) { - return cache.get(path); - } - org.apache.carbondata.format.TableInfo tableInfo = CarbonUtil - .readSchemaFile(CarbonTablePath.getSchemaFilePath(path)); - SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl(); - TableInfo tableInfo1 = schemaConverter.fromExternalToWrapperTableInfo(tableInfo, "", "", ""); - tableInfo1.setTablePath(path); - CarbonTable table = CarbonTable.buildFromTableInfo(tableInfo1); - cache.put(path, table); - return table; - } - - @Override - public void close() throws IOException { - cache.clear(); - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9b40bf9/store/sdk/src/test/java/org/apache/carbondata/store/LocalCarbonStoreTest.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/test/java/org/apache/carbondata/store/LocalCarbonStoreTest.java b/store/sdk/src/test/java/org/apache/carbondata/store/LocalCarbonStoreTest.java deleted file mode 100644 index c885a26..0000000 --- a/store/sdk/src/test/java/org/apache/carbondata/store/LocalCarbonStoreTest.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.carbondata.store; - -import java.io.File; -import java.io.IOException; -import java.util.Iterator; - -import org.apache.carbondata.core.datastore.row.CarbonRow; -import org.apache.carbondata.core.metadata.datatype.DataTypes; -import org.apache.carbondata.sdk.file.Field; -import org.apache.carbondata.sdk.file.Schema; -import org.apache.carbondata.sdk.file.TestUtil; - -import org.apache.commons.io.FileUtils; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -public class LocalCarbonStoreTest { - @Before - public void cleanFile() { - assert (TestUtil.cleanMdtFile()); - } - - @After - public void verifyDMFile() { - assert (!TestUtil.verifyMdtFile()); - } - - // TODO: complete this testcase - // Currently result rows are empty, because SDK is not writing table status file - // so that reader does not find any segment. - // Complete this testcase after flat folder reader is done. - @Test - public void testWriteAndReadFiles() throws IOException { - String path = "./testWriteFiles"; - FileUtils.deleteDirectory(new File(path)); - - Field[] fields = new Field[2]; - fields[0] = new Field("name", DataTypes.STRING); - fields[1] = new Field("age", DataTypes.INT); - - TestUtil.writeFilesAndVerify(100, new Schema(fields), path, true); - - CarbonStore store = new LocalCarbonStore(); - Iterator<CarbonRow> rows = store.scan(path, new String[]{"name, age"}, null); - - while (rows.hasNext()) { - CarbonRow row = rows.next(); - System.out.println(row.toString()); - } - - FileUtils.deleteDirectory(new File(path)); - } - -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9b40bf9/store/search/pom.xml ---------------------------------------------------------------------- diff --git a/store/search/pom.xml b/store/search/pom.xml deleted file mode 100644 index 6acbbfb..0000000 --- a/store/search/pom.xml +++ /dev/null @@ -1,112 +0,0 @@ -<project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.carbondata</groupId> - <artifactId>carbondata-parent</artifactId> - <version>1.5.0-SNAPSHOT</version> - <relativePath>../../pom.xml</relativePath> - </parent> - - <artifactId>carbondata-search</artifactId> - <name>Apache CarbonData :: Search </name> - - <properties> - <dev.path>${basedir}/../../dev</dev.path> - </properties> - - <dependencies> - <dependency> - <groupId>org.apache.carbondata</groupId> - <artifactId>carbondata-hadoop</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-core_${scala.binary.version}</artifactId> - <version>${spark.version}</version> - </dependency> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.scalatest</groupId> - <artifactId>scalatest_${scala.binary.version}</artifactId> - <scope>test</scope> - </dependency> - </dependencies> - - <build> - <testSourceDirectory>src/test/scala</testSourceDirectory> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-compiler-plugin</artifactId> - <configuration> - <source>1.7</source> - <target>1.7</target> - </configuration> - </plugin> - <plugin> - <groupId>org.scala-tools</groupId> - <artifactId>maven-scala-plugin</artifactId> - <version>2.15.2</version> - 
<executions> - <execution> - <id>compile</id> - <goals> - <goal>compile</goal> - </goals> - <phase>compile</phase> - </execution> - <execution> - <id>testCompile</id> - <goals> - <goal>testCompile</goal> - </goals> - <phase>test</phase> - </execution> - <execution> - <phase>process-resources</phase> - <goals> - <goal>compile</goal> - </goals> - </execution> - </executions> - </plugin> - <plugin> - <groupId>org.scalatest</groupId> - <artifactId>scalatest-maven-plugin</artifactId> - <version>1.0</version> - <!-- Note config is repeated in surefire config --> - <configuration> - <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory> - <junitxml>.</junitxml> - <filereports>CarbonTestSuite.txt</filereports> - <argLine> ${argLine} -ea -Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m - </argLine> - <stderr /> - <environmentVariables> - </environmentVariables> - <systemProperties> - <java.awt.headless>true</java.awt.headless> - <spark.carbon.hive.schema.store>${carbon.hive.based.metastore}</spark.carbon.hive.schema.store> - </systemProperties> - </configuration> - <executions> - <execution> - <id>test</id> - <goals> - <goal>test</goal> - </goals> - </execution> - </executions> - </plugin> - </plugins> - </build> -</project> http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9b40bf9/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java ---------------------------------------------------------------------- diff --git a/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java b/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java index b93d80f..e69de29 100644 --- a/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java +++ b/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java @@ -1,247 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.carbondata.store.worker; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Objects; - -import org.apache.carbondata.common.annotations.InterfaceAudience; -import org.apache.carbondata.common.logging.LogService; -import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.datamap.DataMapChooser; -import org.apache.carbondata.core.datamap.DataMapDistributable; -import org.apache.carbondata.core.datamap.Segment; -import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper; -import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper; -import org.apache.carbondata.core.datastore.block.TableBlockInfo; -import org.apache.carbondata.core.datastore.row.CarbonRow; -import org.apache.carbondata.core.indexstore.ExtendedBlocklet; -import org.apache.carbondata.core.metadata.schema.table.CarbonTable; -import org.apache.carbondata.core.metadata.schema.table.TableInfo; -import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope; -import org.apache.carbondata.core.scan.executor.impl.SearchModeDetailQueryExecutor; -import org.apache.carbondata.core.scan.executor.impl.SearchModeVectorDetailQueryExecutor; -import org.apache.carbondata.core.scan.expression.Expression; -import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; -import org.apache.carbondata.core.scan.model.QueryModel; -import org.apache.carbondata.core.scan.model.QueryModelBuilder; -import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; -import org.apache.carbondata.core.statusmanager.SegmentStatusManager; -import org.apache.carbondata.core.util.CarbonTaskInfo; -import org.apache.carbondata.core.util.ThreadLocalTaskInfo; -import org.apache.carbondata.core.util.path.CarbonTablePath; -import org.apache.carbondata.hadoop.CarbonInputSplit; -import org.apache.carbondata.hadoop.CarbonMultiBlockSplit; -import org.apache.carbondata.hadoop.CarbonRecordReader; -import org.apache.carbondata.hadoop.readsupport.impl.CarbonRowReadSupport; - -import org.apache.spark.search.SearchRequest; -import org.apache.spark.search.SearchResult; -import org.apache.spark.search.ShutdownRequest; -import org.apache.spark.search.ShutdownResponse; - -/** - * Thread runnable for handling SearchRequest from master. 
- */ [email protected] -public class SearchRequestHandler { - - private static final LogService LOG = - LogServiceFactory.getLogService(SearchRequestHandler.class.getName()); - - public SearchResult handleSearch(SearchRequest request) { - try { - LOG.info(String.format("[SearchId:%d] receive search request", request.searchId())); - List<CarbonRow> rows = handleRequest(request); - LOG.info(String.format("[SearchId:%d] sending success response", request.searchId())); - return createSuccessResponse(request, rows); - } catch (IOException | InterruptedException e) { - LOG.error(e); - LOG.info(String.format("[SearchId:%d] sending failure response", request.searchId())); - return createFailureResponse(request, e); - } - } - - public ShutdownResponse handleShutdown(ShutdownRequest request) { - LOG.info("Shutting down worker..."); - SearchModeDetailQueryExecutor.shutdownThreadPool(); - SearchModeVectorDetailQueryExecutor.shutdownThreadPool(); - LOG.info("Worker shutted down"); - return new ShutdownResponse(Status.SUCCESS.ordinal(), ""); - } - - private DataMapExprWrapper chooseFGDataMap( - CarbonTable table, - FilterResolverIntf filterInterface) { - DataMapChooser chooser = null; - try { - chooser = new DataMapChooser(table); - return chooser.chooseFGDataMap(filterInterface); - } catch (IOException e) { - LOG.audit(e.getMessage()); - return null; - } - } - - /** - * Builds {@link QueryModel} and read data from files - */ - private List<CarbonRow> handleRequest(SearchRequest request) - throws IOException, InterruptedException { - CarbonTaskInfo carbonTaskInfo = new CarbonTaskInfo(); - carbonTaskInfo.setTaskId(System.nanoTime()); - ThreadLocalTaskInfo.setCarbonTaskInfo(carbonTaskInfo); - TableInfo tableInfo = request.tableInfo(); - CarbonTable table = CarbonTable.buildFromTableInfo(tableInfo); - QueryModel queryModel = createQueryModel(table, request); - - // in search mode, plain reader is better since it requires less memory - queryModel.setVectorReader(false); - - CarbonMultiBlockSplit mbSplit = request.split().value(); - List<TableBlockInfo> list = CarbonInputSplit.createBlocks(mbSplit.getAllSplits()); - queryModel.setTableBlockInfos(list); - long limit = request.limit(); - long rowCount = 0; - - LOG.info(String.format("[SearchId:%d] %s, number of block: %d", - request.searchId(), queryModel.toString(), mbSplit.getAllSplits().size())); - DataMapExprWrapper fgDataMap = chooseFGDataMap(table, - queryModel.getFilterExpressionResolverTree()); - - // If there is DataMap selected in Master, prune the split by it - if (fgDataMap != null) { - queryModel = prune(request.searchId(), table, queryModel, mbSplit, fgDataMap); - } - - // In search mode, reader will read multiple blocks by using a thread pool - CarbonRecordReader<CarbonRow> reader = - new CarbonRecordReader<>(queryModel, new CarbonRowReadSupport()); - - // read all rows by the reader - List<CarbonRow> rows = new LinkedList<>(); - try { - reader.initialize(mbSplit, null); - - // loop to read required number of rows. 
- // By default, if user does not specify the limit value, limit is Long.MaxValue - while (reader.nextKeyValue() && rowCount < limit) { - rows.add(reader.getCurrentValue()); - rowCount++; - } - } catch (InterruptedException e) { - throw new IOException(e); - } finally { - reader.close(); - } - LOG.info(String.format("[SearchId:%d] scan completed, return %d rows", - request.searchId(), rows.size())); - return rows; - } - - /** - * If there is FGDataMap defined for this table and filter condition in the query, - * prune the splits by the DataMap and set the pruned split into the QueryModel and return - */ - private QueryModel prune(int queryId, CarbonTable table, QueryModel queryModel, - CarbonMultiBlockSplit mbSplit, DataMapExprWrapper datamap) throws IOException { - Objects.requireNonNull(datamap); - List<Segment> segments = new LinkedList<>(); - HashMap<String, Integer> uniqueSegments = new HashMap<>(); - LoadMetadataDetails[] loadMetadataDetails = - SegmentStatusManager.readLoadMetadata( - CarbonTablePath.getMetadataPath(table.getTablePath())); - for (CarbonInputSplit split : mbSplit.getAllSplits()) { - String segmentId = Segment.getSegment(split.getSegmentId(), loadMetadataDetails).toString(); - if (uniqueSegments.get(segmentId) == null) { - segments.add(Segment.toSegment(segmentId, - new TableStatusReadCommittedScope(table.getAbsoluteTableIdentifier(), - loadMetadataDetails))); - uniqueSegments.put(segmentId, 1); - } else { - uniqueSegments.put(segmentId, uniqueSegments.get(segmentId) + 1); - } - } - - List<DataMapDistributableWrapper> distributables = datamap.toDistributable(segments); - List<ExtendedBlocklet> prunnedBlocklets = new LinkedList<ExtendedBlocklet>(); - for (int i = 0; i < distributables.size(); i++) { - DataMapDistributable dataMapDistributable = distributables.get(i).getDistributable(); - prunnedBlocklets.addAll(datamap.prune(dataMapDistributable, null)); - } - - HashMap<String, ExtendedBlocklet> pathToRead = new HashMap<>(); - for (ExtendedBlocklet prunedBlocklet : prunnedBlocklets) { - pathToRead.put(prunedBlocklet.getFilePath().replace('\\', '/'), prunedBlocklet); - } - - List<TableBlockInfo> blocks = queryModel.getTableBlockInfos(); - List<TableBlockInfo> blockToRead = new LinkedList<>(); - for (TableBlockInfo block : blocks) { - if (pathToRead.keySet().contains(block.getFilePath())) { - // If not set this, it will can't create FineGrainBlocklet object in - // org.apache.carbondata.core.indexstore.blockletindex.BlockletDataRefNode.getIndexedData - block.setDataMapWriterPath(pathToRead.get(block.getFilePath()).getDataMapWriterPath()); - blockToRead.add(block); - } - } - LOG.info(String.format("[SearchId:%d] pruned using FG DataMap, pruned blocks: %d", queryId, - blockToRead.size())); - queryModel.setTableBlockInfos(blockToRead); - queryModel.setFG(true); - return queryModel; - } - - private QueryModel createQueryModel(CarbonTable table, SearchRequest request) { - String[] projectColumns = request.projectColumns(); - Expression filter = null; - if (request.filterExpression() != null) { - filter = request.filterExpression(); - } - return new QueryModelBuilder(table) - .projectColumns(projectColumns) - .filterExpression(filter) - .build(); - } - - /** - * create a failure response - */ - private SearchResult createFailureResponse(SearchRequest request, Throwable throwable) { - return new SearchResult(request.searchId(), Status.FAILURE.ordinal(), throwable.getMessage(), - new Object[0][]); - } - - /** - * create a success response with result rows - */ - private 
SearchResult createSuccessResponse(SearchRequest request, List<CarbonRow> rows) { - Iterator<CarbonRow> itor = rows.iterator(); - Object[][] output = new Object[rows.size()][]; - int i = 0; - while (itor.hasNext()) { - output[i++] = itor.next().getData(); - } - return new SearchResult(request.searchId(), Status.SUCCESS.ordinal(), "", output); - } - -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9b40bf9/store/search/src/main/java/org/apache/carbondata/store/worker/Status.java ---------------------------------------------------------------------- diff --git a/store/search/src/main/java/org/apache/carbondata/store/worker/Status.java b/store/search/src/main/java/org/apache/carbondata/store/worker/Status.java deleted file mode 100644 index 71df3e0..0000000 --- a/store/search/src/main/java/org/apache/carbondata/store/worker/Status.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.store.worker; - -import org.apache.carbondata.common.annotations.InterfaceAudience; - -/** - * Status of RPC response - */ [email protected] -public enum Status { - SUCCESS, FAILURE -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9b40bf9/store/search/src/main/scala/org/apache/spark/rpc/Master.scala ---------------------------------------------------------------------- diff --git a/store/search/src/main/scala/org/apache/spark/rpc/Master.scala b/store/search/src/main/scala/org/apache/spark/rpc/Master.scala deleted file mode 100644 index b7630fb..0000000 --- a/store/search/src/main/scala/org/apache/spark/rpc/Master.scala +++ /dev/null @@ -1,291 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
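The handleRequest/createSuccessResponse pair in the deleted SearchRequestHandler above follows a simple pattern: read at most `limit` rows through a record reader, then flatten the collected rows into an Object[][] payload for the response. The sketch below illustrates only that pattern; RowReader is a simplified stand-in for the Carbon record reader, not the real class.

import java.util.ArrayList;
import java.util.List;

public class BoundedReadSketch {

  // Stand-in for CarbonRecordReader<CarbonRow>; only the calls used by the pattern are kept
  interface RowReader {
    boolean nextKeyValue() throws Exception;
    Object[] getCurrentValue();
    void close() throws Exception;
  }

  // Read rows until the reader is exhausted or the limit is reached, then pack them
  static Object[][] readUpTo(RowReader reader, long limit) throws Exception {
    List<Object[]> rows = new ArrayList<>();
    long rowCount = 0;
    try {
      // when no limit is requested, callers pass Long.MAX_VALUE, which means "read everything"
      while (rowCount < limit && reader.nextKeyValue()) {
        rows.add(reader.getCurrentValue());
        rowCount++;
      }
    } finally {
      reader.close();
    }
    return rows.toArray(new Object[0][]);
  }
}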
- */ - -package org.apache.spark.rpc - -import java.io.IOException -import java.net.{BindException, InetAddress} -import java.util.{List => JList, Map => JMap, Objects, Random, UUID} -import java.util.concurrent.atomic.AtomicBoolean - -import scala.collection.JavaConverters._ -import scala.collection.mutable.ArrayBuffer -import scala.concurrent.Future -import scala.concurrent.duration.Duration -import scala.util.{Failure, Success, Try} - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.mapred.JobConf -import org.apache.hadoop.mapreduce.Job -import org.apache.spark.{SecurityManager, SerializableWritable, SparkConf} -import org.apache.spark.rpc.netty.NettyRpcEnvFactory -import org.apache.spark.search._ -import org.apache.spark.util.ThreadUtils - -import org.apache.carbondata.common.annotations.InterfaceAudience -import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.datastore.block.Distributable -import org.apache.carbondata.core.datastore.row.CarbonRow -import org.apache.carbondata.core.metadata.schema.table.CarbonTable -import org.apache.carbondata.core.scan.expression.Expression -import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.hadoop.CarbonMultiBlockSplit -import org.apache.carbondata.hadoop.api.CarbonInputFormat -import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil -import org.apache.carbondata.processing.util.CarbonLoaderUtil -import org.apache.carbondata.store.worker.Status - -/** - * Master of CarbonSearch. - * It provides a Registry service for worker to register. - * And it provides search API to fire RPC call to workers. - */ [email protected] -class Master(sparkConf: SparkConf) { - private val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName) - - // worker host address map to EndpointRef - - private val random = new Random - - private var rpcEnv: RpcEnv = _ - - private val scheduler: Scheduler = new Scheduler - - /** start service and listen on port passed in constructor */ - def startService(): Unit = { - if (rpcEnv == null) { - LOG.info("Start search mode master thread") - val isStarted: AtomicBoolean = new AtomicBoolean(false) - new Thread(new Runnable { - override def run(): Unit = { - val hostAddress = InetAddress.getLocalHost.getHostAddress - var port = CarbonProperties.getSearchMasterPort - var exception: BindException = null - var numTry = 100 // we will try to create service at worse case 100 times - do { - try { - LOG.info(s"starting registry-service on $hostAddress:$port") - val config = RpcEnvConfig( - sparkConf, "registry-service", hostAddress, "", port, - new SecurityManager(sparkConf), clientMode = false) - rpcEnv = new NettyRpcEnvFactory().create(config) - numTry = 0 - } catch { - case e: BindException => - // port is occupied, increase the port number and try again - exception = e - LOG.error(s"start registry-service failed: ${e.getMessage}") - port = port + 1 - numTry = numTry - 1 - } - } while (numTry > 0) - if (rpcEnv == null) { - // we have tried many times, but still failed to find an available port - throw exception - } - val registryEndpoint: RpcEndpoint = new Registry(rpcEnv, Master.this) - rpcEnv.setupEndpoint("registry-service", registryEndpoint) - if (isStarted.compareAndSet(false, false)) { - synchronized { - isStarted.compareAndSet(false, true) - } - } - LOG.info("registry-service started") - rpcEnv.awaitTermination() - } - }).start() - var count = 0 - val 
countThreshold = 5000 - while (isStarted.compareAndSet(false, false) && count < countThreshold) { - LOG.info(s"Waiting search mode master to start, retrying $count times") - Thread.sleep(10) - count = count + 1; - } - if (count >= countThreshold) { - LOG.error(s"Search mode try $countThreshold times to start master but failed") - throw new RuntimeException( - s"Search mode try $countThreshold times to start master but failed") - } else { - LOG.info("Search mode master started") - } - } else { - LOG.info("Search mode master has already started") - } - } - - def stopService(): Unit = { - if (rpcEnv != null) { - rpcEnv.shutdown() - rpcEnv = null - } - } - - def stopAllWorkers(): Unit = { - val futures = scheduler.getAllWorkers.toSeq.map { case (address, schedulable) => - (address, schedulable.ref.ask[ShutdownResponse](ShutdownRequest("user"))) - } - futures.foreach { case (address, future) => - ThreadUtils.awaitResult(future, Duration.apply("10s")) - future.value match { - case Some(result) => - result match { - case Success(response) => scheduler.removeWorker(address) - case Failure(throwable) => throw new IOException(throwable.getMessage) - } - case None => throw new ExecutionTimeoutException - } - } - } - - /** A new searcher is trying to register, add it to the map and connect to this searcher */ - def addWorker(request: RegisterWorkerRequest): RegisterWorkerResponse = { - LOG.info(s"Receive Register request from worker ${request.hostAddress}:${request.port} " + - s"with ${request.cores} cores") - val workerId = UUID.randomUUID().toString - val workerAddress = request.hostAddress - val workerPort = request.port - LOG.info(s"connecting to worker ${request.hostAddress}:${request.port}, workerId $workerId") - - val endPointRef = - rpcEnv.setupEndpointRef(RpcAddress(workerAddress, workerPort), "search-service") - scheduler.addWorker(workerAddress, - new Schedulable(workerId, workerAddress, workerPort, request.cores, endPointRef)) - LOG.info(s"worker ${request.hostAddress}:${request.port} registered") - RegisterWorkerResponse(workerId) - } - - /** - * Execute search by firing RPC call to worker, return the result rows - * @param table table to search - * @param columns projection column names - * @param filter filter expression - * @param globalLimit max number of rows required in Master - * @param localLimit max number of rows required in Worker - * @return - */ - def search(table: CarbonTable, columns: Array[String], filter: Expression, - globalLimit: Long, localLimit: Long): Array[CarbonRow] = { - Objects.requireNonNull(table) - Objects.requireNonNull(columns) - if (globalLimit < 0 || localLimit < 0) { - throw new IllegalArgumentException("limit should be positive") - } - - val queryId = random.nextInt - var rowCount = 0 - val output = new ArrayBuffer[CarbonRow] - - def onSuccess(result: SearchResult): Unit = { - // in case of RPC success, collect all rows in response message - if (result.queryId != queryId) { - throw new IOException( - s"queryId in response does not match request: ${result.queryId} != $queryId") - } - if (result.status != Status.SUCCESS.ordinal()) { - throw new IOException(s"failure in worker: ${ result.message }") - } - - val itor = result.rows.iterator - while (itor.hasNext && rowCount < globalLimit) { - output += new CarbonRow(itor.next()) - rowCount = rowCount + 1 - } - LOG.info(s"[SearchId:$queryId] accumulated result size $rowCount") - } - def onFaiure(e: Throwable) = throw new IOException(s"exception in worker: ${ e.getMessage }") - def onTimedout() = throw new 
ExecutionTimeoutException() - - // prune data and get a mapping of worker hostname to list of blocks, - // then add these blocks to the SearchRequest and fire the RPC call - val nodeBlockMapping: JMap[String, JList[Distributable]] = pruneBlock(table, columns, filter) - val tuple = nodeBlockMapping.asScala.map { case (splitAddress, blocks) => - // Build a SearchRequest - val split = new SerializableWritable[CarbonMultiBlockSplit]( - new CarbonMultiBlockSplit(blocks, splitAddress)) - val request = - SearchRequest(queryId, split, table.getTableInfo, columns, filter, localLimit) - - // Find an Endpoind and send the request to it - // This RPC is non-blocking so that we do not need to wait before send to next worker - scheduler.sendRequestAsync[SearchResult](splitAddress, request) - } - - // loop to get the result of each Worker - tuple.foreach { case (worker: Schedulable, future: Future[SearchResult]) => - - // if we have enough data already, we do not need to collect more result - if (rowCount < globalLimit) { - // wait for worker - val timeout = CarbonProperties - .getInstance() - .getProperty(CarbonCommonConstants.CARBON_SEARCH_QUERY_TIMEOUT, - CarbonCommonConstants.CARBON_SEARCH_QUERY_TIMEOUT_DEFAULT) - ThreadUtils.awaitResult(future, Duration.apply(timeout)) - LOG.info(s"[SearchId:$queryId] receive search response from worker " + - s"${worker.address}:${worker.port}") - try { - future.value match { - case Some(response: Try[SearchResult]) => - response match { - case Success(result) => onSuccess(result) - case Failure(e) => onFaiure(e) - } - case None => onTimedout() - } - } finally { - worker.workload.decrementAndGet() - } - } - } - output.toArray - } - - /** - * Prune data by using CarbonInputFormat.getSplit - * Return a mapping of host address to list of block - */ - private def pruneBlock( - table: CarbonTable, - columns: Array[String], - filter: Expression): JMap[String, JList[Distributable]] = { - val jobConf = new JobConf(new Configuration) - val job = new Job(jobConf) - val format = CarbonInputFormatUtil.createCarbonTableInputFormat( - job, table, columns, filter, null, null) - - // We will do FG pruning in reader side, so don't do it here - CarbonInputFormat.setFgDataMapPruning(job.getConfiguration, false) - val splits = format.getSplits(job) - val distributables = splits.asScala.map { split => - split.asInstanceOf[Distributable] - } - CarbonLoaderUtil.nodeBlockMapping( - distributables.asJava, - -1, - getWorkers.asJava, - CarbonLoaderUtil.BlockAssignmentStrategy.BLOCK_NUM_FIRST, - null) - } - - /** return hostname of all workers */ - def getWorkers: Seq[String] = scheduler.getAllWorkers.map(_._1).toSeq -} - -// Exception if execution timed out in search mode -class ExecutionTimeoutException extends RuntimeException
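The search() method in the deleted Master above distributes one asynchronous request per worker and then drains the responses, stopping as soon as the global row limit is reached and failing if a worker response does not arrive within the configured timeout. The sketch below captures only that control flow; CompletableFuture and the Object[][] payload are stand-ins for the Spark RPC futures and SearchResult used in the original.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SearchCollectSketch {

  // Collect rows from per-worker futures until the global limit is met or a worker times out
  static List<Object[]> collect(Map<String, CompletableFuture<Object[][]>> perWorker,
      long globalLimit, long timeoutSeconds) throws Exception {
    List<Object[]> output = new ArrayList<>();
    for (Map.Entry<String, CompletableFuture<Object[][]>> entry : perWorker.entrySet()) {
      // enough rows already collected: skip the remaining workers
      if (output.size() >= globalLimit) {
        break;
      }
      try {
        Object[][] rows = entry.getValue().get(timeoutSeconds, TimeUnit.SECONDS);
        for (Object[] row : rows) {
          if (output.size() >= globalLimit) {
            break;
          }
          output.add(row);
        }
      } catch (TimeoutException e) {
        throw new RuntimeException("worker " + entry.getKey() + " timed out", e);
      }
    }
    return output;
  }
}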

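The startService() loop shown above probes ports in a bounded way: attempt to bind, and on a BindException advance to the next port, giving up after a fixed number of attempts and rethrowing the last bind failure. A minimal, self-contained illustration of that retry pattern follows, using a plain ServerSocket purely for demonstration instead of the RPC server builder.

import java.io.IOException;
import java.net.BindException;
import java.net.ServerSocket;

public class PortRetrySketch {

  // Try startPort, startPort + 1, ... until a bind succeeds or maxTries attempts are used up
  static ServerSocket bindWithRetry(int startPort, int maxTries) throws IOException {
    BindException lastFailure = null;
    int port = startPort;
    for (int attempt = 0; attempt < maxTries; attempt++) {
      try {
        return new ServerSocket(port);   // success: the service can listen on this port
      } catch (BindException e) {
        lastFailure = e;                 // port already in use, try the next one
        port++;
      }
    }
    // all attempts failed; surface the last bind error to the caller
    if (lastFailure != null) {
      throw lastFailure;
    }
    throw new IOException("maxTries must be greater than zero");
  }
}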