Repository: carbondata Updated Branches: refs/heads/carbonstore 044a995aa -> 7306b59dd
[CARBONDATA-2688][CarbonStore] Support SQL in REST API. Support SQL interface in Horizon service. Support REST client for SQL. This closes #2481. Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/7306b59d Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/7306b59d Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/7306b59d Branch: refs/heads/carbonstore Commit: 7306b59dd87ab087e3e4ae9a4201629bc2a12dcb Parents: 044a995 Author: Jacky Li <jacky.li...@qq.com> Authored: Tue Jul 10 21:20:45 2018 +0800 Committer: QiangCai <qiang...@qq.com> Committed: Wed Jul 11 22:54:11 2018 +0800 ---------------------------------------------------------------------- .../org/apache/spark/sql/CarbonSession.scala | 101 +----------- .../apache/spark/sql/CarbonSessionBuilder.scala | 156 +++++++++++++++++++ pom.xml | 1 + .../carbondata/store/api/conf/StoreConf.java | 6 + store/horizon/pom.xml | 4 +- .../rest/client/impl/SimpleHorizonClient.java | 11 +- .../horizon/rest/controller/Horizon.java | 32 +++- .../rest/controller/HorizonController.java | 9 +- .../horizon/rest/model/view/Response.java | 8 +- .../horizon/rest/model/view/SelectResponse.java | 8 +- .../apache/carbondata/horizon/HorizonTest.java | 4 +- store/sql/pom.xml | 84 ++++++++++ .../horizon/rest/client/SqlHorizonClient.java | 57 +++++++ .../horizon/rest/controller/SqlHorizon.java | 82 ++++++++++ .../rest/controller/SqlHorizonController.java | 70 +++++++++ .../rest/model/validate/RequestValidator.java | 35 +++++ .../horizon/rest/model/view/SqlRequest.java | 33 ++++ .../horizon/rest/model/view/SqlResponse.java | 39 +++++ .../horizon/rest/sql/SparkSqlWrapper.scala | 34 ++++ .../java/org/apache/carbondata/AppTest.java | 20 +++ 20 files changed, 679 insertions(+), 115 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7306b59d/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala index cd939dc..6c13955 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala @@ -265,14 +265,14 @@ object CarbonSession { private val statementId = new AtomicLong(0) - private var enableInMemCatlog: Boolean = false + private var enableInMemCatalog: Boolean = false private[sql] val threadStatementId = new ThreadLocal[Long]() implicit class CarbonBuilder(builder: Builder) { def enableInMemoryCatalog(): Builder = { - enableInMemCatlog = true + enableInMemCatalog = true builder } def getOrCreateCarbonSession(): SparkSession = { @@ -287,102 +287,7 @@ object CarbonSession { def getOrCreateCarbonSession(storePath: String, metaStorePath: String): SparkSession = synchronized { - if (!enableInMemCatlog) { - builder.enableHiveSupport() - } - val options = - getValue("options", builder).asInstanceOf[scala.collection.mutable.HashMap[String, String]] - val userSuppliedContext: Option[SparkContext] = - getValue("userSuppliedContext", builder).asInstanceOf[Option[SparkContext]] - - if (metaStorePath != null) { - val hadoopConf = new Configuration() - val configFile = Utils.getContextOrSparkClassLoader.getResource("hive-site.xml") - if (configFile != null) { - hadoopConf.addResource(configFile) - } - if (options.get(CarbonCommonConstants.HIVE_CONNECTION_URL).isEmpty && - hadoopConf.get(CarbonCommonConstants.HIVE_CONNECTION_URL) == null) { - val metaStorePathAbsolute = new File(metaStorePath).getCanonicalPath - val hiveMetaStoreDB = metaStorePathAbsolute + "/metastore_db" - options ++= Map[String, 
String]((CarbonCommonConstants.HIVE_CONNECTION_URL, - s"jdbc:derby:;databaseName=$hiveMetaStoreDB;create=true")) - } - } - - // Get the session from current thread's active session. - var session: SparkSession = SparkSession.getActiveSession match { - case Some(sparkSession: CarbonSession) => - if ((sparkSession ne null) && !sparkSession.sparkContext.isStopped) { - options.foreach { case (k, v) => sparkSession.sessionState.conf.setConfString(k, v) } - sparkSession - } else { - null - } - case _ => null - } - if (session ne null) { - return session - } - - // Global synchronization so we will only set the default session once. - SparkSession.synchronized { - // If the current thread does not have an active session, get it from the global session. - session = SparkSession.getDefaultSession match { - case Some(sparkSession: CarbonSession) => - if ((sparkSession ne null) && !sparkSession.sparkContext.isStopped) { - options.foreach { case (k, v) => sparkSession.sessionState.conf.setConfString(k, v) } - sparkSession - } else { - null - } - case _ => null - } - if (session ne null) { - return session - } - - // No active nor global default session. Create a new one. 
- val sparkContext = userSuppliedContext.getOrElse { - // set app name if not given - val randomAppName = java.util.UUID.randomUUID().toString - val sparkConf = new SparkConf() - options.foreach { case (k, v) => sparkConf.set(k, v) } - if (!sparkConf.contains("spark.app.name")) { - sparkConf.setAppName(randomAppName) - } - val sc = SparkContext.getOrCreate(sparkConf) - CarbonInputFormatUtil.setS3Configurations(sc.hadoopConfiguration) - // maybe this is an existing SparkContext, update its SparkConf which maybe used - // by SparkSession - options.foreach { case (k, v) => sc.conf.set(k, v) } - if (!sc.conf.contains("spark.app.name")) { - sc.conf.setAppName(randomAppName) - } - sc - } - - session = new CarbonSession(sparkContext, None, !enableInMemCatlog) - val carbonProperties = CarbonProperties.getInstance() - if (storePath != null) { - carbonProperties.addProperty(CarbonCommonConstants.STORE_LOCATION, storePath) - // In case if it is in carbon.properties for backward compatible - } else if (carbonProperties.getProperty(CarbonCommonConstants.STORE_LOCATION) == null) { - carbonProperties.addProperty(CarbonCommonConstants.STORE_LOCATION, - session.sessionState.conf.warehousePath) - } - options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) } - SparkSession.setDefaultSession(session) - // Setup monitor end point and register CarbonMonitorListener - Profiler.initialize(sparkContext) - // Register a successfully instantiated context to the singleton. This should be at the - // end of the class definition so that the singleton is updated only if there is no - // exception in the construction of the instance. 
- CarbonToSparkAdapater.addSparkListener(sparkContext) - session.streams.addListener(new CarbonStreamingQueryListener(session)) - } - - session + new CarbonSessionBuilder(builder).build(storePath, metaStorePath, enableInMemCatalog) } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/7306b59d/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSessionBuilder.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSessionBuilder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSessionBuilder.scala new file mode 100644 index 0000000..6deade1 --- /dev/null +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSessionBuilder.scala @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql + +import java.io.File + +import org.apache.hadoop.conf.Configuration +import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.sql.SparkSession.Builder +import org.apache.spark.sql.profiler.Profiler +import org.apache.spark.util.Utils + +import org.apache.carbondata.common.annotations.{InterfaceAudience, InterfaceStability} +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil +import org.apache.carbondata.streaming.CarbonStreamingQueryListener + +/** + * Used for building CarbonSession, it can be called in Scala or Java + * @param builder SparkSession builder + */ +@InterfaceAudience.User +@InterfaceStability.Evolving +class CarbonSessionBuilder(builder: Builder) { + + def build( + storePath: String, + metaStorePath: String, + enableInMemCatalog: Boolean): SparkSession = synchronized { + + if (!enableInMemCatalog) { + builder.enableHiveSupport() + } + val options = + getValue("options", builder).asInstanceOf[scala.collection.mutable.HashMap[String, String]] + val userSuppliedContext: Option[SparkContext] = + getValue("userSuppliedContext", builder).asInstanceOf[Option[SparkContext]] + + if (metaStorePath != null) { + val hadoopConf = new Configuration() + val configFile = Utils.getContextOrSparkClassLoader.getResource("hive-site.xml") + if (configFile != null) { + hadoopConf.addResource(configFile) + } + if (options.get(CarbonCommonConstants.HIVE_CONNECTION_URL).isEmpty && + hadoopConf.get(CarbonCommonConstants.HIVE_CONNECTION_URL) == null) { + val metaStorePathAbsolute = new File(metaStorePath).getCanonicalPath + val hiveMetaStoreDB = metaStorePathAbsolute + "/metastore_db" + options ++= Map[String, String]((CarbonCommonConstants.HIVE_CONNECTION_URL, + s"jdbc:derby:;databaseName=$hiveMetaStoreDB;create=true")) + } + } + + // Get the session from current thread's active 
session. + var session: SparkSession = SparkSession.getActiveSession match { + case Some(sparkSession: CarbonSession) => + if ((sparkSession ne null) && !sparkSession.sparkContext.isStopped) { + options.foreach { case (k, v) => sparkSession.sessionState.conf.setConfString(k, v) } + sparkSession + } else { + null + } + case _ => null + } + if (session ne null) { + return session + } + + // Global synchronization so we will only set the default session once. + SparkSession.synchronized { + // If the current thread does not have an active session, get it from the global session. + session = SparkSession.getDefaultSession match { + case Some(sparkSession: CarbonSession) => + if ((sparkSession ne null) && !sparkSession.sparkContext.isStopped) { + options.foreach { case (k, v) => sparkSession.sessionState.conf.setConfString(k, v) } + sparkSession + } else { + null + } + case _ => null + } + if (session ne null) { + return session + } + + // No active nor global default session. Create a new one. 
+ val sparkContext = userSuppliedContext.getOrElse { + // set app name if not given + val randomAppName = java.util.UUID.randomUUID().toString + val sparkConf = new SparkConf() + options.foreach { case (k, v) => sparkConf.set(k, v) } + if (!sparkConf.contains("spark.app.name")) { + sparkConf.setAppName(randomAppName) + } + val sc = SparkContext.getOrCreate(sparkConf) + CarbonInputFormatUtil.setS3Configurations(sc.hadoopConfiguration) + // maybe this is an existing SparkContext, update its SparkConf which maybe used + // by SparkSession + options.foreach { case (k, v) => sc.conf.set(k, v) } + if (!sc.conf.contains("spark.app.name")) { + sc.conf.setAppName(randomAppName) + } + sc + } + + session = new CarbonSession(sparkContext, None, !enableInMemCatalog) + val carbonProperties = CarbonProperties.getInstance() + if (storePath != null) { + carbonProperties.addProperty(CarbonCommonConstants.STORE_LOCATION, storePath) + // In case if it is in carbon.properties for backward compatible + } else if (carbonProperties.getProperty(CarbonCommonConstants.STORE_LOCATION) == null) { + carbonProperties.addProperty(CarbonCommonConstants.STORE_LOCATION, + session.sessionState.conf.warehousePath) + } + options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) } + SparkSession.setDefaultSession(session) + // Setup monitor end point and register CarbonMonitorListener + Profiler.initialize(sparkContext) + // Register a successfully instantiated context to the singleton. This should be at the + // end of the class definition so that the singleton is updated only if there is no + // exception in the construction of the instance. + CarbonToSparkAdapater.addSparkListener(sparkContext) + session.streams.addListener(new CarbonStreamingQueryListener(session)) + } + session + } + + /** + * It is a hack to get the private field from class. 
+ */ + def getValue(name: String, builder: Builder): Any = { + val currentMirror = scala.reflect.runtime.currentMirror + val instanceMirror = currentMirror.reflect(builder) + val m = currentMirror.classSymbol(builder.getClass). + toType.members.find { p => + p.name.toString.equals(name) + }.get.asTerm + instanceMirror.reflectField(m).get + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/7306b59d/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 82b9b7b..d6436d7 100644 --- a/pom.xml +++ b/pom.xml @@ -587,6 +587,7 @@ <id>horizon</id> <modules> <module>store/horizon</module> + <module>store/sql</module> </modules> </profile> <profile> http://git-wip-us.apache.org/repos/asf/carbondata/blob/7306b59d/store/core/src/main/java/org/apache/carbondata/store/api/conf/StoreConf.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/api/conf/StoreConf.java b/store/core/src/main/java/org/apache/carbondata/store/api/conf/StoreConf.java index 98a670a..5e4bb4a 100644 --- a/store/core/src/main/java/org/apache/carbondata/store/api/conf/StoreConf.java +++ b/store/core/src/main/java/org/apache/carbondata/store/api/conf/StoreConf.java @@ -50,9 +50,15 @@ public class StoreConf implements Serializable, Writable { public static final String STORE_LOCATION = "carbon.store.location"; public static final String STORE_NAME = "carbon.store.name"; + public static final String STORE_CONF_FILE = "carbon.store.confFile"; + private Map<String, String> conf = new HashMap<>(); public StoreConf() { + String storeConfFile = System.getProperty(STORE_CONF_FILE); + if (storeConfFile != null) { + load(storeConfFile); + } } public StoreConf(String storeName, String storeLocation) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/7306b59d/store/horizon/pom.xml ---------------------------------------------------------------------- diff --git 
a/store/horizon/pom.xml b/store/horizon/pom.xml index 7ae848a..16a92e9 100644 --- a/store/horizon/pom.xml +++ b/store/horizon/pom.xml @@ -16,7 +16,7 @@ <properties> <dev.path>${basedir}/../../dev</dev.path> - <spring.version>2.0.2.RELEASE</spring.version> + <spring.version>1.5.14.RELEASE</spring.version> </properties> <dependencies> @@ -66,7 +66,7 @@ <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> - <version>2.9.5</version> + <version>2.6.5</version> </dependency> </dependencies> http://git-wip-us.apache.org/repos/asf/carbondata/blob/7306b59d/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/client/impl/SimpleHorizonClient.java ---------------------------------------------------------------------- diff --git a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/client/impl/SimpleHorizonClient.java b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/client/impl/SimpleHorizonClient.java index ba86180..076df70 100644 --- a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/client/impl/SimpleHorizonClient.java +++ b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/client/impl/SimpleHorizonClient.java @@ -22,6 +22,8 @@ import java.util.ArrayList; import java.util.List; import java.util.Objects; +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.annotations.InterfaceStability; import org.apache.carbondata.core.datastore.row.CarbonRow; import org.apache.carbondata.horizon.rest.client.HorizonClient; import org.apache.carbondata.horizon.rest.model.view.CreateTableRequest; @@ -34,10 +36,15 @@ import org.apache.carbondata.store.api.exception.StoreException; import org.springframework.http.ResponseEntity; import org.springframework.web.client.RestTemplate; +/** + * REST Client implementation for Horizon service + */ +@InterfaceAudience.User +@InterfaceStability.Unstable public class SimpleHorizonClient 
implements HorizonClient { - private RestTemplate restTemplate; - private String serviceUri; + protected RestTemplate restTemplate; + protected String serviceUri; public SimpleHorizonClient(String serviceUri) { this.serviceUri = serviceUri; http://git-wip-us.apache.org/repos/asf/carbondata/blob/7306b59d/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/Horizon.java ---------------------------------------------------------------------- diff --git a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/Horizon.java b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/Horizon.java index d7b7c5e..9d876ab 100644 --- a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/Horizon.java +++ b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/Horizon.java @@ -17,6 +17,11 @@ package org.apache.carbondata.horizon.rest.controller; +import java.io.File; +import java.io.IOException; + +import org.apache.carbondata.store.api.conf.StoreConf; + import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.ConfigurableApplicationContext; @@ -27,11 +32,32 @@ public class Horizon { private static ConfigurableApplicationContext context; public static void main(String[] args) { - start(args); + String storeConfFile = System.getProperty(StoreConf.STORE_CONF_FILE); + if (storeConfFile == null) { + storeConfFile = getStoreConfFile(); + } + start(storeConfFile); + } + + static String getStoreConfFile() { + try { + return new File(".").getCanonicalPath() + "/store/conf/store.conf"; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public static void start(String storeConfFile) { + start(Horizon.class, storeConfFile); } - public static void start(String[] args) { - context = SpringApplication.run(Horizon.class, args); + static <T> void start(final Class<T> classTag, 
String storeConfFile) { + System.setProperty("carbonstore.conf.file", storeConfFile); + new Thread() { + public void run() { + context = SpringApplication.run(classTag); + } + }.start(); } public static void stop() { http://git-wip-us.apache.org/repos/asf/carbondata/blob/7306b59d/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/HorizonController.java ---------------------------------------------------------------------- diff --git a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/HorizonController.java b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/HorizonController.java index 33ea50b..4862483 100644 --- a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/HorizonController.java +++ b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/HorizonController.java @@ -46,6 +46,7 @@ import org.springframework.http.MediaType; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; @RestController @@ -61,6 +62,11 @@ public class HorizonController { store = CarbonStoreFactory.getDistributedStore("GlobalStore", new StoreConf(storeFile)); } + @RequestMapping(value = "echo") + public ResponseEntity<String> echo(@RequestParam(name = "name") String name) { + return new ResponseEntity<>(String.valueOf("Hello " + name), HttpStatus.OK); + } + @RequestMapping(value = "/table/create", produces = MediaType.APPLICATION_JSON_VALUE) public ResponseEntity<String> createTable( @RequestBody CreateTableRequest request) throws StoreException, IOException { @@ -110,7 +116,8 @@ public class HorizonController { request.getDatabaseName() + "." 
+ request.getTableName() + ", take time: " + (end - start) + " ms"); - return new ResponseEntity<>(new SelectResponse(request, output), HttpStatus.OK); + return new ResponseEntity<>( + new SelectResponse(request, "SUCCESS", output), HttpStatus.OK); } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/7306b59d/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/Response.java ---------------------------------------------------------------------- diff --git a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/Response.java b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/Response.java index c114af3..34f1e28 100644 --- a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/Response.java +++ b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/Response.java @@ -19,15 +19,21 @@ package org.apache.carbondata.horizon.rest.model.view; public class Response { private long responseId; + private String message; public Response() { } - Response(Request request) { + Response(Request request, String message) { this.responseId = request.getRequestId(); + this.message = message; } public long getResponseId() { return responseId; } + + public String getMessage() { + return message; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/7306b59d/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/SelectResponse.java ---------------------------------------------------------------------- diff --git a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/SelectResponse.java b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/SelectResponse.java index a0f1b55..6bf5c75 100644 --- a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/SelectResponse.java +++ b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/SelectResponse.java @@ -24,12 +24,8 @@ public 
class SelectResponse extends Response { public SelectResponse() { } - public SelectResponse(SelectRequest request) { - super(request); - } - - public SelectResponse(SelectRequest request, Object[][] rows) { - super(request); + public SelectResponse(SelectRequest request, String message, Object[][] rows) { + super(request, message); this.rows = rows; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/7306b59d/store/horizon/src/test/java/org/apache/carbondata/horizon/HorizonTest.java ---------------------------------------------------------------------- diff --git a/store/horizon/src/test/java/org/apache/carbondata/horizon/HorizonTest.java b/store/horizon/src/test/java/org/apache/carbondata/horizon/HorizonTest.java index 8adfd38..91a9dba 100644 --- a/store/horizon/src/test/java/org/apache/carbondata/horizon/HorizonTest.java +++ b/store/horizon/src/test/java/org/apache/carbondata/horizon/HorizonTest.java @@ -55,7 +55,7 @@ public class HorizonTest { projectFolder = new File(HorizonTest.class.getResource("/").getPath() + "../../../../").getCanonicalPath(); String log4jFile = projectFolder + "/store/conf/log4j.properties"; - String confFile = projectFolder + "/store/conf/store.conf"; + final String confFile = projectFolder + "/store/conf/store.conf"; System.setProperty("log.path", projectFolder + "/store/core/target/master_worker.log"); System.setProperty("carbonstore.conf.file", confFile); @@ -68,7 +68,7 @@ public class HorizonTest { new Thread() { public void run() { - Horizon.start(new String[0]); + Horizon.start(confFile); } }.start(); Thread.sleep(10000); http://git-wip-us.apache.org/repos/asf/carbondata/blob/7306b59d/store/sql/pom.xml ---------------------------------------------------------------------- diff --git a/store/sql/pom.xml b/store/sql/pom.xml new file mode 100644 index 0000000..411590b --- /dev/null +++ b/store/sql/pom.xml @@ -0,0 +1,84 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.carbondata</groupId> + <artifactId>carbondata-parent</artifactId> + <version>1.5.0-SNAPSHOT</version> + <relativePath>../../pom.xml</relativePath> + </parent> + + <artifactId>carbondata-horizon-sql</artifactId> + <name>Apache CarbonData :: Horizon SQL</name> + + <properties> + <dev.path>${basedir}/../../dev</dev.path> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.carbondata</groupId> + <artifactId>carbondata-horizon</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.carbondata</groupId> + <artifactId>carbondata-spark2</artifactId> + <version>${project.version}</version> + <exclusions> + <exclusion> + <groupId>org.apache.carbondata</groupId> + <artifactId>carbondata-lucene</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <source>1.7</source> + <target>1.7</target> + </configuration> + </plugin> + <plugin> + <groupId>org.scala-tools</groupId> + <artifactId>maven-scala-plugin</artifactId> + <version>2.15.2</version> + <executions> + <execution> + <id>compile</id> + <goals> + <goal>compile</goal> + </goals> + <phase>compile</phase> + </execution> + <execution> + <id>testCompile</id> + <goals> + <goal>testCompile</goal> + </goals> + <phase>test</phase> + </execution> + <execution> + <phase>process-resources</phase> + <goals> + <goal>compile</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7306b59d/store/sql/src/main/java/org/apache/carbondata/horizon/rest/client/SqlHorizonClient.java ---------------------------------------------------------------------- diff --git a/store/sql/src/main/java/org/apache/carbondata/horizon/rest/client/SqlHorizonClient.java b/store/sql/src/main/java/org/apache/carbondata/horizon/rest/client/SqlHorizonClient.java new file mode 100644 index 0000000..fc46a17 --- /dev/null +++ b/store/sql/src/main/java/org/apache/carbondata/horizon/rest/client/SqlHorizonClient.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.horizon.rest.client; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.annotations.InterfaceStability; +import org.apache.carbondata.core.datastore.row.CarbonRow; +import org.apache.carbondata.horizon.rest.client.impl.SimpleHorizonClient; +import org.apache.carbondata.horizon.rest.model.view.SqlResponse; + +import org.springframework.http.ResponseEntity; + +/** + * REST Client to send SQL statement to Horizon service + */ +@InterfaceAudience.User +@InterfaceStability.Unstable +public class SqlHorizonClient extends SimpleHorizonClient { + + public SqlHorizonClient(String serviceUri) { + super(serviceUri); + } + + @Override + public List<CarbonRow> sql(String sqlString) throws IOException { + Objects.requireNonNull(sqlString); + ResponseEntity<SqlResponse> response = + restTemplate.postForEntity(serviceUri + "/table/sql", sqlString, SqlResponse.class); + Object[][] rows = Objects.requireNonNull(response.getBody()).getRows(); + List<CarbonRow> output = new ArrayList<>(rows.length); + for (Object[] row : rows) { + output.add(new CarbonRow(row)); + } + return output; + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/7306b59d/store/sql/src/main/java/org/apache/carbondata/horizon/rest/controller/SqlHorizon.java ---------------------------------------------------------------------- diff --git a/store/sql/src/main/java/org/apache/carbondata/horizon/rest/controller/SqlHorizon.java b/store/sql/src/main/java/org/apache/carbondata/horizon/rest/controller/SqlHorizon.java new file mode 100644 index 0000000..1812a50 --- /dev/null +++ b/store/sql/src/main/java/org/apache/carbondata/horizon/rest/controller/SqlHorizon.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.horizon.rest.controller;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.store.api.conf.StoreConf;
+
+import org.apache.spark.sql.CarbonSessionBuilder;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * Entry point of the Horizon SQL service. It starts the REST application and then
+ * creates the Spark session that is handed out through {@link #getSession()}.
+ */
+public class SqlHorizon extends Horizon {
+
+  /**
+   * Session handed out by {@link #getSession()}. It is assigned by the thread running
+   * {@link #main(String[])} and may be read from other threads, so it is volatile to make
+   * the assignment visible to them. It is null until {@code createSession} has finished.
+   */
+  static volatile SparkSession session;
+
+  /**
+   * Creates the Spark session and stores it in {@link #session}. The store, warehouse and
+   * metastore locations are resolved under examples/spark2/target, four directories above
+   * the classpath root of this class.
+   *
+   * @param args command line arguments; currently not used
+   * @throws IOException if the root path cannot be canonicalized
+   */
+  private static void createSession(String[] args) throws IOException {
+    String rootPath = new File(SqlHorizon.class.getResource("/").getPath()
+        + "../../../..").getCanonicalPath();
+    String storeLocation = rootPath + "/examples/spark2/target/store";
+    String warehouse = rootPath + "/examples/spark2/target/warehouse";
+    String metastoredb = rootPath + "/examples/spark2/target";
+
+    CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd HH:mm:ss")
+        .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy/MM/dd")
+        .addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE, "true")
+        .addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, "");
+
+    int workThreadNum = 2;
+    String masterUrl = "local[" + workThreadNum + "]";
+
+    SparkSession.Builder baseBuilder = SparkSession
+        .builder()
+        .master(masterUrl)
+        .appName("Horizon-SQL")
+        .config("spark.ui.port", 9000)
+        .config("spark.sql.warehouse.dir", warehouse)
+        .config("spark.driver.host", "localhost")
+        .config("spark.sql.crossJoin.enabled", "true");
+    // NOTE(review): the meaning of the trailing boolean is defined by CarbonSessionBuilder,
+    // which is not visible here - confirm before changing it.
+    session = new CarbonSessionBuilder(baseBuilder).build(storeLocation, metastoredb, true);
+  }
+
+  /**
+   * @return the shared Spark session, or null while it has not been created yet
+   */
+  static SparkSession getSession() {
+    return session;
+  }
+
+  /**
+   * Starts the REST application, creates the Spark session and then blocks forever.
+   *
+   * @param args command line arguments, passed on to {@code createSession}
+   */
+  public static void main(String[] args) {
+    // Start Spring
+    String storeConfFile = System.getProperty(StoreConf.STORE_CONF_FILE);
+    if (storeConfFile == null) {
+      storeConfFile = getStoreConfFile();
+    }
+    start(SqlHorizon.class, storeConfFile);
+
+    try {
+      // Start CarbonSession
+      Thread.sleep(3000);
+      createSession(args);
+      // Park this thread for the lifetime of the process
+      Thread.sleep(Long.MAX_VALUE);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag so that code further up the stack can still observe it
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7306b59d/store/sql/src/main/java/org/apache/carbondata/horizon/rest/controller/SqlHorizonController.java
----------------------------------------------------------------------
diff --git a/store/sql/src/main/java/org/apache/carbondata/horizon/rest/controller/SqlHorizonController.java b/store/sql/src/main/java/org/apache/carbondata/horizon/rest/controller/SqlHorizonController.java
new file mode 100644
index 0000000..7583a14
--- /dev/null
+++ b/store/sql/src/main/java/org/apache/carbondata/horizon/rest/controller/SqlHorizonController.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.horizon.rest.controller;
+
+import java.util.List;
+
+import org.apache.carbondata.horizon.rest.model.validate.RequestValidator;
+import org.apache.carbondata.horizon.rest.model.view.SqlRequest;
+import org.apache.carbondata.horizon.rest.model.view.SqlResponse;
+import org.apache.carbondata.horizon.rest.sql.SparkSqlWrapper;
+import org.apache.carbondata.store.api.exception.StoreException;
+
+import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
+
+/**
+ * REST controller that runs SQL statements on the session provided by {@link SqlHorizon}.
+ */
+@RestController
+public class SqlHorizonController {
+
+  /**
+   * Runs the SQL statement of the request and returns all result rows.
+   *
+   * @param request request carrying the SQL statement
+   * @return response with message "SUCCESS" and one Object[] per result row
+   * @throws StoreException if the request is invalid, the session has not been created yet,
+   *                        or the statement fails
+   */
+  @RequestMapping(value = "/table/sql", produces = MediaType.APPLICATION_JSON_VALUE)
+  public ResponseEntity<SqlResponse> sql(@RequestBody SqlRequest request) throws StoreException {
+    RequestValidator.validateSql(request);
+
+    // SqlHorizon creates the session only after the REST application is up, so a request
+    // can arrive while it is still null. Report that explicitly instead of letting it
+    // surface as a NullPointerException without a message.
+    SparkSession session = SqlHorizon.getSession();
+    if (session == null) {
+      throw new StoreException("Spark session is not initialized yet, please retry later");
+    }
+
+    List<Row> rows;
+    try {
+      rows = SparkSqlWrapper.sql(session, request.getSqlStatement()).collectAsList();
+    } catch (AnalysisException e) {
+      throw new StoreException(e.getSimpleMessage());
+    } catch (Exception e) {
+      // getMessage() can be null; fall back to toString() so the caller always gets a reason
+      String message = e.getMessage();
+      throw new StoreException(message != null ? message : e.toString());
+    }
+
+    // Copy every Row into a plain Object[] for the response
+    Object[][] result = new Object[rows.size()][];
+    for (int i = 0; i < rows.size(); i++) {
+      Row row = rows.get(i);
+      result[i] = new Object[row.size()];
+      for (int j = 0; j < row.size(); j++) {
+        result[i][j] = row.get(j);
+      }
+    }
+
+    return new ResponseEntity<>(
+        new SqlResponse(request, "SUCCESS", result), HttpStatus.OK);
+  }
+
+  /**
+   * Echo endpoint that returns a greeting.
+   *
+   * @param name value of the "name" request parameter
+   * @return "Welcome to SQL, " followed by the given name
+   */
+  @RequestMapping(value = "echosql")
+  public ResponseEntity<String> echosql(@RequestParam(name = "name") String name) {
+    return new ResponseEntity<>("Welcome to SQL, " + name, HttpStatus.OK);
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7306b59d/store/sql/src/main/java/org/apache/carbondata/horizon/rest/model/validate/RequestValidator.java
----------------------------------------------------------------------
diff --git a/store/sql/src/main/java/org/apache/carbondata/horizon/rest/model/validate/RequestValidator.java b/store/sql/src/main/java/org/apache/carbondata/horizon/rest/model/validate/RequestValidator.java
new file mode 100644
index 0000000..82e095a
--- /dev/null
+++ b/store/sql/src/main/java/org/apache/carbondata/horizon/rest/model/validate/RequestValidator.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.horizon.rest.model.validate;
+
+import org.apache.carbondata.horizon.rest.model.view.SqlRequest;
+import org.apache.carbondata.store.api.exception.StoreException;
+
+import org.apache.commons.lang.StringUtils;
+
+/**
+ * Static checks applied to incoming REST requests before they are executed.
+ */
+public class RequestValidator {
+
+  /**
+   * Checks that a SQL request is present and carries a statement.
+   *
+   * @param request the request to check
+   * @throws StoreException if the request is null, or its statement is null, empty or
+   *                        consists of whitespace only
+   */
+  public static void validateSql(SqlRequest request) throws StoreException {
+    if (request == null) {
+      throw new StoreException("SqlRequest should not be null");
+    }
+    // isBlank also rejects whitespace-only statements, which can never be valid SQL
+    if (StringUtils.isBlank(request.getSqlStatement())) {
+      throw new StoreException("sql statement is invalid");
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7306b59d/store/sql/src/main/java/org/apache/carbondata/horizon/rest/model/view/SqlRequest.java
----------------------------------------------------------------------
diff --git a/store/sql/src/main/java/org/apache/carbondata/horizon/rest/model/view/SqlRequest.java b/store/sql/src/main/java/org/apache/carbondata/horizon/rest/model/view/SqlRequest.java
new file mode 100644
index 0000000..d0cbb1b
--- /dev/null
+++ b/store/sql/src/main/java/org/apache/carbondata/horizon/rest/model/view/SqlRequest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.horizon.rest.model.view;
+
+/**
+ * Request body that carries a single SQL statement.
+ */
+public class SqlRequest extends Request {
+
+  /** Text of the SQL statement; not assigned by the no-argument constructor. */
+  private String sqlStatement;
+
+  /**
+   * Creates a request without a statement.
+   * NOTE(review): presumably required for JSON deserialization - confirm.
+   */
+  public SqlRequest() {
+  }
+
+  /**
+   * @param sqlStatement text of the SQL statement
+   */
+  public SqlRequest(String sqlStatement) {
+    this.sqlStatement = sqlStatement;
+  }
+
+  /**
+   * @return text of the SQL statement, possibly null
+   */
+  public String getSqlStatement() {
+    return this.sqlStatement;
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7306b59d/store/sql/src/main/java/org/apache/carbondata/horizon/rest/model/view/SqlResponse.java
----------------------------------------------------------------------
diff --git a/store/sql/src/main/java/org/apache/carbondata/horizon/rest/model/view/SqlResponse.java b/store/sql/src/main/java/org/apache/carbondata/horizon/rest/model/view/SqlResponse.java
new file mode 100644
index 0000000..42d17b3
--- /dev/null
+++ b/store/sql/src/main/java/org/apache/carbondata/horizon/rest/model/view/SqlResponse.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.horizon.rest.model.view;
+
+/**
+ * Response to a SQL request: the base response fields plus the result rows.
+ */
+public class SqlResponse extends Response {
+
+  /** Result rows, one Object[] per row; not assigned by the no-argument constructor. */
+  private Object[][] rows;
+
+  /**
+   * Creates a response without rows.
+   * NOTE(review): presumably required for JSON deserialization - confirm.
+   */
+  public SqlResponse() {
+  }
+
+  /**
+   * @param request the request this response answers
+   * @param message message passed on to the base response
+   * @param rows    result rows, one Object[] per row
+   */
+  public SqlResponse(Request request, String message, Object[][] rows) {
+    super(request, message);
+    setRows(rows);
+  }
+
+  /**
+   * @return the result rows, possibly null
+   */
+  public Object[][] getRows() {
+    return this.rows;
+  }
+
+  /**
+   * @param rows result rows to store; the array is kept by reference, not copied
+   */
+  public void setRows(Object[][] rows) {
+    this.rows = rows;
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7306b59d/store/sql/src/main/scala/org/apache/carbondata/horizon/rest/sql/SparkSqlWrapper.scala
----------------------------------------------------------------------
diff --git a/store/sql/src/main/scala/org/apache/carbondata/horizon/rest/sql/SparkSqlWrapper.scala b/store/sql/src/main/scala/org/apache/carbondata/horizon/rest/sql/SparkSqlWrapper.scala
new file mode 100644
index 0000000..45ce0d9
--- /dev/null
+++ b/store/sql/src/main/scala/org/apache/carbondata/horizon/rest/sql/SparkSqlWrapper.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.horizon.rest.sql
+
+import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession}
+
+/**
+ * We need to catch the exceptions that spark throws.
+ * But Spark itself does not declare those exceptions with a @throws annotation, so this
+ * wrapper exposes SparkSession.sql with AnalysisException declared explicitly.
+ */
+object SparkSqlWrapper {
+
+  /**
+   * Runs a SQL statement on the given session.
+   *
+   * @param session the session to run the statement on
+   * @param sqlStatement text of the SQL statement
+   * @return the DataFrame returned by `session.sql`
+   */
+  @throws[AnalysisException]
+  def sql(session: SparkSession, sqlStatement: String): DataFrame =
+    session.sql(sqlStatement)
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7306b59d/store/sql/src/test/java/org/apache/carbondata/AppTest.java
----------------------------------------------------------------------
diff --git a/store/sql/src/test/java/org/apache/carbondata/AppTest.java b/store/sql/src/test/java/org/apache/carbondata/AppTest.java
new file mode 100644
index 0000000..ace9446
--- /dev/null
+++ b/store/sql/src/test/java/org/apache/carbondata/AppTest.java
@@ -0,0 +1,20 @@
+package org.apache.carbondata;
+
+import static org.junit.Assert.assertTrue;
+
+import org.junit.Test;
+
+/**
+ * Placeholder test class; its only test asserts a constant.
+ */
+public class AppTest {
+
+  /**
+   * Always passes: asserts the literal true.
+   */
+  @Test
+  public void shouldAnswerWithTrue() {
+    assertTrue(true);
+  }
+}