[CARBONDATA-2609] Change RPC implementation to Hadoop RPC framework This closes #2372
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/d9b40bf9 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/d9b40bf9 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/d9b40bf9 Branch: refs/heads/carbonstore Commit: d9b40bf9edd7017ebaffe0a25161140940737d63 Parents: a162897 Author: Jacky Li <[email protected]> Authored: Wed Jun 13 23:57:00 2018 +0800 Committer: Jacky Li <[email protected]> Committed: Wed Jul 18 10:04:47 2018 +0800 ---------------------------------------------------------------------- .../core/constants/CarbonCommonConstants.java | 4 +- .../carbondata/core/scan/model/QueryModel.java | 14 +- .../carbondata/core/util/CarbonProperties.java | 10 + .../core/util/ObjectSerializationUtil.java | 14 + .../carbondata/hadoop/CarbonRecordReader.java | 8 +- .../detailquery/SearchModeTestCase.scala | 17 +- integration/spark2/pom.xml | 2 +- .../carbondata/store/SparkCarbonStore.scala | 27 +- .../org/apache/spark/sql/CarbonSession.scala | 1 + pom.xml | 2 +- store/core/pom.xml | 113 +++++++ .../carbondata/store/CarbonRowReadSupport.java | 53 ++++ .../apache/carbondata/store/CarbonStore.java | 68 +++++ .../carbondata/store/LocalCarbonStore.java | 130 +++++++++ .../carbondata/store/MetaCachedCarbonStore.java | 59 ++++ .../carbondata/store/rpc/QueryService.java | 33 +++ .../carbondata/store/rpc/RegistryService.java | 30 ++ .../carbondata/store/rpc/ServiceFactory.java | 43 +++ .../store/rpc/impl/IndexedRecordReader.java | 161 ++++++++++ .../store/rpc/impl/QueryServiceImpl.java | 56 ++++ .../store/rpc/impl/RegistryServiceImpl.java | 54 ++++ .../store/rpc/impl/RequestHandler.java | 147 ++++++++++ .../carbondata/store/rpc/impl/Status.java | 28 ++ .../store/rpc/model/QueryRequest.java | 108 +++++++ .../store/rpc/model/QueryResponse.java | 84 ++++++ .../store/rpc/model/RegisterWorkerRequest.java | 69 +++++ .../store/rpc/model/RegisterWorkerResponse.java | 54 ++++ 
.../store/rpc/model/ShutdownRequest.java | 53 ++++ .../store/rpc/model/ShutdownResponse.java | 61 ++++ .../org/apache/carbondata/store/Master.scala | 283 ++++++++++++++++++ .../org/apache/carbondata/store/Scheduler.scala | 147 ++++++++++ .../org/apache/carbondata/store/Worker.scala | 113 +++++++ .../carbondata/store/LocalCarbonStoreTest.java | 72 +++++ .../org/apache/carbondata/store/TestUtil.java | 168 +++++++++++ .../carbondata/store/SchedulerSuite.scala | 155 ++++++++++ .../carbondata/store/CarbonRowReadSupport.java | 53 ---- .../apache/carbondata/store/CarbonStore.java | 68 ----- .../carbondata/store/LocalCarbonStore.java | 130 --------- .../carbondata/store/MetaCachedCarbonStore.java | 59 ---- .../carbondata/store/LocalCarbonStoreTest.java | 72 ----- store/search/pom.xml | 112 ------- .../store/worker/SearchRequestHandler.java | 247 ---------------- .../apache/carbondata/store/worker/Status.java | 28 -- .../scala/org/apache/spark/rpc/Master.scala | 291 ------------------- .../scala/org/apache/spark/rpc/Scheduler.scala | 139 --------- .../scala/org/apache/spark/rpc/Worker.scala | 118 -------- .../org/apache/spark/search/Registry.scala | 51 ---- .../org/apache/spark/search/Searcher.scala | 79 ----- .../carbondata/store/SearchServiceTest.java | 37 --- .../org/apache/spark/rpc/SchedulerSuite.scala | 154 ---------- 50 files changed, 2402 insertions(+), 1677 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9b40bf9/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java index e7e074d..ad3b0d3 100644 --- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java +++ 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java @@ -1741,7 +1741,7 @@ public final class CarbonCommonConstants { public static final String CARBON_SEARCH_MODE_ENABLE_DEFAULT = "false"; /** - * It's timeout threshold of carbon search query + * It's timeout threshold of carbon search query, in seconds */ @CarbonProperty @InterfaceStability.Unstable @@ -1750,7 +1750,7 @@ public final class CarbonCommonConstants { /** * Default value is 10 seconds */ - public static final String CARBON_SEARCH_QUERY_TIMEOUT_DEFAULT = "10s"; + public static final int CARBON_SEARCH_QUERY_TIMEOUT_DEFAULT = 10; /** * The size of thread pool used for reading files in Work for search mode. By default, http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9b40bf9/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java index 55dafb9..b15ce02 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java @@ -69,6 +69,7 @@ public class QueryModel { * table block information in which query will be executed */ private List<TableBlockInfo> tableBlockInfos; + /** * To handle most of the computation in query engines like spark and hive, carbon should give * raw detailed records to it. 
@@ -109,11 +110,6 @@ public class QueryModel { */ private boolean requiredRowId; - /** - * whether it is FG with search mode - */ - private boolean isFG; - private QueryModel(CarbonTable carbonTable) { tableBlockInfos = new ArrayList<TableBlockInfo>(); invalidSegmentIds = new ArrayList<>(); @@ -375,14 +371,6 @@ public class QueryModel { this.requiredRowId = requiredRowId; } - public boolean isFG() { - return isFG; - } - - public void setFG(boolean FG) { - isFG = FG; - } - @Override public String toString() { return String.format("scan on table %s.%s, %d projection columns with filter (%s)", http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9b40bf9/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java index f7ace5e..b60e6a3 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java @@ -1645,4 +1645,14 @@ public final class CarbonProperties { CarbonLoadOptionConstants.CARBON_LOAD_SORT_MEMORY_SPILL_PERCENTAGE_DEFAULT); } } + + public int getQueryTimeout() { + try { + return Integer.parseInt( + CarbonProperties.getInstance().getProperty( + CarbonCommonConstants.CARBON_SEARCH_QUERY_TIMEOUT)); + } catch (NumberFormatException e) { + return CarbonCommonConstants.CARBON_SEARCH_QUERY_TIMEOUT_DEFAULT; + } + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9b40bf9/core/src/main/java/org/apache/carbondata/core/util/ObjectSerializationUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/ObjectSerializationUtil.java b/core/src/main/java/org/apache/carbondata/core/util/ObjectSerializationUtil.java index 020787d..e133208 
100644 --- a/core/src/main/java/org/apache/carbondata/core/util/ObjectSerializationUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/ObjectSerializationUtil.java @@ -113,4 +113,18 @@ public class ObjectSerializationUtil { } } + public static byte[] serialize(Object object) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(1024); + ObjectOutputStream oos = new ObjectOutputStream(baos); + oos.writeObject(object); + return baos.toByteArray(); + } + + public static Object deserialize(byte[] bytes) throws IOException, ClassNotFoundException { + if (bytes == null) { + return null; + } + ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes)); + return ois.readObject(); + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9b40bf9/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java index a54e7a4..6c796b5 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java @@ -84,12 +84,8 @@ public class CarbonRecordReader<T> extends AbstractRecordReader<T> { } else { throw new RuntimeException("unsupported input split type: " + inputSplit); } - // It should use the exists tableBlockInfos if tableBlockInfos of queryModel is not empty - // otherwise the prune is no use before this method - if (!queryModel.isFG()) { - List<TableBlockInfo> tableBlockInfoList = CarbonInputSplit.createBlocks(splitList); - queryModel.setTableBlockInfos(tableBlockInfoList); - } + List<TableBlockInfo> tableBlockInfoList = CarbonInputSplit.createBlocks(splitList); + queryModel.setTableBlockInfos(tableBlockInfoList); readSupport.initialize(queryModel.getProjectionColumns(), 
queryModel.getTable()); try { carbonIterator = new ChunkRowIterator(queryExecutor.execute(queryModel)); http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9b40bf9/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala index 001f6c0..af9e50f 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala @@ -60,6 +60,7 @@ class SearchModeTestCase extends QueryTest with BeforeAndAfterAll { } test("SearchMode Query: row result") { + assert(sqlContext.sparkSession.asInstanceOf[CarbonSession].isSearchModeEnabled) CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, "false") checkSearchAnswer("select * from main where city = 'city3'") CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, @@ -67,36 +68,44 @@ class SearchModeTestCase extends QueryTest with BeforeAndAfterAll { } test("SearchMode Query: vector result") { + assert(sqlContext.sparkSession.asInstanceOf[CarbonSession].isSearchModeEnabled) checkSearchAnswer("select * from main where city = 'city3'") } test("equal filter") { + assert(sqlContext.sparkSession.asInstanceOf[CarbonSession].isSearchModeEnabled) checkSearchAnswer("select id from main where id = '100'") checkSearchAnswer("select id from main where planet = 'planet100'") } test("greater and less than filter") { + assert(sqlContext.sparkSession.asInstanceOf[CarbonSession].isSearchModeEnabled) 
checkSearchAnswer("select id from main where m2 < 4") } test("IN filter") { + assert(sqlContext.sparkSession.asInstanceOf[CarbonSession].isSearchModeEnabled) checkSearchAnswer("select id from main where id IN ('40', '50', '60')") } test("expression filter") { + assert(sqlContext.sparkSession.asInstanceOf[CarbonSession].isSearchModeEnabled) checkSearchAnswer("select id from main where length(id) < 2") } test("filter with limit") { + assert(sqlContext.sparkSession.asInstanceOf[CarbonSession].isSearchModeEnabled) checkSearchAnswer("select id from main where id = '3' limit 10") checkSearchAnswer("select id from main where length(id) < 2 limit 10") } test("aggregate query") { + assert(sqlContext.sparkSession.asInstanceOf[CarbonSession].isSearchModeEnabled) checkSearchAnswer("select city, sum(m1) from main where m2 < 10 group by city") } test("aggregate query with datamap and fallback to SparkSQL") { + assert(sqlContext.sparkSession.asInstanceOf[CarbonSession].isSearchModeEnabled) sql("create datamap preagg on table main using 'preaggregate' as select city, count(*) from main group by city ") checkSearchAnswer("select city, count(*) from main group by city") sql("drop datamap preagg on table main").show() @@ -108,10 +117,11 @@ class SearchModeTestCase extends QueryTest with BeforeAndAfterAll { checkSearchAnswer("select id from main where id = '3' limit 10") sql("set carbon.search.enabled = false") assert(!sqlContext.sparkSession.asInstanceOf[CarbonSession].isSearchModeEnabled) + sql("set carbon.search.enabled = true") } test("test lucene datamap with search mode") { - sql("set carbon.search.enabled = true") + assert(sqlContext.sparkSession.asInstanceOf[CarbonSession].isSearchModeEnabled) sql("DROP DATAMAP IF EXISTS dm ON TABLE main") sql("CREATE DATAMAP dm ON TABLE main USING 'lucene' DMProperties('INDEX_COLUMNS'='id') ") checkAnswer(sql("SELECT * FROM main WHERE TEXT_MATCH('id:100000')"), @@ -120,6 +130,7 @@ class SearchModeTestCase extends QueryTest with 
BeforeAndAfterAll { } test("test lucene datamap with search mode 2") { + assert(sqlContext.sparkSession.asInstanceOf[CarbonSession].isSearchModeEnabled) sql("drop datamap if exists dm3 ON TABLE main") sql("CREATE DATAMAP dm3 ON TABLE main USING 'lucene' DMProperties('INDEX_COLUMNS'='city') ") checkAnswer(sql("SELECT * FROM main WHERE TEXT_MATCH('city:city6')"), @@ -128,6 +139,7 @@ class SearchModeTestCase extends QueryTest with BeforeAndAfterAll { } test("test lucene datamap with search mode, two column") { + assert(sqlContext.sparkSession.asInstanceOf[CarbonSession].isSearchModeEnabled) sql("drop datamap if exists dm3 ON TABLE main") sql("CREATE DATAMAP dm3 ON TABLE main USING 'lucene' DMProperties('INDEX_COLUMNS'='city , id') ") checkAnswer(sql("SELECT * FROM main WHERE TEXT_MATCH('city:city6')"), @@ -137,7 +149,7 @@ class SearchModeTestCase extends QueryTest with BeforeAndAfterAll { sql("DROP DATAMAP if exists dm3 ON TABLE main") } - test("start search mode twice") { + ignore("start search mode twice") { sqlContext.sparkSession.asInstanceOf[CarbonSession].startSearchMode() assert(sqlContext.sparkSession.asInstanceOf[CarbonSession].isSearchModeEnabled) checkSearchAnswer("select id from main where id = '3' limit 10") @@ -148,6 +160,5 @@ class SearchModeTestCase extends QueryTest with BeforeAndAfterAll { sqlContext.sparkSession.asInstanceOf[CarbonSession].startSearchMode() assert(sqlContext.sparkSession.asInstanceOf[CarbonSession].isSearchModeEnabled) checkSearchAnswer("select id from main where id = '3' limit 10") - sqlContext.sparkSession.asInstanceOf[CarbonSession].stopSearchMode() } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9b40bf9/integration/spark2/pom.xml ---------------------------------------------------------------------- diff --git a/integration/spark2/pom.xml b/integration/spark2/pom.xml index 24af8ec..48e9719 100644 --- a/integration/spark2/pom.xml +++ b/integration/spark2/pom.xml @@ -42,7 +42,7 @@ </dependency> <dependency> 
<groupId>org.apache.carbondata</groupId> - <artifactId>carbondata-store-sdk</artifactId> + <artifactId>carbondata-store-core</artifactId> <version>${project.version}</version> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9b40bf9/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala b/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala index 3a6adea..d99081d 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala @@ -23,7 +23,6 @@ import java.net.InetAddress import scala.collection.JavaConverters._ import org.apache.spark.{CarbonInputMetrics, SparkConf} -import org.apache.spark.rpc.{Master, Worker} import org.apache.spark.sql.CarbonSession._ import org.apache.spark.sql.SparkSession @@ -111,24 +110,26 @@ class SparkCarbonStore extends MetaCachedCarbonStore { def startSearchMode(): Unit = { LOG.info("Starting search mode master") - master = new Master(session.sparkContext.getConf) + master = new Master() master.startService() startAllWorkers() } def stopSearchMode(): Unit = { - LOG.info("Shutting down all workers...") - try { - master.stopAllWorkers() - LOG.info("All workers are shutted down") - } catch { - case e: Exception => - LOG.error(s"failed to shutdown worker: ${e.toString}") + if (master != null) { + LOG.info("Shutting down all workers...") + try { + master.stopAllWorkers() + LOG.info("All workers are shut down") + } catch { + case e: Exception => + LOG.error(s"failed to shutdown worker: ${ e.toString }") + } + LOG.info("Stopping master...") + master.stopService() + LOG.info("Master stopped") + master = null } - LOG.info("Stopping master...") - 
master.stopService() - LOG.info("Master stopped") - master = null } /** search mode */ http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9b40bf9/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala index c59bb08..5ab0d29 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala @@ -214,6 +214,7 @@ class CarbonSession(@transient val sc: SparkContext, case e: RuntimeException => LogServiceFactory.getLogService(this.getClass.getCanonicalName) .error(s"Stop search mode failed: ${e.getMessage}") + throw e } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9b40bf9/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index ec68c1d..aeb87d3 100644 --- a/pom.xml +++ b/pom.xml @@ -103,7 +103,7 @@ <module>integration/spark-common-test</module> <module>datamap/examples</module> <module>store/sdk</module> - <module>store/search</module> + <module>store/core</module> <module>assembly</module> </modules> http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9b40bf9/store/core/pom.xml ---------------------------------------------------------------------- diff --git a/store/core/pom.xml b/store/core/pom.xml new file mode 100644 index 0000000..0bee84f --- /dev/null +++ b/store/core/pom.xml @@ -0,0 +1,113 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.carbondata</groupId> + 
<artifactId>carbondata-parent</artifactId> + <version>1.5.0-SNAPSHOT</version> + <relativePath>../../pom.xml</relativePath> + </parent> + + <artifactId>carbondata-store-core</artifactId> + <name>Apache CarbonData :: Store Core </name> + + <properties> + <dev.path>${basedir}/../../dev</dev.path> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.carbondata</groupId> + <artifactId>carbondata-hadoop</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.carbondata</groupId> + <artifactId>carbondata-store-sdk</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.scalatest</groupId> + <artifactId>scalatest_${scala.binary.version}</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <testSourceDirectory>src/test/scala</testSourceDirectory> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <source>1.7</source> + <target>1.7</target> + </configuration> + </plugin> + <plugin> + <groupId>org.scala-tools</groupId> + <artifactId>maven-scala-plugin</artifactId> + <version>2.15.2</version> + <executions> + <execution> + <id>compile</id> + <goals> + <goal>compile</goal> + </goals> + <phase>compile</phase> + </execution> + <execution> + <id>testCompile</id> + <goals> + <goal>testCompile</goal> + </goals> + <phase>test</phase> + </execution> + <execution> + <phase>process-resources</phase> + <goals> + <goal>compile</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.scalatest</groupId> + <artifactId>scalatest-maven-plugin</artifactId> + <version>1.0</version> + <!-- Note config is repeated in surefire config --> + <configuration> + 
<reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory> + <junitxml>.</junitxml> + <filereports>CarbonTestSuite.txt</filereports> + <argLine> ${argLine} -ea -Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m + </argLine> + <stderr /> + <environmentVariables> + </environmentVariables> + <systemProperties> + <java.awt.headless>true</java.awt.headless> + <spark.carbon.hive.schema.store>${carbon.hive.based.metastore}</spark.carbon.hive.schema.store> + </systemProperties> + </configuration> + <executions> + <execution> + <id>test</id> + <goals> + <goal>test</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9b40bf9/store/core/src/main/java/org/apache/carbondata/store/CarbonRowReadSupport.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/CarbonRowReadSupport.java b/store/core/src/main/java/org/apache/carbondata/store/CarbonRowReadSupport.java new file mode 100644 index 0000000..bafbb9f --- /dev/null +++ b/store/core/src/main/java/org/apache/carbondata/store/CarbonRowReadSupport.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.store; + +import java.io.IOException; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.core.datastore.row.CarbonRow; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; +import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport; +import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport; + +/** + * ReadSupport that convert row object to CarbonRow + */ [email protected] +public class CarbonRowReadSupport implements CarbonReadSupport<CarbonRow> { + private CarbonReadSupport<Object[]> delegate; + + public CarbonRowReadSupport() { + this.delegate = new DictionaryDecodeReadSupport<>(); + } + + @Override public void initialize(CarbonColumn[] carbonColumns, CarbonTable carbonTable) + throws IOException { + delegate.initialize(carbonColumns, carbonTable); + } + + @Override public CarbonRow readRow(Object[] data) { + Object[] converted = delegate.readRow(data); + return new CarbonRow(converted); + } + + @Override public void close() { + delegate.close(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9b40bf9/store/core/src/main/java/org/apache/carbondata/store/CarbonStore.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/CarbonStore.java b/store/core/src/main/java/org/apache/carbondata/store/CarbonStore.java new file mode 100644 index 0000000..c6b2fb8 --- /dev/null +++ b/store/core/src/main/java/org/apache/carbondata/store/CarbonStore.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.store; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Iterator; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.annotations.InterfaceStability; +import org.apache.carbondata.core.datastore.row.CarbonRow; +import org.apache.carbondata.core.scan.expression.Expression; + +/** + * User can use {@link CarbonStore} to query data + */ [email protected] [email protected] +public interface CarbonStore extends Closeable { + + /** + * Scan query on the data in the table path + * @param path table path + * @param projectColumns column names to read + * @return rows + * @throws IOException if unable to read files in table path + */ + Iterator<CarbonRow> scan( + String path, + String[] projectColumns) throws IOException; + + /** + * Scan query with filter, on the data in the table path + * @param path table path + * @param projectColumns column names to read + * @param filter filter condition, can be null + * @return rows that satisfy filter condition + * @throws IOException if unable to read files in table path + */ + Iterator<CarbonRow> scan( + String path, + String[] projectColumns, + Expression filter) throws IOException; + + /** + * SQL query, table should be created 
before calling this function + * @param sqlString SQL statement + * @return rows + * @throws IOException if unable to read files in table path + */ + Iterator<CarbonRow> sql(String sqlString) throws IOException; + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9b40bf9/store/core/src/main/java/org/apache/carbondata/store/LocalCarbonStore.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/LocalCarbonStore.java b/store/core/src/main/java/org/apache/carbondata/store/LocalCarbonStore.java new file mode 100644 index 0000000..daa1447 --- /dev/null +++ b/store/core/src/main/java/org/apache/carbondata/store/LocalCarbonStore.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.store; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.datastore.row.CarbonRow; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.scan.expression.Expression; +import org.apache.carbondata.hadoop.CarbonProjection; +import org.apache.carbondata.hadoop.api.CarbonInputFormat; +import org.apache.carbondata.hadoop.api.CarbonTableInputFormat; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.task.JobContextImpl; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; + +/** + * A CarbonStore implementation that works locally, without other compute framework dependency. + * It can be used to read data in local disk. + * + * Note that this class is experimental, it is not intended to be used in production. 
+ */ [email protected] +class LocalCarbonStore extends MetaCachedCarbonStore { + + private static final LogService LOGGER = + LogServiceFactory.getLogService(LocalCarbonStore.class.getName()); + + @Override + public Iterator<CarbonRow> scan(String path, String[] projectColumns) throws IOException { + return scan(path, projectColumns, null); + } + + @Override public Iterator<CarbonRow> scan(String path, String[] projectColumns, Expression filter) + throws IOException { + Objects.requireNonNull(path); + Objects.requireNonNull(projectColumns); + + CarbonTable table = getTable(path); + if (table.isStreamingSink() || table.isHivePartitionTable()) { + throw new UnsupportedOperationException("streaming and partition table is not supported"); + } + // TODO: use InputFormat to prune data and read data + + final CarbonTableInputFormat format = new CarbonTableInputFormat(); + final Job job = new Job(new Configuration()); + CarbonInputFormat.setTableInfo(job.getConfiguration(), table.getTableInfo()); + CarbonInputFormat.setTablePath(job.getConfiguration(), table.getTablePath()); + CarbonInputFormat.setTableName(job.getConfiguration(), table.getTableName()); + CarbonInputFormat.setDatabaseName(job.getConfiguration(), table.getDatabaseName()); + CarbonInputFormat.setCarbonReadSupport(job.getConfiguration(), CarbonRowReadSupport.class); + CarbonInputFormat + .setColumnProjection(job.getConfiguration(), new CarbonProjection(projectColumns)); + if (filter != null) { + CarbonInputFormat.setFilterPredicates(job.getConfiguration(), filter); + } + + final List<InputSplit> splits = + format.getSplits(new JobContextImpl(job.getConfiguration(), new JobID())); + + List<RecordReader<Void, Object>> readers = new ArrayList<>(splits.size()); + + List<CarbonRow> rows = new ArrayList<>(); + + try { + for (InputSplit split : splits) { + TaskAttemptContextImpl attempt = + new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID()); + RecordReader reader = 
format.createRecordReader(split, attempt); + reader.initialize(split, attempt); + readers.add(reader); + } + + for (RecordReader<Void, Object> reader : readers) { + while (reader.nextKeyValue()) { + rows.add((CarbonRow) reader.getCurrentValue()); + } + try { + reader.close(); + } catch (IOException e) { + LOGGER.error(e); + } + } + } catch (InterruptedException e) { + throw new IOException(e); + } finally { + for (RecordReader<Void, Object> reader : readers) { + try { + reader.close(); + } catch (IOException e) { + LOGGER.error(e); + } + } + } + return rows.iterator(); + } + + @Override + public Iterator<CarbonRow> sql(String sqlString) throws IOException { + throw new UnsupportedOperationException(); + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9b40bf9/store/core/src/main/java/org/apache/carbondata/store/MetaCachedCarbonStore.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/MetaCachedCarbonStore.java b/store/core/src/main/java/org/apache/carbondata/store/MetaCachedCarbonStore.java new file mode 100644 index 0000000..e43f750 --- /dev/null +++ b/store/core/src/main/java/org/apache/carbondata/store/MetaCachedCarbonStore.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.store; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.core.metadata.converter.SchemaConverter; +import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.TableInfo; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.path.CarbonTablePath; + +/** + * A CarbonStore base class that caches CarbonTable object + */ [email protected] +abstract class MetaCachedCarbonStore implements CarbonStore { + + // mapping of table path to CarbonTable object + private Map<String, CarbonTable> cache = new HashMap<>(); + + CarbonTable getTable(String path) throws IOException { + if (cache.containsKey(path)) { + return cache.get(path); + } + org.apache.carbondata.format.TableInfo tableInfo = CarbonUtil + .readSchemaFile(CarbonTablePath.getSchemaFilePath(path)); + SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl(); + TableInfo tableInfo1 = schemaConverter.fromExternalToWrapperTableInfo(tableInfo, "", "", ""); + tableInfo1.setTablePath(path); + CarbonTable table = CarbonTable.buildFromTableInfo(tableInfo1); + cache.put(path, table); + return table; + } + + @Override + public void close() throws IOException { + cache.clear(); + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9b40bf9/store/core/src/main/java/org/apache/carbondata/store/rpc/QueryService.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/QueryService.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/QueryService.java 
new file mode 100644 index 0000000..faaa746 --- /dev/null +++ b/store/core/src/main/java/org/apache/carbondata/store/rpc/QueryService.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.store.rpc; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.store.rpc.model.QueryRequest; +import org.apache.carbondata.store.rpc.model.QueryResponse; +import org.apache.carbondata.store.rpc.model.ShutdownRequest; +import org.apache.carbondata.store.rpc.model.ShutdownResponse; + +import org.apache.hadoop.ipc.VersionedProtocol; + [email protected] +public interface QueryService extends VersionedProtocol { + long versionID = 1L; + QueryResponse query(QueryRequest request); + ShutdownResponse shutdown(ShutdownRequest request); +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9b40bf9/store/core/src/main/java/org/apache/carbondata/store/rpc/RegistryService.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/RegistryService.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/RegistryService.java new file mode 100644 index 
0000000..4d17686 --- /dev/null +++ b/store/core/src/main/java/org/apache/carbondata/store/rpc/RegistryService.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.store.rpc; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.store.rpc.model.RegisterWorkerRequest; +import org.apache.carbondata.store.rpc.model.RegisterWorkerResponse; + +import org.apache.hadoop.ipc.VersionedProtocol; + [email protected] +public interface RegistryService extends VersionedProtocol { + long versionID = 1L; + RegisterWorkerResponse registerWorker(RegisterWorkerRequest request); +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9b40bf9/store/core/src/main/java/org/apache/carbondata/store/rpc/ServiceFactory.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/ServiceFactory.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/ServiceFactory.java new file mode 100644 index 0000000..a50ab8b --- /dev/null +++ b/store/core/src/main/java/org/apache/carbondata/store/rpc/ServiceFactory.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.store.rpc; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; + +import org.apache.carbondata.common.annotations.InterfaceAudience; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.RPC; + [email protected] +public class ServiceFactory { + + public static QueryService createSearchService(String host, int port) throws IOException { + InetSocketAddress address = new InetSocketAddress(InetAddress.getByName(host), port); + return RPC.getProxy( + QueryService.class, QueryService.versionID, address, new Configuration()); + } + + public static RegistryService createRegistryService(String host, int port) throws IOException { + InetSocketAddress address = new InetSocketAddress(InetAddress.getByName(host), port); + return RPC.getProxy( + RegistryService.class, RegistryService.versionID, address, new Configuration()); + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9b40bf9/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/IndexedRecordReader.java ---------------------------------------------------------------------- diff --git 
a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/IndexedRecordReader.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/IndexedRecordReader.java new file mode 100644 index 0000000..2c768d1 --- /dev/null +++ b/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/IndexedRecordReader.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.store.rpc.impl; + +import java.io.IOException; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Objects; + +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.datamap.DataMapChooser; +import org.apache.carbondata.core.datamap.DataMapDistributable; +import org.apache.carbondata.core.datamap.Segment; +import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper; +import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper; +import org.apache.carbondata.core.datastore.block.TableBlockInfo; +import org.apache.carbondata.core.datastore.row.CarbonRow; +import org.apache.carbondata.core.indexstore.ExtendedBlocklet; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope; +import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException; +import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; +import org.apache.carbondata.core.scan.model.QueryModel; +import org.apache.carbondata.core.scan.result.iterator.ChunkRowIterator; +import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; +import org.apache.carbondata.core.statusmanager.SegmentStatusManager; +import org.apache.carbondata.core.util.path.CarbonTablePath; +import org.apache.carbondata.hadoop.CarbonInputSplit; +import org.apache.carbondata.hadoop.CarbonMultiBlockSplit; +import org.apache.carbondata.hadoop.CarbonRecordReader; +import org.apache.carbondata.hadoop.readsupport.impl.CarbonRowReadSupport; + +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +/** + * This is a special RecordReader that leverages FGDataMap before reading carbondata file + * and return CarbonRow object + */ +public class 
IndexedRecordReader extends CarbonRecordReader<CarbonRow> { + + private static final LogService LOG = + LogServiceFactory.getLogService(RequestHandler.class.getName()); + + private int queryId; + private CarbonTable table; + + public IndexedRecordReader(int queryId, CarbonTable table, QueryModel queryModel) { + super(queryModel, new CarbonRowReadSupport()); + this.queryId = queryId; + this.table = table; + } + + @Override + public void initialize(InputSplit inputSplit, TaskAttemptContext context) + throws IOException, InterruptedException { + CarbonMultiBlockSplit mbSplit = (CarbonMultiBlockSplit) inputSplit; + List<CarbonInputSplit> splits = mbSplit.getAllSplits(); + List<TableBlockInfo> list = CarbonInputSplit.createBlocks(mbSplit.getAllSplits()); + queryModel.setTableBlockInfos(list); + + // prune the block with FGDataMap is there is one based on the filter condition + DataMapExprWrapper fgDataMap = chooseFGDataMap(table, + queryModel.getFilterExpressionResolverTree()); + if (fgDataMap != null) { + queryModel = prune(table, queryModel, mbSplit, fgDataMap); + } else { + List<TableBlockInfo> tableBlockInfoList = CarbonInputSplit.createBlocks(splits); + queryModel.setTableBlockInfos(tableBlockInfoList); + } + + readSupport.initialize(queryModel.getProjectionColumns(), queryModel.getTable()); + try { + carbonIterator = new ChunkRowIterator(queryExecutor.execute(queryModel)); + } catch (QueryExecutionException e) { + throw new InterruptedException(e.getMessage()); + } + } + + private DataMapExprWrapper chooseFGDataMap( + CarbonTable table, + FilterResolverIntf filterInterface) { + DataMapChooser chooser = null; + try { + chooser = new DataMapChooser(table); + return chooser.chooseFGDataMap(filterInterface); + } catch (IOException e) { + LOG.error(e); + return null; + } + } + + /** + * If there is FGDataMap defined for this table and filter condition in the query, + * prune the splits by the DataMap and set the pruned split into the QueryModel and return + */ + 
private QueryModel prune(CarbonTable table, QueryModel queryModel, + CarbonMultiBlockSplit mbSplit, DataMapExprWrapper datamap) throws IOException { + Objects.requireNonNull(datamap); + List<Segment> segments = new LinkedList<>(); + HashMap<String, Integer> uniqueSegments = new HashMap<>(); + LoadMetadataDetails[] loadMetadataDetails = + SegmentStatusManager.readLoadMetadata( + CarbonTablePath.getMetadataPath(table.getTablePath())); + for (CarbonInputSplit split : mbSplit.getAllSplits()) { + String segmentId = Segment.getSegment(split.getSegmentId(), loadMetadataDetails).toString(); + if (uniqueSegments.get(segmentId) == null) { + segments.add(Segment.toSegment(segmentId, + new TableStatusReadCommittedScope(table.getAbsoluteTableIdentifier(), + loadMetadataDetails))); + uniqueSegments.put(segmentId, 1); + } else { + uniqueSegments.put(segmentId, uniqueSegments.get(segmentId) + 1); + } + } + + List<DataMapDistributableWrapper> distributables = datamap.toDistributable(segments); + List<ExtendedBlocklet> prunnedBlocklets = new LinkedList<ExtendedBlocklet>(); + for (int i = 0; i < distributables.size(); i++) { + DataMapDistributable dataMapDistributable = distributables.get(i).getDistributable(); + prunnedBlocklets.addAll(datamap.prune(dataMapDistributable, null)); + } + + HashMap<String, ExtendedBlocklet> pathToRead = new HashMap<>(); + for (ExtendedBlocklet prunedBlocklet : prunnedBlocklets) { + pathToRead.put(prunedBlocklet.getFilePath(), prunedBlocklet); + } + + List<TableBlockInfo> blocks = queryModel.getTableBlockInfos(); + List<TableBlockInfo> blockToRead = new LinkedList<>(); + for (TableBlockInfo block : blocks) { + if (pathToRead.keySet().contains(block.getFilePath())) { + // If not set this, it won't create FineGrainBlocklet object in + // org.apache.carbondata.core.indexstore.blockletindex.BlockletDataRefNode.getIndexedData + block.setDataMapWriterPath(pathToRead.get(block.getFilePath()).getDataMapWriterPath()); + blockToRead.add(block); + } + } + 
LOG.info(String.format("[QueryId:%d] pruned using FG DataMap, pruned blocks: %d", queryId, + blockToRead.size())); + queryModel.setTableBlockInfos(blockToRead); + return queryModel; + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9b40bf9/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/QueryServiceImpl.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/QueryServiceImpl.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/QueryServiceImpl.java new file mode 100644 index 0000000..b191331 --- /dev/null +++ b/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/QueryServiceImpl.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.store.rpc.impl; + +import java.io.IOException; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.store.rpc.QueryService; +import org.apache.carbondata.store.rpc.model.QueryRequest; +import org.apache.carbondata.store.rpc.model.QueryResponse; +import org.apache.carbondata.store.rpc.model.ShutdownRequest; +import org.apache.carbondata.store.rpc.model.ShutdownResponse; + +import org.apache.hadoop.ipc.ProtocolSignature; + [email protected] +public class QueryServiceImpl implements QueryService { + + @Override + public QueryResponse query(QueryRequest request) { + RequestHandler handler = new RequestHandler(); + return handler.handleSearch(request); + } + + @Override + public ShutdownResponse shutdown(ShutdownRequest request) { + RequestHandler handler = new RequestHandler(); + return handler.handleShutdown(request); + } + + @Override + public long getProtocolVersion(String protocol, long clientVersion) throws IOException { + return versionID; + } + + @Override + public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, + int clientMethodsHash) throws IOException { + return null; + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9b40bf9/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/RegistryServiceImpl.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/RegistryServiceImpl.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/RegistryServiceImpl.java new file mode 100644 index 0000000..12f48ba --- /dev/null +++ b/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/RegistryServiceImpl.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.store.rpc.impl; + +import java.io.IOException; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.store.Master; +import org.apache.carbondata.store.rpc.RegistryService; +import org.apache.carbondata.store.rpc.model.RegisterWorkerRequest; +import org.apache.carbondata.store.rpc.model.RegisterWorkerResponse; + +import org.apache.hadoop.ipc.ProtocolSignature; + [email protected] +public class RegistryServiceImpl implements RegistryService { + + private Master master; + + public RegistryServiceImpl(Master master) { + this.master = master; + } + + @Override + public RegisterWorkerResponse registerWorker(RegisterWorkerRequest request) { + return master.addWorker(request); + } + + @Override + public long getProtocolVersion(String protocol, long clientVersion) throws IOException { + return versionID; + } + + @Override + public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, + int clientMethodsHash) throws IOException { + return null; + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9b40bf9/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/RequestHandler.java ---------------------------------------------------------------------- diff --git 
a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/RequestHandler.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/RequestHandler.java new file mode 100644 index 0000000..29ee546 --- /dev/null +++ b/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/RequestHandler.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.store.rpc.impl; + +import java.io.IOException; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.datastore.row.CarbonRow; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.TableInfo; +import org.apache.carbondata.core.scan.executor.impl.SearchModeDetailQueryExecutor; +import org.apache.carbondata.core.scan.executor.impl.SearchModeVectorDetailQueryExecutor; +import org.apache.carbondata.core.scan.expression.Expression; +import org.apache.carbondata.core.scan.model.QueryModel; +import org.apache.carbondata.core.scan.model.QueryModelBuilder; +import org.apache.carbondata.core.util.CarbonTaskInfo; +import org.apache.carbondata.core.util.ThreadLocalTaskInfo; +import org.apache.carbondata.hadoop.CarbonMultiBlockSplit; +import org.apache.carbondata.hadoop.CarbonRecordReader; +import org.apache.carbondata.store.rpc.model.QueryRequest; +import org.apache.carbondata.store.rpc.model.QueryResponse; +import org.apache.carbondata.store.rpc.model.ShutdownRequest; +import org.apache.carbondata.store.rpc.model.ShutdownResponse; + +/** + * It handles request from master. 
+ */ [email protected] +class RequestHandler { + + private static final LogService LOG = + LogServiceFactory.getLogService(RequestHandler.class.getName()); + + QueryResponse handleSearch(QueryRequest request) { + try { + LOG.info(String.format("[QueryId:%d] receive search request", request.getRequestId())); + List<CarbonRow> rows = handleRequest(request); + LOG.info(String.format("[QueryId:%d] sending success response", request.getRequestId())); + return createSuccessResponse(request, rows); + } catch (IOException e) { + LOG.error(e); + LOG.info(String.format("[QueryId:%d] sending failure response", request.getRequestId())); + return createFailureResponse(request, e); + } + } + + ShutdownResponse handleShutdown(ShutdownRequest request) { + LOG.info("Shutting down worker..."); + SearchModeDetailQueryExecutor.shutdownThreadPool(); + SearchModeVectorDetailQueryExecutor.shutdownThreadPool(); + LOG.info("Worker shut down"); + return new ShutdownResponse(Status.SUCCESS.ordinal(), ""); + } + + /** + * Builds {@link QueryModel} and read data from files + */ + private List<CarbonRow> handleRequest(QueryRequest request) throws IOException { + CarbonTaskInfo carbonTaskInfo = new CarbonTaskInfo(); + carbonTaskInfo.setTaskId(System.nanoTime()); + ThreadLocalTaskInfo.setCarbonTaskInfo(carbonTaskInfo); + CarbonMultiBlockSplit mbSplit = request.getSplit(); + long limit = request.getLimit(); + TableInfo tableInfo = request.getTableInfo(); + CarbonTable table = CarbonTable.buildFromTableInfo(tableInfo); + QueryModel queryModel = createQueryModel(table, request); + + LOG.info(String.format("[QueryId:%d] %s, number of block: %d", + request.getRequestId(), queryModel.toString(), mbSplit.getAllSplits().size())); + + // read all rows by the reader + List<CarbonRow> rows = new LinkedList<>(); + try (CarbonRecordReader<CarbonRow> reader = + new IndexedRecordReader(request.getRequestId(), table, queryModel)) { + reader.initialize(mbSplit, null); + + // loop to read required number of rows. 
+ // By default, if user does not specify the limit value, limit is Long.MaxValue + long rowCount = 0; + while (reader.nextKeyValue() && rowCount < limit) { + rows.add(reader.getCurrentValue()); + rowCount++; + } + } catch (InterruptedException e) { + throw new IOException(e); + } + LOG.info(String.format("[QueryId:%d] scan completed, return %d rows", + request.getRequestId(), rows.size())); + return rows; + } + + + + private QueryModel createQueryModel(CarbonTable table, QueryRequest request) { + String[] projectColumns = request.getProjectColumns(); + Expression filter = null; + if (request.getFilterExpression() != null) { + filter = request.getFilterExpression(); + } + return new QueryModelBuilder(table) + .projectColumns(projectColumns) + .filterExpression(filter) + .build(); + } + + /** + * create a failure response + */ + private QueryResponse createFailureResponse(QueryRequest request, Throwable throwable) { + return new QueryResponse(request.getRequestId(), Status.FAILURE.ordinal(), + throwable.getMessage(), new Object[0][]); + } + + /** + * create a success response with result rows + */ + private QueryResponse createSuccessResponse(QueryRequest request, List<CarbonRow> rows) { + Iterator<CarbonRow> itor = rows.iterator(); + Object[][] output = new Object[rows.size()][]; + int i = 0; + while (itor.hasNext()) { + output[i++] = itor.next().getData(); + } + return new QueryResponse(request.getRequestId(), Status.SUCCESS.ordinal(), "", output); + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9b40bf9/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/Status.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/Status.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/Status.java new file mode 100644 index 0000000..9bcd397 --- /dev/null +++ b/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/Status.java @@ 
-0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.store.rpc.impl; + +import org.apache.carbondata.common.annotations.InterfaceAudience; + +/** + * Status of RPC response + */ [email protected] +public enum Status { + SUCCESS, FAILURE +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9b40bf9/store/core/src/main/java/org/apache/carbondata/store/rpc/model/QueryRequest.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/model/QueryRequest.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/model/QueryRequest.java new file mode 100644 index 0000000..27dc38b --- /dev/null +++ b/store/core/src/main/java/org/apache/carbondata/store/rpc/model/QueryRequest.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.store.rpc.model; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.Serializable; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.core.metadata.schema.table.TableInfo; +import org.apache.carbondata.core.scan.expression.Expression; +import org.apache.carbondata.core.util.ObjectSerializationUtil; +import org.apache.carbondata.hadoop.CarbonMultiBlockSplit; + +import org.apache.hadoop.io.Writable; + [email protected] +public class QueryRequest implements Serializable, Writable { + private int requestId; + private CarbonMultiBlockSplit split; + private TableInfo tableInfo; + private String[] projectColumns; + private Expression filterExpression; + private long limit; + + public QueryRequest() { + } + + public QueryRequest(int requestId, CarbonMultiBlockSplit split, + TableInfo tableInfo, String[] projectColumns, Expression filterExpression, long limit) { + this.requestId = requestId; + this.split = split; + this.tableInfo = tableInfo; + this.projectColumns = projectColumns; + this.filterExpression = filterExpression; + this.limit = limit; + } + + public int getRequestId() { + return requestId; + } + + public CarbonMultiBlockSplit getSplit() { + return split; + } + + public TableInfo getTableInfo() { + return tableInfo; + } + + public String[] getProjectColumns() { + return projectColumns; + } + + public Expression getFilterExpression() { + return filterExpression; + } + + public long getLimit() { + return limit; + } + 
+ @Override + public void write(DataOutput out) throws IOException { + out.writeInt(requestId); + split.write(out); + tableInfo.write(out); + out.writeInt(projectColumns.length); + for (String projectColumn : projectColumns) { + out.writeUTF(projectColumn); + } + String filter = ObjectSerializationUtil.convertObjectToString(filterExpression); + out.writeUTF(filter); + out.writeLong(limit); + } + + @Override + public void readFields(DataInput in) throws IOException { + requestId = in.readInt(); + split = new CarbonMultiBlockSplit(); + split.readFields(in); + tableInfo = new TableInfo(); + tableInfo.readFields(in); + projectColumns = new String[in.readInt()]; + for (int i = 0; i < projectColumns.length; i++) { + projectColumns[i] = in.readUTF(); + } + String filter = in.readUTF(); + filterExpression = (Expression) ObjectSerializationUtil.convertStringToObject(filter); + limit = in.readLong(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9b40bf9/store/core/src/main/java/org/apache/carbondata/store/rpc/model/QueryResponse.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/model/QueryResponse.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/model/QueryResponse.java new file mode 100644 index 0000000..033f1a5 --- /dev/null +++ b/store/core/src/main/java/org/apache/carbondata/store/rpc/model/QueryResponse.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.store.rpc.model; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.Serializable; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.core.util.ObjectSerializationUtil; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; + [email protected] +public class QueryResponse implements Serializable, Writable { + private int queryId; + private int status; + private String message; + private Object[][] rows; + + public QueryResponse() { + } + + public QueryResponse(int queryId, int status, String message, Object[][] rows) { + this.queryId = queryId; + this.status = status; + this.message = message; + this.rows = rows; + } + + public int getQueryId() { + return queryId; + } + + public int getStatus() { + return status; + } + + public String getMessage() { + return message; + } + + public Object[][] getRows() { + return rows; + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(queryId); + out.writeInt(status); + out.writeUTF(message); + WritableUtils.writeCompressedByteArray(out, ObjectSerializationUtil.serialize(rows)); + } + + @Override + public void readFields(DataInput in) throws IOException { + queryId = in.readInt(); + status = in.readInt(); + message = in.readUTF(); + try { + rows = (Object[][])ObjectSerializationUtil.deserialize( + WritableUtils.readCompressedByteArray(in)); + } catch (ClassNotFoundException e) { + throw new IOException(e); + } + } +} 
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.carbondata.store.rpc.model;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;

import org.apache.carbondata.common.annotations.InterfaceAudience;

import org.apache.hadoop.io.Writable;

/**
 * RPC message a search worker sends to the registry to announce itself,
 * carrying the address and port it listens on and the number of cores it
 * offers. Fields are written/read in declaration order by the Writable
 * methods, so the two must stay in sync.
 */
@InterfaceAudience.Internal
public class RegisterWorkerRequest implements Serializable, Writable {
  private String hostAddress;
  private int port;
  private int cores;

  /** No-arg constructor for Hadoop RPC reflective instantiation. */
  public RegisterWorkerRequest() {
  }

  public RegisterWorkerRequest(String hostAddress, int port, int cores) {
    this.hostAddress = hostAddress;
    this.port = port;
    this.cores = cores;
  }

  /** @return address the worker's RPC endpoint is bound to */
  public String getHostAddress() {
    return hostAddress;
  }

  /** @return port the worker's RPC endpoint listens on */
  public int getPort() {
    return port;
  }

  /** @return number of cores the worker makes available for query execution */
  public int getCores() {
    return cores;
  }

  @Override
  public void write(DataOutput out) throws IOException {
    out.writeUTF(hostAddress);
    out.writeInt(port);
    out.writeInt(cores);
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    hostAddress = in.readUTF();
    port = in.readInt();
    cores = in.readInt();
  }
}
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.store.rpc.model; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.Serializable; + +import org.apache.carbondata.common.annotations.InterfaceAudience; + +import org.apache.hadoop.io.Writable; + [email protected] +public class RegisterWorkerResponse implements Serializable, Writable { + + private String workerId; + + public RegisterWorkerResponse() { + } + + public RegisterWorkerResponse(String workerId) { + this.workerId = workerId; + } + + public String getWorkerId() { + return workerId; + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeUTF(workerId); + } + + @Override + public void readFields(DataInput in) throws IOException { + workerId = in.readUTF(); + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9b40bf9/store/core/src/main/java/org/apache/carbondata/store/rpc/model/ShutdownRequest.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/model/ShutdownRequest.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/model/ShutdownRequest.java new file mode 100644 index 0000000..7a25944 --- /dev/null +++ b/store/core/src/main/java/org/apache/carbondata/store/rpc/model/ShutdownRequest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.store.rpc.model; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.Serializable; + +import org.apache.carbondata.common.annotations.InterfaceAudience; + +import org.apache.hadoop.io.Writable; + [email protected] +public class ShutdownRequest implements Serializable, Writable { + private String reason; + + public ShutdownRequest() { + } + + public ShutdownRequest(String reason) { + this.reason = reason; + } + + public String getReason() { + return reason; + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeUTF(reason); + } + + @Override + public void readFields(DataInput in) throws IOException { + reason = in.readUTF(); + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9b40bf9/store/core/src/main/java/org/apache/carbondata/store/rpc/model/ShutdownResponse.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/model/ShutdownResponse.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/model/ShutdownResponse.java new file mode 100644 index 0000000..f6f329f --- /dev/null +++ 
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.carbondata.store.rpc.model;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;

import org.apache.carbondata.common.annotations.InterfaceAudience;

import org.apache.hadoop.io.Writable;

/**
 * RPC reply to a {@code ShutdownRequest}, reporting a status code and a
 * human-readable message describing the outcome.
 */
@InterfaceAudience.Internal
public class ShutdownResponse implements Serializable, Writable {
  private int status;
  private String message;

  /** No-arg constructor for Hadoop RPC reflective instantiation. */
  public ShutdownResponse() {
  }

  public ShutdownResponse(int status, String message) {
    this.status = status;
    this.message = message;
  }

  /** @return numeric status code of the shutdown attempt */
  public int getStatus() {
    return status;
  }

  /** @return human-readable description of the outcome */
  public String getMessage() {
    return message;
  }

  @Override
  public void write(DataOutput out) throws IOException {
    out.writeInt(status);
    out.writeUTF(message);
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    status = in.readInt();
    message = in.readUTF();
  }
}
