Repository: incubator-zeppelin Updated Branches: refs/heads/master c04440d22 -> 1020b7987
[ZEPPELIN-63] Add Apache Ignite interpreter Author: Andrey Gura <[email protected]> Author: agura <[email protected]> Closes #108 from agura/master and squashes the following commits: 63fde14 [Andrey Gura] Update README.md 43c07b0 [agura] [ZEPPELIN-63] Add Apache Ignite interpreter Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/1020b798 Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/1020b798 Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/1020b798 Branch: refs/heads/master Commit: 1020b79871b5b09b9fdaf5876d377d4e3300edfa Parents: c04440d Author: Andrey Gura <[email protected]> Authored: Thu Jun 18 20:18:17 2015 +0300 Committer: Lee moon soo <[email protected]> Committed: Sat Jun 20 15:04:06 2015 -0700 ---------------------------------------------------------------------- README.md | 4 + conf/zeppelin-site.xml.template | 2 +- ignite/pom.xml | 174 ++++++++++ .../zeppelin/ignite/IgniteInterpreter.java | 343 +++++++++++++++++++ .../zeppelin/ignite/IgniteInterpreterUtils.java | 44 +++ .../zeppelin/ignite/IgniteSqlInterpreter.java | 197 +++++++++++ .../zeppelin/ignite/IgniteInterpreterTest.java | 95 +++++ .../ignite/IgniteSqlInterpreterTest.java | 105 ++++++ .../java/org/apache/zeppelin/ignite/Person.java | 50 +++ pom.xml | 1 + .../zeppelin/conf/ZeppelinConfiguration.java | 4 +- 11 files changed, 1017 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/1020b798/README.md ---------------------------------------------------------------------- diff --git a/README.md b/README.md index 4b0475a..fde2b4f 100644 --- a/README.md +++ b/README.md @@ -66,6 +66,10 @@ Yarn (Hadoop 2.2.x and later) ``` mvn clean package -Pspark-1.1 -Dhadoop.version=2.2.0 -Phadoop-2.2 -Pyarn -DskipTests ``` +Ignite (1.1.0-incubating and later) +``` +mvn 
clean package -Dignite.version=1.1.0-incubating -DskipTests +``` ### Configure If you wish to configure Zeppelin option (like port number), configure the following files: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/1020b798/conf/zeppelin-site.xml.template ---------------------------------------------------------------------- diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template index cd72f12..b467a82 100644 --- a/conf/zeppelin-site.xml.template +++ b/conf/zeppelin-site.xml.template @@ -66,7 +66,7 @@ <property> <name>zeppelin.interpreters</name> - <value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.flink.FlinkInterpreter</value> + <value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter</value> <description>Comma separated interpreter configurations. 
First interpreter become a default</description> </property> http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/1020b798/ignite/pom.xml ---------------------------------------------------------------------- diff --git a/ignite/pom.xml b/ignite/pom.xml new file mode 100644 index 0000000..8c14051 --- /dev/null +++ b/ignite/pom.xml @@ -0,0 +1,174 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+ --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <artifactId>zeppelin</artifactId> + <groupId>org.apache.zeppelin</groupId> + <version>0.5.0-incubating-SNAPSHOT</version> + </parent> + + <artifactId>zeppelin-ignite</artifactId> + <packaging>jar</packaging> + <version>0.5.0-incubating-SNAPSHOT</version> + <name>Zeppelin: Apache Ignite interpreter</name> + <url>http://zeppelin.incubator.apache.org</url> + + <properties> + <ignite.version>1.1.0-incubating</ignite.version> + <ignite.scala.binary.version>2.10</ignite.scala.binary.version> + <ignite.scala.version>2.10.4</ignite.scala.version> + </properties> + + <dependencies> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>zeppelin-interpreter</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-core</artifactId> + <version>${ignite.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-spring</artifactId> + <version>${ignite.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-indexing</artifactId> + <version>${ignite.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-scalar</artifactId> + <version>${ignite.version}</version> + </dependency> + + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-library</artifactId> + <version>${ignite.scala.version}</version> + </dependency> + + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-compiler</artifactId> + <version>${ignite.scala.version}</version> + </dependency> + + <dependency> + <groupId>org.scala-lang</groupId> 
+ <artifactId>scala-reflect</artifactId> + <version>${ignite.scala.version}</version> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-deploy-plugin</artifactId> + <version>2.7</version> + <configuration> + <skip>true</skip> + </configuration> + </plugin> + + <plugin> + <artifactId>maven-enforcer-plugin</artifactId> + <version>1.3.1</version> + <executions> + <execution> + <id>enforce</id> + <phase>none</phase> + </execution> + </executions> + </plugin> + + <plugin> + <artifactId>maven-dependency-plugin</artifactId> + <version>2.8</version> + <executions> + <execution> + <id>copy-dependencies</id> + <phase>package</phase> + <goals> + <goal>copy-dependencies</goal> + </goals> + <configuration> + <outputDirectory>${project.build.directory}/../../interpreter/ignite</outputDirectory> + <overWriteReleases>false</overWriteReleases> + <overWriteSnapshots>false</overWriteSnapshots> + <overWriteIfNewer>true</overWriteIfNewer> + <includeScope>runtime</includeScope> + </configuration> + </execution> + <execution> + <id>copy-artifact</id> + <phase>package</phase> + <goals> + <goal>copy</goal> + </goals> + <configuration> + <outputDirectory>${project.build.directory}/../../interpreter/ignite</outputDirectory> + <overWriteReleases>false</overWriteReleases> + <overWriteSnapshots>false</overWriteSnapshots> + <overWriteIfNewer>true</overWriteIfNewer> + <includeScope>runtime</includeScope> + <artifactItems> + <artifactItem> + <groupId>${project.groupId}</groupId> + <artifactId>${project.artifactId}</artifactId> + <version>${project.version}</version> + 
<type>${project.packaging}</type> + </artifactItem> + </artifactItems> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + +</project> http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/1020b798/ignite/src/main/java/org/apache/zeppelin/ignite/IgniteInterpreter.java ---------------------------------------------------------------------- diff --git a/ignite/src/main/java/org/apache/zeppelin/ignite/IgniteInterpreter.java b/ignite/src/main/java/org/apache/zeppelin/ignite/IgniteInterpreter.java new file mode 100644 index 0000000..e09f310 --- /dev/null +++ b/ignite/src/main/java/org/apache/zeppelin/ignite/IgniteInterpreter.java @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.zeppelin.ignite; + +import org.apache.ignite.Ignite; +import org.apache.ignite.Ignition; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.zeppelin.interpreter.*; +import org.apache.zeppelin.interpreter.InterpreterResult.Code; +import org.apache.zeppelin.scheduler.Scheduler; +import org.apache.zeppelin.scheduler.SchedulerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.PrintWriter; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.LinkedList; +import java.util.Map; +import java.util.Properties; + +import scala.Console; +import scala.None; +import scala.Some; +import scala.tools.nsc.Settings; +import scala.tools.nsc.interpreter.IMain; +import scala.tools.nsc.interpreter.Results.Result; +import scala.tools.nsc.settings.MutableSettings.BooleanSetting; +import scala.tools.nsc.settings.MutableSettings.PathSetting; + +/** + * Apache Ignite interpreter (http://ignite.incubator.apache.org/). + * + * Use the following properties for interpreter configuration: + * + * <ul> + * <li>{@code ignite.addresses} - comma separated list of hosts in form {@code <host>:<port>} + * or {@code <host>:<port_1>..<port_n>} </li> + * <li>{@code ignite.clientMode} - indicates that Ignite interpreter + * should start node in client mode ({@code true} or {@code false}).</li> + * <li>{@code ignite.peerClassLoadingEnabled} - enables/disables peer class loading + * ({@code true} or {@code false}).</li> + * <li>{@code ignite.config.url} - URL for Ignite configuration. 
If this URL specified then + * all aforementioned properties will not be taken in account.</li> + * </ul> + */ +public class IgniteInterpreter extends Interpreter { + static final String IGNITE_ADDRESSES = "ignite.addresses"; + + static final String IGNITE_CLIENT_MODE = "ignite.clientMode"; + + static final String IGNITE_PEER_CLASS_LOADING_ENABLED = "ignite.peerClassLoadingEnabled"; + + static final String IGNITE_CFG_URL = "ignite.config.url"; + + static { + Interpreter.register( + "ignite", + "ignite", + IgniteInterpreter.class.getName(), + new InterpreterPropertyBuilder() + .add(IGNITE_ADDRESSES, "127.0.0.1:47500..47509", + "Coma separated list of addresses " + + "(e.g. 127.0.0.1:47500 or 127.0.0.1:47500..47509)") + .add(IGNITE_CLIENT_MODE, "true", "Client mode. true or false") + .add(IGNITE_CFG_URL, "", "Configuration URL. Overrides all other settings.") + .add(IGNITE_PEER_CLASS_LOADING_ENABLED, "true", + "Peer class loading enabled. true or false") + .build()); + } + + private Logger logger = LoggerFactory.getLogger(IgniteInterpreter.class); + private Ignite ignite; + private ByteArrayOutputStream out; + private IMain imain; + private Throwable initEx; + + public IgniteInterpreter(Properties property) { + super(property); + } + + @Override + public void open() { + Settings settings = new Settings(); + + URL[] urls = getClassloaderUrls(); + + // set classpath + PathSetting pathSettings = settings.classpath(); + StringBuilder sb = new StringBuilder(); + + for (File f : currentClassPath()) { + if (sb.length() > 0) { + sb.append(File.pathSeparator); + } + sb.append(f.getAbsolutePath()); + } + + if (urls != null) { + for (URL u : urls) { + if (sb.length() > 0) { + sb.append(File.pathSeparator); + } + sb.append(u.getFile()); + } + } + + pathSettings.v_$eq(sb.toString()); + settings.scala$tools$nsc$settings$ScalaSettings$_setter_$classpath_$eq(pathSettings); + + settings.explicitParentLoader_$eq(new Some<>(Thread.currentThread().getContextClassLoader())); + + 
BooleanSetting b = (BooleanSetting) settings.usejavacp(); + b.v_$eq(true); + settings.scala$tools$nsc$settings$StandardScalaSettings$_setter_$usejavacp_$eq(b); + + out = new ByteArrayOutputStream(); + imain = new IMain(settings, new PrintWriter(out)); + + initIgnite(); + } + + private List<File> currentClassPath() { + List<File> paths = classPath(Thread.currentThread().getContextClassLoader()); + String[] cps = System.getProperty("java.class.path").split(File.pathSeparator); + + for (String cp : cps) { + paths.add(new File(cp)); + } + + return paths; + } + + private List<File> classPath(ClassLoader cl) { + List<File> paths = new LinkedList<>(); + + if (cl == null) { + return paths; + } + + if (cl instanceof URLClassLoader) { + URLClassLoader ucl = (URLClassLoader) cl; + URL[] urls = ucl.getURLs(); + if (urls != null) { + for (URL url : urls) { + paths.add(new File(url.getFile())); + } + } + } + + return paths; + } + + public Object getValue(String name) { + Object val = imain.valueOfTerm(name); + + if (val instanceof None) { + return null; + } else if (val instanceof Some) { + return ((Some) val).get(); + } else { + return val; + } + } + + private Ignite getIgnite() { + if (ignite == null) { + try { + String cfgUrl = getProperty(IGNITE_CFG_URL); + + if (cfgUrl != null && !cfgUrl.isEmpty()) { + ignite = Ignition.start(new URL(cfgUrl)); + } else { + IgniteConfiguration conf = new IgniteConfiguration(); + + conf.setClientMode(Boolean.parseBoolean(getProperty(IGNITE_CLIENT_MODE))); + + TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder(); + ipFinder.setAddresses(getAddresses()); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + discoSpi.setIpFinder(ipFinder); + conf.setDiscoverySpi(discoSpi); + + conf.setPeerClassLoadingEnabled( + Boolean.parseBoolean(getProperty(IGNITE_PEER_CLASS_LOADING_ENABLED))); + + ignite = Ignition.start(conf); + } + + initEx = null; + } catch (Exception e) { + initEx = e; + } + } + return ignite; + } + + private void 
initIgnite() { + imain.interpret("@transient var _binder = new java.util.HashMap[String, Object]()"); + Map<String, Object> binder = (Map<String, Object>) getValue("_binder"); + + if (getIgnite() != null) { + binder.put("ignite", ignite); + + imain.interpret("@transient val ignite = " + + "_binder.get(\"ignite\")" + + ".asInstanceOf[org.apache.ignite.Ignite]"); + } + } + + @Override + public void close() { + initEx = null; + + if (ignite != null) { + ignite.close(); + ignite = null; + } + + if (imain != null) { + imain.close(); + imain = null; + } + } + + private List<String> getAddresses() { + String prop = getProperty(IGNITE_ADDRESSES); + + if (prop == null || prop.isEmpty()) { + return Collections.emptyList(); + } + + String[] tokens = prop.split(","); + List<String> addresses = new ArrayList<>(tokens.length); + Collections.addAll(addresses, tokens); + + return addresses; + } + + @Override + public InterpreterResult interpret(String line, InterpreterContext context) { + if (initEx != null) { + return IgniteInterpreterUtils.buildErrorResult(initEx); + } + + if (line == null || line.trim().length() == 0) { + return new InterpreterResult(Code.SUCCESS); + } + + return interpret(line.split("\n")); + } + + @Override + public void cancel(InterpreterContext context) { + } + + private InterpreterResult interpret(String[] lines) { + String[] linesToRun = new String[lines.length + 1]; + System.arraycopy(lines, 0, linesToRun, 0, lines.length); + linesToRun[lines.length] = "print(\"\")"; + + Console.setOut(out); + out.reset(); + Code code = null; + + String incomplete = ""; + for (String s : linesToRun) { + try { + code = getResultCode(imain.interpret(incomplete + s)); + } catch (Exception e) { + logger.info("Interpreter exception", e); + return new InterpreterResult(Code.ERROR, InterpreterUtils.getMostRelevantMessage(e)); + } + + if (code == Code.ERROR) { + return new InterpreterResult(code, out.toString()); + } else if (code == Code.INCOMPLETE) { + incomplete += s + '\n'; 
+ } else { + incomplete = ""; + } + } + + if (code == Code.INCOMPLETE) { + return new InterpreterResult(code, "Incomplete expression"); + } else { + return new InterpreterResult(code, out.toString()); + } + } + + private Code getResultCode(Result res) { + if (res instanceof scala.tools.nsc.interpreter.Results.Success$) { + return Code.SUCCESS; + } else if (res instanceof scala.tools.nsc.interpreter.Results.Incomplete$) { + return Code.INCOMPLETE; + } else { + return Code.ERROR; + } + } + + @Override + public FormType getFormType() { + return FormType.NATIVE; + } + + @Override + public int getProgress(InterpreterContext context) { + return 0; + } + + @Override + public List<String> completion(String buf, int cursor) { + return new LinkedList<>(); + } + + @Override + public Scheduler getScheduler() { + return SchedulerFactory.singleton().createOrGetFIFOScheduler( + IgniteInterpreter.class.getName() + this.hashCode()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/1020b798/ignite/src/main/java/org/apache/zeppelin/ignite/IgniteInterpreterUtils.java ---------------------------------------------------------------------- diff --git a/ignite/src/main/java/org/apache/zeppelin/ignite/IgniteInterpreterUtils.java b/ignite/src/main/java/org/apache/zeppelin/ignite/IgniteInterpreterUtils.java new file mode 100644 index 0000000..74a0a7f --- /dev/null +++ b/ignite/src/main/java/org/apache/zeppelin/ignite/IgniteInterpreterUtils.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.ignite; + +import org.apache.zeppelin.interpreter.InterpreterResult; + +/** + * Apache Ignite interpreter utils. + */ +public class IgniteInterpreterUtils { + /** + * Builds an error result from the messages of the given exception and its causes. + * @param e Exception; its {@code toString()} is used when it has no message. + * @return error result with one message per line. + */ + public static InterpreterResult buildErrorResult(Throwable e) { + StringBuilder sb = new StringBuilder(e.getMessage() != null ? e.getMessage() : e.toString()); + + while ((e = e.getCause()) != null) { + String errMsg = e.getMessage(); + + if (errMsg != null) { + sb.append('\n').append(errMsg); + } + } + + return new InterpreterResult(InterpreterResult.Code.ERROR, sb.toString()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/1020b798/ignite/src/main/java/org/apache/zeppelin/ignite/IgniteSqlInterpreter.java ---------------------------------------------------------------------- diff --git a/ignite/src/main/java/org/apache/zeppelin/ignite/IgniteSqlInterpreter.java b/ignite/src/main/java/org/apache/zeppelin/ignite/IgniteSqlInterpreter.java new file mode 100644 index 0000000..5d77e7d --- /dev/null +++ b/ignite/src/main/java/org/apache/zeppelin/ignite/IgniteSqlInterpreter.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.ignite; + +import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterException; +import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.InterpreterResult.Code; +import org.apache.zeppelin.scheduler.Scheduler; +import org.apache.zeppelin.scheduler.SchedulerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.Statement; +import java.sql.SQLException; +import java.util.LinkedList; +import java.util.List; +import java.util.Properties; + +/** + * Apache Ignite SQL interpreter (http://ignite.incubator.apache.org/). + * + * Use {@code ignite.jdbc.url} property to set up JDBC connection URL. + * URL has the following pattern: + * {@code jdbc:ignite://<hostname>:<port>/<cache_name>} + * + * <ul> + * <li>Hostname is required.</li> + * <li>If port is not defined, 11211 is used (default for Ignite client).</li> + * <li>Leave cache_name empty if you are connecting to a default cache. 
+ * Note that the cache name is case sensitive.</li> + * </ul> + */ +public class IgniteSqlInterpreter extends Interpreter { + private static final String IGNITE_JDBC_DRIVER_NAME = "org.apache.ignite.IgniteJdbcDriver"; + + static final String IGNITE_JDBC_URL = "ignite.jdbc.url"; + + static { + Interpreter.register( + "ignitesql", + "ignite", + IgniteSqlInterpreter.class.getName(), + new InterpreterPropertyBuilder() + .add(IGNITE_JDBC_URL, "jdbc:ignite://localhost:11211/", "Ignite JDBC connection URL.") + .build()); + } + + private Logger logger = LoggerFactory.getLogger(IgniteSqlInterpreter.class); + + private Connection conn; + + private Throwable connEx; + + private Statement curStmt; + + public IgniteSqlInterpreter(Properties property) { + super(property); + } + + @Override + public void open() { + try { + Class.forName(IGNITE_JDBC_DRIVER_NAME); + } catch (ClassNotFoundException e) { + logger.error("Can't open connection", e); + connEx = e; + return; + } + + try { + logger.info("connect to " + getProperty(IGNITE_JDBC_URL)); + + conn = DriverManager.getConnection(getProperty(IGNITE_JDBC_URL)); + connEx = null; + + logger.info("Successfully created JDBC connection"); + } catch (SQLException e) { + logger.error("Can't open connection: ", e); + connEx = e; + } + } + + @Override + public void close() { + try { + if (conn != null) { + conn.close(); + } + } catch (SQLException e) { + throw new InterpreterException(e); + } finally { + conn = null; + connEx = null; + } + } + + @Override + public InterpreterResult interpret(String st, InterpreterContext context) { + if (connEx != null) { + return new InterpreterResult(Code.ERROR, connEx.getMessage()); + } + + StringBuilder msg = new StringBuilder("%table "); + + try (Statement stmt = conn.createStatement()) { + + curStmt = stmt; + + try (ResultSet res = stmt.executeQuery(st)) { + ResultSetMetaData md = res.getMetaData(); + + for (int i = 1; i <= md.getColumnCount(); i++) { + if (i > 1) { + msg.append('\t'); + } + + 
msg.append(md.getColumnName(i)); + } + + msg.append('\n'); + + while (res.next()) { + for (int i = 1; i <= md.getColumnCount(); i++) { + msg.append(res.getString(i)); + + if (i != md.getColumnCount()) { + msg.append('\t'); + } + } + + msg.append('\n'); + } + } + } catch (Exception e) { + return IgniteInterpreterUtils.buildErrorResult(e); + } finally { + curStmt = null; + } + + return new InterpreterResult(Code.SUCCESS, msg.toString()); + } + + @Override + public void cancel(InterpreterContext context) { + if (curStmt != null) { + try { + curStmt.cancel(); + } catch (SQLException e) { + // No-op. + } finally { + curStmt = null; + } + } + } + + @Override + public FormType getFormType() { + return FormType.SIMPLE; + } + + @Override + public int getProgress(InterpreterContext context) { + return 0; + } + + @Override + public Scheduler getScheduler() { + return SchedulerFactory.singleton().createOrGetFIFOScheduler( + IgniteSqlInterpreter.class.getName() + this.hashCode()); + } + + @Override + public List<String> completion(String buf, int cursor) { + return new LinkedList<>(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/1020b798/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteInterpreterTest.java ---------------------------------------------------------------------- diff --git a/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteInterpreterTest.java b/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteInterpreterTest.java new file mode 100644 index 0000000..3d3f50b --- /dev/null +++ b/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteInterpreterTest.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.ignite; + +import java.util.Collections; +import java.util.Properties; + +import org.apache.ignite.Ignite; +import org.apache.ignite.Ignition; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Tests for Apache Ignite interpreter ({@link IgniteInterpreter}). 
+ */ +public class IgniteInterpreterTest { + private static final String HOST = "127.0.0.1:47500..47509"; + + private static final InterpreterContext INTP_CONTEXT = + new InterpreterContext(null, null, null, null, null, null, null); + + private IgniteInterpreter intp; + private Ignite ignite; + + @Before + public void setUp() { + TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder(); + ipFinder.setAddresses(Collections.singletonList(HOST)); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + discoSpi.setIpFinder(ipFinder); + + IgniteConfiguration cfg = new IgniteConfiguration(); + cfg.setDiscoverySpi(discoSpi); + + cfg.setGridName("test"); + + ignite = Ignition.start(cfg); + + Properties props = new Properties(); + props.setProperty(IgniteSqlInterpreter.IGNITE_JDBC_URL, "jdbc:intp://localhost:11211/person"); + props.setProperty(IgniteInterpreter.IGNITE_CLIENT_MODE, "false"); + props.setProperty(IgniteInterpreter.IGNITE_PEER_CLASS_LOADING_ENABLED, "false"); + + intp = new IgniteInterpreter(props); + intp.open(); + } + + @After + public void tearDown() { + ignite.close(); + intp.close(); + } + + @Test + public void testInterpret() { + String sizeVal = "size"; + + InterpreterResult result = intp.interpret("import org.apache.ignite.IgniteCache\n" + + "val " + sizeVal + " = ignite.cluster().nodes().size()", INTP_CONTEXT); + + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertTrue(result.message().contains(sizeVal + ": Int = " + ignite.cluster().nodes().size())); + } + + @Test + public void testInterpretInvalidInput() { + InterpreterResult result = intp.interpret("invalid input", INTP_CONTEXT); + + assertEquals(InterpreterResult.Code.ERROR, result.code()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/1020b798/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteSqlInterpreterTest.java ---------------------------------------------------------------------- diff --git 
a/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteSqlInterpreterTest.java b/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteSqlInterpreterTest.java new file mode 100644 index 0000000..de1e760 --- /dev/null +++ b/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteSqlInterpreterTest.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.zeppelin.ignite; + +import java.util.Collections; +import java.util.Properties; + +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.Ignition; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.InterpreterResult.Code; +import org.apache.zeppelin.interpreter.InterpreterResult.Type; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for Apache Ignite SQL interpreter ({@link IgniteSqlInterpreter}). + */ +public class IgniteSqlInterpreterTest { + private static final String HOST = "127.0.0.1:47500..47509"; + + private static final InterpreterContext INTP_CONTEXT = + new InterpreterContext(null, null, null, null, null, null, null); + + private Ignite ignite; + private IgniteSqlInterpreter intp; + + @Before + public void setUp() { + TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder(); + ipFinder.setAddresses(Collections.singletonList(HOST)); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + discoSpi.setIpFinder(ipFinder); + + IgniteConfiguration cfg = new IgniteConfiguration(); + cfg.setDiscoverySpi(discoSpi); + + cfg.setGridName("test"); + + ignite = Ignition.start(cfg); + + Properties props = new Properties(); + props.setProperty(IgniteSqlInterpreter.IGNITE_JDBC_URL, "jdbc:ignite://localhost:11211/person"); + props.setProperty(IgniteInterpreter.IGNITE_CLIENT_MODE, "false"); + + intp = new IgniteSqlInterpreter(props); + + CacheConfiguration<Integer, Person> cacheConf = new CacheConfiguration<>(); + 
cacheConf.setIndexedTypes(Integer.class, Person.class); + cacheConf.setName("person"); + + IgniteCache<Integer, Person> cache = ignite.createCache(cacheConf); + cache.put(1, new Person("sun", 100)); + cache.put(2, new Person("moon", 50)); + assertEquals("moon", cache.get(2).getName()); + + intp.open(); + } + + @After + public void tearDown() { + intp.close(); + ignite.close(); + } + + @Test + public void testSql() { + InterpreterResult result = intp.interpret("select name, age from person where age > 10", INTP_CONTEXT); + + assertEquals(Code.SUCCESS, result.code()); + assertEquals(Type.TABLE, result.type()); + assertEquals("NAME\tAGE\nsun\t100\nmoon\t50\n", result.message()); + } + + @Test + public void testInvalidSql() throws Exception { + InterpreterResult result = intp.interpret("select * hrom person", INTP_CONTEXT); + + assertEquals(Code.ERROR, result.code()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/1020b798/ignite/src/test/java/org/apache/zeppelin/ignite/Person.java ---------------------------------------------------------------------- diff --git a/ignite/src/test/java/org/apache/zeppelin/ignite/Person.java b/ignite/src/test/java/org/apache/zeppelin/ignite/Person.java new file mode 100644 index 0000000..e0b4367 --- /dev/null +++ b/ignite/src/test/java/org/apache/zeppelin/ignite/Person.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.ignite; + +import java.io.Serializable; + +import org.apache.ignite.cache.query.annotations.QuerySqlField; + +public class Person implements Serializable { + @QuerySqlField + private String name; + + @QuerySqlField + private int age; + + public Person(String name, int age) { + this.name = name; + this.age = age; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public int getAge() { + return age; + } + + public void setAge(int age) { + this.age = age; + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/1020b798/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index e3a7ca0..ecd819f 100644 --- a/pom.xml +++ b/pom.xml @@ -93,6 +93,7 @@ <module>hive</module> <module>tajo</module> <module>flink</module> + <module>ignite</module> <module>zeppelin-web</module> <module>zeppelin-server</module> <module>zeppelin-distribution</module> http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/1020b798/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java index 95d77b7..6bc8a6c 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java +++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java @@ -397,7 +397,9 @@ public class ZeppelinConfiguration extends XMLConfiguration { + "org.apache.zeppelin.shell.ShellInterpreter," + "org.apache.zeppelin.hive.HiveInterpreter," + "org.apache.zeppelin.tajo.TajoInterpreter," - + "org.apache.zeppelin.flink.FlinkInterpreter"), + + "org.apache.zeppelin.flink.FlinkInterpreter," + + "org.apache.zeppelin.ignite.IgniteInterpreter," + + "org.apache.zeppelin.ignite.IgniteSqlInterpreter"), ZEPPELIN_INTERPRETER_DIR("zeppelin.interpreter.dir", "interpreter"), ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"), ZEPPELIN_NOTEBOOK_DIR("zeppelin.notebook.dir", "notebook"),
