Repository: incubator-zeppelin Updated Branches: refs/heads/master 7c050b5f7 -> db418bbe2
ZEPPELIN-171,Add Apache Kylin interpreter Leverage Zeppelin to interactive with Kylin Author: jiazhong <[email protected]> Closes #207 from janzhongi/master and squashes the following commits: a0abcbe [jiazhong] ZEPPELIN-171 Add [Apache Kylin] Intepreter Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/db418bbe Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/db418bbe Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/db418bbe Branch: refs/heads/master Commit: db418bbe215d2569075b093640f25e8cb63086d4 Parents: 7c050b5 Author: jiazhong <[email protected]> Authored: Thu Jul 9 13:18:29 2015 +0800 Committer: Lee moon soo <[email protected]> Committed: Thu Aug 27 23:25:57 2015 -0700 ---------------------------------------------------------------------- conf/zeppelin-site.xml.template | 2 +- kylin/pom.xml | 130 ++++++++++++ .../apache/zeppelin/kylin/KylinInterpreter.java | 201 +++++++++++++++++++ kylin/src/test/java/KylinInterpreterTest.java | 201 +++++++++++++++++++ pom.xml | 1 + .../zeppelin/conf/ZeppelinConfiguration.java | 3 +- 6 files changed, 536 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/db418bbe/conf/zeppelin-site.xml.template ---------------------------------------------------------------------- diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template old mode 100644 new mode 100755 index ad90f88..9098947 --- a/conf/zeppelin-site.xml.template +++ b/conf/zeppelin-site.xml.template @@ -85,7 +85,7 @@ <property> <name>zeppelin.interpreters</name> - <value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.postgresql.PostgreSqlInterpreter,org.apache.zeppelin.phoenix.PhoenixInterpreter</value> + <value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.postgresql.PostgreSqlInterpreter,org.apache.zeppelin.phoenix.PhoenixInterpreter,org.apache.zeppelin.kylin.KylinInterpreter</value> <description>Comma separated interpreter configurations. First interpreter become a default</description> </property> http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/db418bbe/kylin/pom.xml ---------------------------------------------------------------------- diff --git a/kylin/pom.xml b/kylin/pom.xml new file mode 100755 index 0000000..9e5311d --- /dev/null +++ b/kylin/pom.xml @@ -0,0 +1,130 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>zeppelin</artifactId> + <groupId>org.apache.zeppelin</groupId> + <version>0.6.0-incubating-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <groupId>org.apache.zeppelin</groupId> + <artifactId>zeppelin-kylin</artifactId> + <packaging>jar</packaging> + <version>0.6.0-incubating-SNAPSHOT</version> + <name>Zeppelin: Kylin interpreter</name> + <url>http://zeppelin.incubator.apache.org</url> + + + <dependencies> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>zeppelin-interpreter</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpclient</artifactId> + <version>4.3.6</version> + </dependency> + <dependency> + <groupId>commons-codec</groupId> + <artifactId>commons-codec</artifactId> + <version>1.5</version> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-deploy-plugin</artifactId> + <version>2.7</version> + <configuration> + <skip>true</skip> + </configuration> + </plugin> + + <plugin> + <artifactId>maven-enforcer-plugin</artifactId> + <version>1.3.1</version> + <executions> + <execution> + <id>enforce</id> + <phase>none</phase> + </execution> + </executions> + </plugin> + + <plugin> + <artifactId>maven-dependency-plugin</artifactId> + <version>2.8</version> + <executions> + <execution> + <id>copy-dependencies</id> + <phase>package</phase> + <goals> + <goal>copy-dependencies</goal> + </goals> + <configuration> + <outputDirectory>${project.build.directory}/../../interpreter/kylin</outputDirectory> + <overWriteReleases>false</overWriteReleases> + <overWriteSnapshots>false</overWriteSnapshots> + <overWriteIfNewer>true</overWriteIfNewer> + <includeScope>runtime</includeScope> + </configuration> + </execution> + <execution> + <id>copy-artifact</id> + <phase>package</phase> + <goals> + <goal>copy</goal> + </goals> + <configuration> + <outputDirectory>${project.build.directory}/../../interpreter/kylin</outputDirectory> + <overWriteReleases>false</overWriteReleases> + <overWriteSnapshots>false</overWriteSnapshots> + <overWriteIfNewer>true</overWriteIfNewer> + <includeScope>runtime</includeScope> + <artifactItems> + <artifactItem> + <groupId>${project.groupId}</groupId> + <artifactId>${project.artifactId}</artifactId> + <version>${project.version}</version> + <type>${project.packaging}</type> + </artifactItem> + </artifactItems> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + + +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/db418bbe/kylin/src/main/java/org/apache/zeppelin/kylin/KylinInterpreter.java ---------------------------------------------------------------------- diff --git a/kylin/src/main/java/org/apache/zeppelin/kylin/KylinInterpreter.java b/kylin/src/main/java/org/apache/zeppelin/kylin/KylinInterpreter.java new file mode 100755 index 0000000..e7bba38 --- /dev/null +++ b/kylin/src/main/java/org/apache/zeppelin/kylin/KylinInterpreter.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.zeppelin.kylin; + +import org.apache.commons.codec.binary.Base64; +import org.apache.http.HttpResponse; +import org.apache.http.client.HttpClient; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.List; +import java.util.Properties; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Kylin interpreter for Zeppelin. (http://kylin.io) + */ +public class KylinInterpreter extends Interpreter { + Logger logger = LoggerFactory.getLogger(KylinInterpreter.class); + + static final String KYLIN_QUERY_API_URL = "kylin.api.url"; + static final String KYLIN_USERNAME = "kylin.api.user"; + static final String KYLIN_PASSWORD = "kylin.api.password"; + static final String KYLIN_QUERY_PROJECT = "kylin.query.project"; + static final String KYLIN_QUERY_OFFSET = "kylin.query.offset"; + static final String KYLIN_QUERY_LIMIT = "kylin.query.limit"; + static final String KYLIN_QUERY_ACCEPT_PARTIAL = "kylin.query.ispartial"; + static final Pattern KYLIN_TABLE_FORMAT_REGEX_LABEL = Pattern.compile("\"label\":\"(.*?)\""); + static final Pattern KYLIN_TABLE_FORMAT_REGEX = Pattern.compile("\"results\":\\[\\[\"(.*?)\"]]"); + + static { + Interpreter.register( + "kylin", + "kylin", + KylinInterpreter.class.getName(), + new InterpreterPropertyBuilder() + .add(KYLIN_USERNAME, "ADMIN", "username for kylin user") + .add(KYLIN_PASSWORD, "KYLIN", "password for kylin user") + .add(KYLIN_QUERY_API_URL, "http://<host>:<port>/kylin/api/query", "Kylin API.") + .add(KYLIN_QUERY_PROJECT, "default", "kylin project name") + .add(KYLIN_QUERY_OFFSET, "0", "kylin query offset") + .add(KYLIN_QUERY_LIMIT, "5000", "kylin query limit") + .add(KYLIN_QUERY_ACCEPT_PARTIAL, "true", "The kylin query partial flag").build()); + } + + + + + public KylinInterpreter(Properties property) { + super(property); + } + @Override + public void open() { + + } + + @Override + public void close() { + + } + + @Override + public InterpreterResult interpret(String st, InterpreterContext context) { + try { + return executeQuery(st); + } catch (IOException e) { + logger.error("failed to query data in kylin ", e); + return new InterpreterResult(InterpreterResult.Code.ERROR, e.getMessage()); + } + } + + @Override + public void cancel(InterpreterContext context) { + + } + + @Override + public FormType getFormType() { + return FormType.SIMPLE; + } + + @Override + public int getProgress(InterpreterContext context) { + return 0; + } + + @Override + public List<String> completion(String buf, int cursor) { + return null; + } + + public HttpResponse prepareRequest(String sql) throws IOException { + String KYLIN_PROJECT = getProperty(KYLIN_QUERY_PROJECT); + logger.info("project:" + KYLIN_PROJECT); + logger.info("sql:" + sql); + logger.info("acceptPartial:" + getProperty(KYLIN_QUERY_ACCEPT_PARTIAL)); + logger.info("limit:" + getProperty(KYLIN_QUERY_LIMIT)); + logger.info("offset:" + getProperty(KYLIN_QUERY_OFFSET)); + byte[] encodeBytes = Base64.encodeBase64(new String(getProperty(KYLIN_USERNAME) + + ":" + getProperty(KYLIN_PASSWORD)).getBytes("UTF-8")); + + String postContent = new String("{\"project\":" + "\"" + KYLIN_PROJECT + "\"" + + "," + "\"sql\":" + "\"" + sql + "\"" + + "," + "\"acceptPartial\":" + "\"" + getProperty(KYLIN_QUERY_ACCEPT_PARTIAL) + "\"" + + "," + "\"offset\":" + "\"" + getProperty(KYLIN_QUERY_OFFSET) + "\"" + + "," + "\"limit\":" + "\"" + getProperty(KYLIN_QUERY_LIMIT) + "\"" + "}"); + logger.info("post:" + postContent); + postContent = postContent.replaceAll("[\u0000-\u001f]", " "); + StringEntity entity = new StringEntity(postContent, "UTF-8"); + entity.setContentType("application/json; charset=UTF-8"); + + logger.info("post url:" + getProperty(KYLIN_QUERY_API_URL)); + + HttpPost postRequest = new HttpPost(getProperty(KYLIN_QUERY_API_URL)); + postRequest.setEntity(entity); + postRequest.addHeader("Authorization", "Basic " + new String(encodeBytes)); + postRequest.addHeader("Accept-Encoding", "UTF-8"); + + HttpClient httpClient = HttpClientBuilder.create().build(); + return httpClient.execute(postRequest); + } + + private InterpreterResult executeQuery(String sql) throws IOException { + + HttpResponse response = prepareRequest(sql); + + if (response.getStatusLine().getStatusCode() != 200) { + logger.error("failed to execute query: " + response.getEntity().getContent().toString()); + return new InterpreterResult(InterpreterResult.Code.ERROR, + "Failed : HTTP error code " + response.getStatusLine().getStatusCode()); + } + + BufferedReader br = new BufferedReader( + new InputStreamReader((response.getEntity().getContent()))); + StringBuilder sb = new StringBuilder(); + + String output; + logger.info("Output from Server .... \n"); + while ((output = br.readLine()) != null) { + logger.info(output); + sb.append(output).append('\n'); + } + InterpreterResult rett = new InterpreterResult(InterpreterResult.Code.SUCCESS, + formatResult(sb.toString())); + return rett; + } + + private String formatResult(String msg) { + StringBuilder res = new StringBuilder("%table "); + + Matcher ml = KYLIN_TABLE_FORMAT_REGEX_LABEL.matcher(msg); + while (!ml.hitEnd() && ml.find()) { + res.append(ml.group(1) + " \t"); + } + res.append(" \n"); + + Matcher mr = KYLIN_TABLE_FORMAT_REGEX.matcher(msg); + String table = null; + while (!mr.hitEnd() && mr.find()) { + table = mr.group(1); + } + + String[] row = table.split("\"],\\[\""); + for (int i = 0; i < row.length; i++) { + String[] col = row[i].split("\",\""); + for (int j = 0; j < col.length; j++) { + res.append(col[j] + " \t"); + } + res.append(" \n"); + } + return res.toString(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/db418bbe/kylin/src/test/java/KylinInterpreterTest.java ---------------------------------------------------------------------- diff --git a/kylin/src/test/java/KylinInterpreterTest.java b/kylin/src/test/java/KylinInterpreterTest.java new file mode 100755 index 0000000..1df20f6 --- /dev/null +++ b/kylin/src/test/java/KylinInterpreterTest.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.http.*; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.message.AbstractHttpMessage; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.kylin.KylinInterpreter; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Locale; +import java.util.Properties; + +import static org.junit.Assert.assertEquals; + + +public class KylinInterpreterTest { + @Before + public void setUp() throws Exception { + } + + @After + public void tearDown() throws Exception { + } + + @Test + public void test(){ + KylinInterpreter t = new MockKylinInterpreter(new Properties()); + InterpreterResult result = t.interpret( + "select a.date,sum(b.measure) as measure from kylin_fact_table a " + + "inner join kylin_lookup_table b on a.date=b.date group by a.date", null); + assertEquals(InterpreterResult.Type.TABLE,result.type()); + } +} + +class MockKylinInterpreter extends KylinInterpreter { + + public MockKylinInterpreter(Properties property) { + super(property); + } + + @Override + public HttpResponse prepareRequest(String sql) throws IOException { + MockHttpClient client = new MockHttpClient(); + return client.execute(new HttpPost()); + } + +} + +class MockHttpClient{ + public MockHttpResponse execute(HttpPost post){ + return new MockHttpResponse(); + } +} + +class MockHttpResponse extends AbstractHttpMessage implements HttpResponse{ + + @Override + public StatusLine getStatusLine() { + return new MockStatusLine(); + } + + @Override + public void setStatusLine(StatusLine statusLine) { + + } + + @Override + public void setStatusLine(ProtocolVersion protocolVersion, int i) { + + } + + @Override + public void setStatusLine(ProtocolVersion protocolVersion, int i, String s) { + + } + + @Override + public void setStatusCode(int i) throws IllegalStateException { + + } + + @Override + public void setReasonPhrase(String s) throws IllegalStateException { + + } + + @Override + public HttpEntity getEntity() { + return new MockEntity(); + } + + @Override + public void setEntity(HttpEntity httpEntity) { + + } + + @Override + public Locale getLocale() { + return null; + } + + @Override + public void setLocale(Locale locale) { + + } + + @Override + public ProtocolVersion getProtocolVersion() { + return null; + } +} + +class MockStatusLine implements StatusLine{ + + @Override + public ProtocolVersion getProtocolVersion() { + return null; + } + + @Override + public int getStatusCode() { + return 200; + } + + @Override + public String getReasonPhrase() { + return null; + } +} + +class MockEntity implements HttpEntity{ + + @Override + public boolean isRepeatable() { + return false; + } + + @Override + public boolean isChunked() { + return false; + } + + @Override + public long getContentLength() { + return 0; + } + + @Override + public Header getContentType() { + return null; + } + + @Override + public Header getContentEncoding() { + return null; + } + + @Override + public InputStream getContent() throws IOException, IllegalStateException { + return new ByteArrayInputStream(("{\"columnMetas\":" + + "[{\"label\":\"PART_DT\"},{\"label\":\"measure\"}]," + + "\"results\":[[\"2012-01-03\",\"917.4138\"]," + + "[\"2012-05-06\",\"592.4823\"]]}").getBytes()); + } + + @Override + public void writeTo(OutputStream outputStream) throws IOException { + + } + + @Override + public boolean isStreaming() { + return false; + } + + @Override + public void consumeContent() throws IOException { + + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/db418bbe/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml old mode 100644 new mode 100755 index b7eccb0..9b7cd46 --- a/pom.xml +++ b/pom.xml @@ -97,6 +97,7 @@ <module>tajo</module> <module>flink</module> <module>ignite</module> + <module>kylin</module> <module>lens</module> <module>cassandra</module> <module>zeppelin-web</module> http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/db418bbe/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java old mode 100644 new mode 100755 index 363b154..0d71b1a --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java @@ -410,7 +410,8 @@ public class ZeppelinConfiguration extends XMLConfiguration { + "org.apache.zeppelin.lens.LensInterpreter," + "org.apache.zeppelin.cassandra.CassandraInterpreter," + "org.apache.zeppelin.geode.GeodeOqlInterpreter," - + "org.apache.zeppelin.postgresql.PostgreSqlInterpreter"), + + "org.apache.zeppelin.postgresql.PostgreSqlInterpreter," + + "org.apache.zeppelin.kylin.KylinInterpreter"), ZEPPELIN_INTERPRETER_DIR("zeppelin.interpreter.dir", "interpreter"), ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT("zeppelin.interpreter.connect.timeout", 30000), ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"),
