Repository: carbondata Updated Branches: refs/heads/carbonstore 9ac55a5a6 -> 7ad2fd951
[CARBONDATA-2752][CARBONSTORE] Carbon provide Zeppelin support This closes #2522 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/7ad2fd95 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/7ad2fd95 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/7ad2fd95 Branch: refs/heads/carbonstore Commit: 7ad2fd951126df0873e6610eb0a63935a0aa4789 Parents: 9ac55a5 Author: Ajith <[email protected]> Authored: Wed Jul 18 16:48:54 2018 +0530 Committer: QiangCai <[email protected]> Committed: Mon Jul 23 10:09:55 2018 +0800 ---------------------------------------------------------------------- integration/zeppelin/README.md | 44 +++++ integration/zeppelin/assembly/assembly.xml | 37 ++++ .../zeppelin/misc/interpreter-setting.json | 22 +++ integration/zeppelin/pom.xml | 99 ++++++++++ .../carbondata/zeppelin/CarbonInterpreter.java | 186 +++++++++++++++++++ .../zeppelin/response/CarbonResponse.java | 144 ++++++++++++++ .../zeppelin/TestCarbonInterpreter.java | 105 +++++++++++ .../carbondata/zeppelin/TestCarbonResponse.java | 94 ++++++++++ pom.xml | 6 + store/sql/pom.xml | 4 +- .../rest/controller/SqlHorizonController.java | 26 ++- 11 files changed, 755 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ad2fd95/integration/zeppelin/README.md ---------------------------------------------------------------------- diff --git a/integration/zeppelin/README.md b/integration/zeppelin/README.md new file mode 100644 index 0000000..35b27bc --- /dev/null +++ b/integration/zeppelin/README.md @@ -0,0 +1,44 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. 
+ The ASF licenses this file to you under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +### Please follow the steps below to integrate with Zeppelin +1. Run ```mvn package -Pzeppelin``` + This will generate _carbondata-zeppelin-*.tar.gz_ under the target folder +2. Extract the tar content to _ZEPPELIN_INSTALL_HOME/interpreter/_ +3. Add _org.apache.carbondata.zeppelin.CarbonInterpreter_ to the list of interpreters in _zeppelin.interpreters_ @ _ZEPPELIN_INSTALL_HOME/conf/zeppelin-site.xml_ (create the file if it does not exist) + Example: +```xml + <property> + <name>zeppelin.interpreters</name> +<value>org.apache.zeppelin.spark.SparkInterpreter,.....,org.apache.carbondata.zeppelin.CarbonInterpreter</value> + <description>Comma separated interpreter configurations. First interpreter become a default</description> + </property> +``` +4. Add carbon to the list of interpreter groups in _zeppelin.interpreter.group.order_ @ _ZEPPELIN_INSTALL_HOME/conf/zeppelin-site.xml_ + Example: +```xml + <property> + <name>zeppelin.interpreter.group.order</name> + <value>spark,..,carbon</value> + <description></description> + </property> +``` + +5. Restart the Zeppelin server and add a new interpreter named _carbon_ from the Zeppelin interpreter page + Refer to: https://zeppelin.apache.org/docs/0.8.0/usage/interpreter/overview.html#what-is-zeppelin-interpreter +6. Configure ```carbon.query.api.url``` in the interpreter settings on the Zeppelin interpreter page and click save +7. 
You can now use a notebook with the ```%carbon``` interpreter http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ad2fd95/integration/zeppelin/assembly/assembly.xml ---------------------------------------------------------------------- diff --git a/integration/zeppelin/assembly/assembly.xml b/integration/zeppelin/assembly/assembly.xml new file mode 100644 index 0000000..6d09ad3 --- /dev/null +++ b/integration/zeppelin/assembly/assembly.xml @@ -0,0 +1,37 @@ +<assembly> +<id>compress</id> + <formats> + <format>tar.gz</format> + </formats> + + <includeBaseDirectory>false</includeBaseDirectory> + <dependencySets> + <dependencySet> + <outputDirectory>/carbon</outputDirectory> + <useTransitiveDependencies>false</useTransitiveDependencies> + <includes> + <include>com.fasterxml.jackson.core:*:jar</include> + </includes> + </dependencySet> + </dependencySets> + + <fileSets> + + <fileSet> + <directory>misc/</directory> + <outputDirectory>carbon</outputDirectory> + <includes> + <include>*.json</include> + </includes> + </fileSet> + + <fileSet> + <directory>target</directory> + <outputDirectory>carbon</outputDirectory> + <includes> + <include>carbondata-zeppelin-*.jar</include> + </includes> + </fileSet> + + </fileSets> +</assembly> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ad2fd95/integration/zeppelin/misc/interpreter-setting.json ---------------------------------------------------------------------- diff --git a/integration/zeppelin/misc/interpreter-setting.json b/integration/zeppelin/misc/interpreter-setting.json new file mode 100644 index 0000000..ce39804 --- /dev/null +++ b/integration/zeppelin/misc/interpreter-setting.json @@ -0,0 +1,22 @@ +[ + { + "group": "carbon", + "name": "carbon", + "className": "org.apache.carbondata.zeppelin.CarbonInterpreter", + "properties": { + "carbon.query.api.url": { + "envName": null, + "propertyName": "carbon.query.api.url", + "defaultValue": "", + "description": "API URL for request", + "type": 
"string" + } + }, + "editor": { + "language": "sql", + "editOnDblClick": false, + "completionKey": "TAB", + "completionSupport": true + } + } +] \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ad2fd95/integration/zeppelin/pom.xml ---------------------------------------------------------------------- diff --git a/integration/zeppelin/pom.xml b/integration/zeppelin/pom.xml new file mode 100644 index 0000000..c381c48 --- /dev/null +++ b/integration/zeppelin/pom.xml @@ -0,0 +1,99 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.carbondata</groupId> + <artifactId>carbondata-parent</artifactId> + <version>1.5.0-SNAPSHOT</version> + <relativePath>../../pom.xml</relativePath> + </parent> + + <artifactId>carbondata-zeppelin</artifactId> + <name>Apache CarbonData :: Zeppelin</name> + + <properties> + <dev.path>${basedir}/../../dev</dev.path> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.zeppelin</groupId> + <artifactId>zeppelin-interpreter</artifactId> + <version>0.8.0</version> + </dependency> + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpclient</artifactId> + <version>4.5.1</version> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-annotations</artifactId> + <version>2.8.0</version> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + <version>2.8.11.1</version> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + <version>2.8.10</version> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.11</version> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <source>1.8</source> + <target>1.8</target> + </configuration> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-assembly-plugin</artifactId> + <configuration> + <descriptor>assembly/assembly.xml</descriptor> + <finalName>carbondata-zeppelin-${version}</finalName> + </configuration> + <executions> + <execution> + 
<phase>package</phase> + <goals> + <goal>single</goal> + </goals> + </execution> + </executions> + </plugin> + + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ad2fd95/integration/zeppelin/src/main/java/org/apache/carbondata/zeppelin/CarbonInterpreter.java ---------------------------------------------------------------------- diff --git a/integration/zeppelin/src/main/java/org/apache/carbondata/zeppelin/CarbonInterpreter.java b/integration/zeppelin/src/main/java/org/apache/carbondata/zeppelin/CarbonInterpreter.java new file mode 100644 index 0000000..f5dfa0e --- /dev/null +++ b/integration/zeppelin/src/main/java/org/apache/carbondata/zeppelin/CarbonInterpreter.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.zeppelin; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; +import java.util.Objects; +import java.util.Optional; +import java.util.Properties; +import java.util.function.BiFunction; +import java.util.function.Function; +import java.util.stream.Collectors; + +import org.apache.carbondata.zeppelin.response.CarbonResponse; + +import org.apache.commons.lang.StringUtils; +import org.apache.http.HttpResponse; +import org.apache.http.client.HttpClient; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterException; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Carbon based interpreter for zeppelin + */ +public class CarbonInterpreter extends Interpreter { + + public static final Logger logger = LoggerFactory.getLogger(CarbonInterpreter.class); + + static final char TAB = '\t'; + + static final String LF = "\n"; + + /** + * Property which can be set in zeppelin to carbon REST API server + */ + public static final String CARBON_QUERY_API_URL = "carbon.query.api.url"; + + /** + * These are the queries which need Table like output format + */ + private static final String[] SEARCH_QUERIES = {"select", "list", "show", "desc"}; + + public CarbonInterpreter(Properties properties) { + super(properties); + } + + @Override + public void open() throws InterpreterException { + } + + @Override + public void close() throws InterpreterException { + } + + @Override + public void cancel(InterpreterContext interpreterContext) throws InterpreterException { + } + + @Override + public int getProgress(InterpreterContext interpreterContext) throws InterpreterException { + return 0; + 
} + + @Override + public FormType getFormType() throws InterpreterException { + return FormType.SIMPLE; + } + + @Override + public InterpreterResult interpret(String sql, InterpreterContext interpreterContext) + throws InterpreterException { + try { + return executeQuery.apply(sql); + } catch (RuntimeException e) { + logger.error("failed to query data in carbon ", e); + return new InterpreterResult(InterpreterResult.Code.ERROR, e.getMessage()); + } + } + + /** + * This will execute the given sql Query by sending a post request on CARBON_QUERY_API_URL + */ + private Function<String, HttpResponse> doPost = sql -> { + // prepare the post body + String postContent = new StringBuilder("{\"sqlStatement\":") + .append("\"").append(sql).append("\"" + "}").toString(); + logger.debug("post:" + postContent); + + // prepare entity and set content type + StringEntity entity = new StringEntity(postContent, "UTF-8"); + entity.setContentType("application/json; charset=UTF-8"); + + // get the POST url from interpreter property + String postURL = getProperty(CARBON_QUERY_API_URL); + logger.debug("post url:" + postURL); + + // do POST and get response + HttpPost postRequest = new HttpPost(postURL); + postRequest.setEntity(entity); + HttpClient httpClient = HttpClientBuilder.create().build(); + try { + return httpClient.execute(postRequest); + } catch (IOException e) { + throw new RuntimeException(e); + } + }; + + /** + * Check if output has to be sent as a able to zeppelin + */ + private Function<String, Boolean> isTableFormatOutput = sql -> + (StringUtils.startsWithAny(sql, SEARCH_QUERIES)); + + /** + * returns InterpreterResult from CarbonResponse + */ + private BiFunction<String, CarbonResponse, InterpreterResult> getResult = (sql, response) -> { + if (isTableFormatOutput.apply(sql.toLowerCase().trim())) { + //format only select queries and return as table + String formattedResult = Arrays + .stream(response.getRows()) + .filter(Objects::nonNull) + .map(row -> 
StringUtils.join(row, TAB)) + .collect(Collectors.joining(LF)); + return new InterpreterResult(InterpreterResult.Code.SUCCESS, InterpreterResult.Type.TABLE, + formattedResult); + } else { + return new InterpreterResult(InterpreterResult.Code.SUCCESS, InterpreterResult.Type.TEXT, + response.getMessage()); + } + }; + + /** + * Executes the given sql and return formatted result + */ + private Function<String, InterpreterResult> executeQuery = sql -> { + try { + HttpResponse response = doPost.apply(sql); + // always close the content after reading fully to release connection + // IOUtils.toString will completely read the content + try (InputStream content = response.getEntity().getContent()) { + Optional<CarbonResponse> carbonResponse = CarbonResponse.parse(content); + int code = response.getStatusLine().getStatusCode(); + if (code != 200) { + StringBuilder errorMessage = new StringBuilder("Failed : HTTP error code " + code + " ."); + carbonResponse.ifPresent(rsp -> { + logger.error("Failed to execute query: " + rsp.getFullResponse()); + errorMessage.append(rsp.getMessage()); + }); + return new InterpreterResult(InterpreterResult.Code.ERROR, errorMessage.toString()); + } else { + return carbonResponse.map(rsp -> getResult.apply(sql, rsp)) + .orElseGet(() -> new InterpreterResult(InterpreterResult.Code.SUCCESS, + InterpreterResult.Type.TEXT, + "Query Success, but unable to parse response")); + } + } + } catch (IOException e) { + logger.error("Error executing query ", e); + return new InterpreterResult(InterpreterResult.Code.ERROR, e.getMessage()); + } + }; +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ad2fd95/integration/zeppelin/src/main/java/org/apache/carbondata/zeppelin/response/CarbonResponse.java ---------------------------------------------------------------------- diff --git a/integration/zeppelin/src/main/java/org/apache/carbondata/zeppelin/response/CarbonResponse.java 
b/integration/zeppelin/src/main/java/org/apache/carbondata/zeppelin/response/CarbonResponse.java new file mode 100644 index 0000000..2428d75 --- /dev/null +++ b/integration/zeppelin/src/main/java/org/apache/carbondata/zeppelin/response/CarbonResponse.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.zeppelin.response; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Optional; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.databind.ObjectMapper; + +import org.apache.commons.io.IOUtils; + + + +/** + * acts as a response object from carbon horizon server + */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class CarbonResponse { + + private static final ObjectMapper mapper = new ObjectMapper(); + + private String responseId; + + private String message; + + private Object[][] rows; + + long timestamp; + + int status; + + String error; + + String exception; + + String path; + + String fullResponse; + + + public String getResponseId() { + return responseId; + } + + public void setResponseId(String responseId) { + this.responseId = responseId; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + + public Object[][] getRows() { + return rows; + } + + public void setRows(Object[][] rows) { + this.rows = rows; + } + + public long getTimestamp() { + return timestamp; + } + + public void setTimestamp(long timestamp) { + this.timestamp = timestamp; + } + + public int getStatus() { + return status; + } + + public void setStatus(int status) { + this.status = status; + } + + public String getError() { + return error; + } + + public void setError(String error) { + this.error = error; + } + + public String getException() { + return exception; + } + + public void setException(String exception) { + this.exception = exception; + } + + public String getPath() { + return path; + } + + public void setPath(String path) { + this.path = path; + } + + public String getFullResponse() { + return fullResponse; + } + + public void setFullResponse(String fullResponse) { + this.fullResponse = fullResponse; + } + + /** + * Reads the input stream for JSON and return a CarbonResponse instance + * 
PS: Caller responsible for closing the stream + * + * @param inputStream + * @return + * @throws IOException + */ + public static Optional<CarbonResponse> parse(InputStream inputStream) throws IOException { + String plainTextResponse = IOUtils.toString(inputStream, "UTF-8"); + CarbonResponse response = mapper.readValue(plainTextResponse, CarbonResponse.class); + response.setFullResponse(plainTextResponse); + return Optional.of(response); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ad2fd95/integration/zeppelin/src/test/java/org/apache/carbondata/zeppelin/TestCarbonInterpreter.java ---------------------------------------------------------------------- diff --git a/integration/zeppelin/src/test/java/org/apache/carbondata/zeppelin/TestCarbonInterpreter.java b/integration/zeppelin/src/test/java/org/apache/carbondata/zeppelin/TestCarbonInterpreter.java new file mode 100644 index 0000000..6cb47b4 --- /dev/null +++ b/integration/zeppelin/src/test/java/org/apache/carbondata/zeppelin/TestCarbonInterpreter.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.zeppelin; + +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpHandler; +import com.sun.net.httpserver.HttpServer; +import org.apache.zeppelin.interpreter.InterpreterException; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.util.Optional; +import java.util.Properties; + +import static org.junit.Assert.assertEquals; + +public class TestCarbonInterpreter { + + static HttpServer server = null; + + @BeforeClass + public static void setup() throws Exception { + server = HttpServer.create(new InetSocketAddress("localhost", 8123), 0); + server.createContext("/table/sql", new FakePostHandler()); + server.start(); + } + + @AfterClass + public static void tearDown() { + Optional.of(server).ifPresent(serverSocket -> serverSocket.stop(0)); + } + + @Test(timeout = 5000) + public void testInterpreterSelectSuccessResponse() throws InterpreterException { + Properties properties = new Properties(); + properties.put(CarbonInterpreter.CARBON_QUERY_API_URL, "http://localhost:8123/table/sql"); + CarbonInterpreter interpreter = new CarbonInterpreter(properties); + InterpreterResult result = interpreter.interpret("show tables", null); + String expectedFormattedResult = "{\"code\":\"SUCCESS\",\"msg\":[{\"type\":\"TABLE\",\"data\"" + + ":\"database\\ttableName\\tisTemporary\\ndefault\\tsinka6\\tfalse\\ndefault\\tsinka7\\tfalse\"}]}"; + assertEquals(expectedFormattedResult, result.toJson()); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType()); + } +} + +class FakePostHandler implements HttpHandler { + + @Override + public void handle(HttpExchange he) throws 
IOException { + InputStreamReader isr = new InputStreamReader(he.getRequestBody(), "utf-8"); + BufferedReader br = new BufferedReader(isr); + String query = br.readLine(); + String response = ""; + if (query.equals("{\"sqlStatement\":\"show tables\"}")) { + response = "{\n" + + " \"responseId\": 19435528129427470,\n" + + " \"message\": \"SUCCESS\",\n" + + " \"rows\": [\n" + + " [\n" + + " \"database\",\n" + + " \"tableName\",\n" + + " \"isTemporary\"\n" + + " ],\n" + + " [\n" + + " \"default\",\n" + + " \"sinka6\",\n" + + " false\n" + + " ],\n" + + " [\n" + + " \"default\",\n" + + " \"sinka7\",\n" + + " false\n" + + " ]\n" + + " ]\n" + + "}"; + } + he.sendResponseHeaders(200, response.length()); + OutputStream os = he.getResponseBody(); + os.write(response.toString().getBytes()); + os.close(); + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ad2fd95/integration/zeppelin/src/test/java/org/apache/carbondata/zeppelin/TestCarbonResponse.java ---------------------------------------------------------------------- diff --git a/integration/zeppelin/src/test/java/org/apache/carbondata/zeppelin/TestCarbonResponse.java b/integration/zeppelin/src/test/java/org/apache/carbondata/zeppelin/TestCarbonResponse.java new file mode 100644 index 0000000..4b169f4 --- /dev/null +++ b/integration/zeppelin/src/test/java/org/apache/carbondata/zeppelin/TestCarbonResponse.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.zeppelin; + +import com.fasterxml.jackson.databind.JsonMappingException; +import org.apache.carbondata.zeppelin.response.CarbonResponse; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.Arrays; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestCarbonResponse { + + @Test(expected = JsonMappingException.class) + public void testBodyIsEmpty() throws IOException { + String input = ""; + CarbonResponse.parse(new ByteArrayInputStream(input.getBytes())); + } + + @Test + public void testSuccessResponse() throws IOException { + String input = "{\n" + + " \"responseId\": 19435528129427470,\n" + + " \"message\": \"SUCCESS\",\n" + + " \"rows\": [\n" + + " [\n" + + " \"database\",\n" + + " \"tableName\",\n" + + " \"isTemporary\"\n" + + " ],\n" + + " [\n" + + " \"default\",\n" + + " \"sinka6\",\n" + + " false\n" + + " ],\n" + + " [\n" + + " \"default\",\n" + + " \"sinka7\",\n" + + " false\n" + + " ]\n" + + " ]\n" + + "}"; + Object[][] expectedResponse = new Object[3][]; + expectedResponse[0] = new Object[]{"database", "tableName", "isTemporary"}; + expectedResponse[1] = new Object[]{"default", "sinka6", false}; + expectedResponse[2] = new Object[]{"default", "sinka7", false}; + CarbonResponse successResponse = CarbonResponse.parse(new ByteArrayInputStream(input.getBytes())).get(); + assertEquals("SUCCESS", successResponse.getMessage()); + assertEquals("19435528129427470", successResponse.getResponseId()); + 
assertTrue(Arrays.deepEquals(expectedResponse, successResponse.getRows())); + assertEquals(input, successResponse.getFullResponse()); + } + + @Test + public void testErrorResponse() throws IOException { + String input = "{\n" + + " \"timestamp\": 1531884083849,\n" + + " \"status\": 500,\n" + + " \"error\": \"Internal Server Error\",\n" + + " \"exception\": \"org.apache.carbondata.store.api.exception.StoreException\",\n" + + " \"message\": \"org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException: " + + "Table or view 'sinka6' already exists in database 'default';\",\n" + + " \"path\": \"/table/sql\"\n" + + "}"; + CarbonResponse errorResponse = CarbonResponse.parse(new ByteArrayInputStream(input.getBytes())).get(); + assertEquals("org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException: " + + "Table or view 'sinka6' already exists in database 'default';", errorResponse.getMessage()); + assertEquals("org.apache.carbondata.store.api.exception.StoreException", errorResponse.getException()); + assertEquals(1531884083849L, errorResponse.getTimestamp()); + assertEquals("Internal Server Error", errorResponse.getError()); + assertEquals(500, errorResponse.getStatus()); + assertEquals("/table/sql", errorResponse.getPath()); + assertEquals(input, errorResponse.getFullResponse()); + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ad2fd95/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index f1f51f0..2135b9d 100644 --- a/pom.xml +++ b/pom.xml @@ -646,6 +646,12 @@ <module>datamap/mv/core</module> </modules> </profile> + <profile> + <id>zeppelin</id> + <modules> + <module>integration/zeppelin</module> + </modules> + </profile> </profiles> </project> http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ad2fd95/store/sql/pom.xml ---------------------------------------------------------------------- diff --git a/store/sql/pom.xml b/store/sql/pom.xml index 411590b..d90ebb3 100644 --- 
a/store/sql/pom.xml +++ b/store/sql/pom.xml @@ -48,8 +48,8 @@ <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> - <source>1.7</source> - <target>1.7</target> + <source>1.8</source> + <target>1.8</target> </configuration> </plugin> <plugin> http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ad2fd95/store/sql/src/main/java/org/apache/carbondata/horizon/rest/controller/SqlHorizonController.java ---------------------------------------------------------------------- diff --git a/store/sql/src/main/java/org/apache/carbondata/horizon/rest/controller/SqlHorizonController.java b/store/sql/src/main/java/org/apache/carbondata/horizon/rest/controller/SqlHorizonController.java index 7583a14..da9df52 100644 --- a/store/sql/src/main/java/org/apache/carbondata/horizon/rest/controller/SqlHorizonController.java +++ b/store/sql/src/main/java/org/apache/carbondata/horizon/rest/controller/SqlHorizonController.java @@ -18,6 +18,7 @@ package org.apache.carbondata.horizon.rest.controller; import java.util.List; +import java.util.stream.IntStream; import org.apache.carbondata.horizon.rest.model.validate.RequestValidator; import org.apache.carbondata.horizon.rest.model.view.SqlRequest; @@ -26,6 +27,7 @@ import org.apache.carbondata.horizon.rest.sql.SparkSqlWrapper; import org.apache.carbondata.store.api.exception.StoreException; import org.apache.spark.sql.AnalysisException; +import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; @@ -42,25 +44,29 @@ public class SqlHorizonController { public ResponseEntity<SqlResponse> sql(@RequestBody SqlRequest request) throws StoreException { RequestValidator.validateSql(request); List<Row> rows; + Dataset<Row> sqlDataFrame = null; try { - rows = SparkSqlWrapper.sql(SqlHorizon.getSession(), request.getSqlStatement()) - .collectAsList(); + sqlDataFrame = 
SparkSqlWrapper.sql(SqlHorizon.getSession(), + request.getSqlStatement()); + rows = sqlDataFrame.collectAsList(); } catch (AnalysisException e) { throw new StoreException(e.getSimpleMessage()); } catch (Exception e) { throw new StoreException(e.getMessage()); } - Object[][] result = new Object[rows.size()][]; - for (int i = 0; i < rows.size(); i++) { - Row row = rows.get(i); - result[i] = new Object[row.size()]; - for (int j = 0; j < row.size(); j++) { - result[i][j] = row.get(j); - } + final String[] fieldNames = sqlDataFrame.schema().fieldNames(); + Object[][] responseData = new Object[0][]; + if (rows.size() > 0) { + final Object[][] result = new Object[rows.size() + 1][fieldNames.length]; + System.arraycopy(fieldNames, 0, result[0], 0, fieldNames.length); + IntStream.range(0, rows.size()).forEach(index -> + IntStream.range(0, fieldNames.length).forEach(col -> + result[index + 1][col] = rows.get(index).get(col))); + responseData = result; } return new ResponseEntity<>( - new SqlResponse(request, "SUCCESS", result), HttpStatus.OK); + new SqlResponse(request, "SUCCESS", responseData), HttpStatus.OK); } @RequestMapping(value = "echosql")
