Repository: zeppelin Updated Branches: refs/heads/master 24922e103 -> a4a86862e
[Zeppelin-1022] Apply new mechanism to LivyInterpreter ### What is this PR for? Apply new mechanism to LivyInterpreter ### What type of PR is it? [Improvement] ### Todos * [x] - Apply new mechanism to LivyInterpreter * [x] - rename zeppelin.livy.url to livy.host.url to make all params look livy.* (reverted by f578a34 below; the property name remains zeppelin.livy.url) * [x] - surround interpreterContext.getAuthenticationInfo().getUser() with "" ### What is the Jira issue? * [Zeppelin-1022](https://issues.apache.org/jira/browse/ZEPPELIN-1022) ### Questions: * Do the license files need an update? no * Are there breaking changes for older versions? no * Does this need documentation? no Author: Prabhjyot Singh <[email protected]> Closes #1028 from prabhjyotsingh/ZEPPELIN-1022 and squashes the following commits: f578a34 [Prabhjyot Singh] revert name to start with zeppelin.* bd68e4f [Prabhjyot Singh] Merge remote-tracking branch 'origin/master' into ZEPPELIN-1022 0b23cca [Prabhjyot Singh] surround interpreterContext.getAuthenticationInfo().getUser() with "" e010259 [Prabhjyot Singh] rename zeppelin.livy.url to livy.host.url to make all params look livy.* 57d38eb [Prabhjyot Singh] Apply new mechanism to LivyInterpreter Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/a4a86862 Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/a4a86862 Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/a4a86862 Branch: refs/heads/master Commit: a4a86862ed207a5d3476d26ca464d30773669482 Parents: 24922e1 Author: Prabhjyot Singh <[email protected]> Authored: Fri Jun 17 14:57:25 2016 +0530 Committer: Prabhjyot Singh <[email protected]> Committed: Fri Jun 17 23:44:17 2016 +0530 ---------------------------------------------------------------------- .../org/apache/zeppelin/livy/LivyHelper.java | 24 ++--- .../zeppelin/livy/LivyPySparkInterpreter.java | 10 -- .../zeppelin/livy/LivySparkInterpreter.java | 29 ------ .../zeppelin/livy/LivySparkRInterpreter.java | 10 -- 
.../zeppelin/livy/LivySparkSQLInterpreter.java | 16 +--- .../src/main/resources/interpreter-setting.json | 97 ++++++++++++++++++++ 6 files changed, 107 insertions(+), 79 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zeppelin/blob/a4a86862/livy/src/main/java/org/apache/zeppelin/livy/LivyHelper.java ---------------------------------------------------------------------- diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivyHelper.java b/livy/src/main/java/org/apache/zeppelin/livy/LivyHelper.java index 7f3517e..8c4ddab 100644 --- a/livy/src/main/java/org/apache/zeppelin/livy/LivyHelper.java +++ b/livy/src/main/java/org/apache/zeppelin/livy/LivyHelper.java @@ -20,7 +20,6 @@ package org.apache.zeppelin.livy; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.reflect.TypeToken; - import org.apache.commons.lang3.StringUtils; import org.apache.http.HttpResponse; import org.apache.http.client.HttpClient; @@ -39,12 +38,8 @@ import org.slf4j.LoggerFactory; import java.io.BufferedReader; import java.io.InputStreamReader; import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; +import java.util.*; import java.util.Map.Entry; -import java.util.Properties; /*** @@ -64,11 +59,11 @@ public class LivyHelper { public Integer createSession(InterpreterContext context, String kind) throws Exception { try { Map<String, String> conf = new HashMap<String, String>(); - + Iterator<Entry<Object, Object>> it = property.entrySet().iterator(); while (it.hasNext()) { Entry<Object, Object> pair = it.next(); - if (pair.getKey().toString().startsWith("livy.spark.") && + if (pair.getKey().toString().startsWith("livy.spark.") && !pair.getValue().toString().isEmpty()) conf.put(pair.getKey().toString().substring(5), pair.getValue().toString()); } @@ -76,12 +71,12 @@ public class LivyHelper { String 
confData = gson.toJson(conf); String json = executeHTTP(property.getProperty("zeppelin.livy.url") + "/sessions", - "POST", + "POST", "{" + "\"kind\": \"" + kind + "\", " + - "\"conf\": " + confData + ", " + - "\"proxyUser\": " + context.getAuthenticationInfo().getUser() + - "}", + "\"conf\": " + confData + ", " + + "\"proxyUser\": \"" + context.getAuthenticationInfo().getUser() + + "\"}", context.getParagraphId() ); @@ -96,9 +91,8 @@ public class LivyHelper { LOGGER.error(String.format("sessionId:%s state is %s", jsonMap.get("id"), jsonMap.get("state"))); Thread.sleep(1000); - json = executeHTTP(property.getProperty("zeppelin.livy.url") + "/sessions/" + sessionId, - "GET", null, - context.getParagraphId()); + json = executeHTTP(property.getProperty("zeppelin.livy.url") + "/sessions/" + + sessionId, "GET", null, context.getParagraphId()); jsonMap = (Map<Object, Object>) gson.fromJson(json, new TypeToken<Map<Object, Object>>() { }.getType()); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/a4a86862/livy/src/main/java/org/apache/zeppelin/livy/LivyPySparkInterpreter.java ---------------------------------------------------------------------- diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivyPySparkInterpreter.java b/livy/src/main/java/org/apache/zeppelin/livy/LivyPySparkInterpreter.java index 4ca629b..bd342a2 100644 --- a/livy/src/main/java/org/apache/zeppelin/livy/LivyPySparkInterpreter.java +++ b/livy/src/main/java/org/apache/zeppelin/livy/LivyPySparkInterpreter.java @@ -37,16 +37,6 @@ public class LivyPySparkInterpreter extends Interpreter { Logger LOGGER = LoggerFactory.getLogger(LivyPySparkInterpreter.class); - static { - Interpreter.register( - "pyspark", - "livy", - LivyPySparkInterpreter.class.getName(), - new InterpreterPropertyBuilder() - .build() - ); - } - protected Map<String, Integer> userSessionMap; protected LivyHelper livyHelper; 
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/a4a86862/livy/src/main/java/org/apache/zeppelin/livy/LivySparkInterpreter.java ---------------------------------------------------------------------- diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkInterpreter.java b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkInterpreter.java index e377009..6aac56d 100644 --- a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkInterpreter.java +++ b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkInterpreter.java @@ -34,38 +34,9 @@ import java.util.Properties; */ public class LivySparkInterpreter extends Interpreter { - static String DEFAULT_URL = "http://localhost:8998"; - static String LOCAL = "local[*]"; Logger LOGGER = LoggerFactory.getLogger(LivySparkInterpreter.class); private LivyOutputStream out; - static { - Interpreter.register( - "spark", - "livy", - LivySparkInterpreter.class.getName(), - new InterpreterPropertyBuilder() - .add("zeppelin.livy.url", DEFAULT_URL, "The URL for Livy Server.") - .add("livy.spark.master", LOCAL, "Spark master uri. ex) spark://masterhost:7077") - .add("livy.spark.driver.cores", "", "Driver cores. ex) 1, 2") - .add("livy.spark.driver.memory", "", "Driver memory. ex) 512m, 32g") - .add("livy.spark.executor.instances", "", "Executor instances. ex) 1, 4") - .add("livy.spark.executor.cores", "", "Num cores per executor. ex) 1, 4") - .add("livy.spark.executor.memory", "", - "Executor memory per worker instance. ex) 512m, 32g") - .add("livy.spark.dynamicAllocation.enabled", "", "Use dynamic resource allocation") - .add("livy.spark.dynamicAllocation.cachedExecutorIdleTimeout", "", - "Remove an executor which has cached data blocks") - .add("livy.spark.dynamicAllocation.minExecutors", "", - "Lower bound for the number of executors if dynamic allocation is enabled. ") - .add("livy.spark.dynamicAllocation.initialExecutors", "", - "Initial number of executors to run if dynamic allocation is enabled. 
") - .add("livy.spark.dynamicAllocation.maxExecutors", "", - "Upper bound for the number of executors if dynamic allocation is enabled. ") - .build() - ); - } - protected static Map<String, Integer> userSessionMap; private LivyHelper livyHelper; http://git-wip-us.apache.org/repos/asf/zeppelin/blob/a4a86862/livy/src/main/java/org/apache/zeppelin/livy/LivySparkRInterpreter.java ---------------------------------------------------------------------- diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkRInterpreter.java b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkRInterpreter.java index ba929bf..753b378 100644 --- a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkRInterpreter.java +++ b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkRInterpreter.java @@ -37,16 +37,6 @@ public class LivySparkRInterpreter extends Interpreter { Logger LOGGER = LoggerFactory.getLogger(LivySparkRInterpreter.class); - static { - Interpreter.register( - "sparkr", - "livy", - LivySparkRInterpreter.class.getName(), - new InterpreterPropertyBuilder() - .build() - ); - } - protected Map<String, Integer> userSessionMap; private LivyHelper livyHelper; http://git-wip-us.apache.org/repos/asf/zeppelin/blob/a4a86862/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java ---------------------------------------------------------------------- diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java index 3c60204..806d7aa 100644 --- a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java +++ b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java @@ -35,20 +35,6 @@ import java.util.Properties; public class LivySparkSQLInterpreter extends Interpreter { Logger LOGGER = LoggerFactory.getLogger(LivySparkSQLInterpreter.class); - static String DEFAULT_MAX_RESULT = "1000"; - - static { - Interpreter.register( - "sql", - 
"livy", - LivySparkSQLInterpreter.class.getName(), - new InterpreterPropertyBuilder() - .add("zeppelin.livy.spark.maxResult", - DEFAULT_MAX_RESULT, - "Max number of SparkSQL result to display.") - .build() - ); - } protected Map<String, Integer> userSessionMap; private LivyHelper livyHelper; @@ -94,7 +80,7 @@ public class LivySparkSQLInterpreter extends Interpreter { line.replaceAll("\"", "\\\\\"") .replaceAll("\\n", " ") + "\").show(" + - property.get("zeppelin.livy.spark.maxResult") + ")", + property.get("livy.spark.sql.maxResult") + ")", interpreterContext, userSessionMap); if (res.code() == InterpreterResult.Code.SUCCESS) { http://git-wip-us.apache.org/repos/asf/zeppelin/blob/a4a86862/livy/src/main/resources/interpreter-setting.json ---------------------------------------------------------------------- diff --git a/livy/src/main/resources/interpreter-setting.json b/livy/src/main/resources/interpreter-setting.json new file mode 100644 index 0000000..c22e9a7 --- /dev/null +++ b/livy/src/main/resources/interpreter-setting.json @@ -0,0 +1,97 @@ +[ + { + "group": "livy", + "name": "spark", + "className": "org.apache.zeppelin.livy.LivySparkInterpreter", + "properties": { + "zeppelin.livy.url": { + "envName": "ZEPPELIN_LIVY_HOST_URL", + "propertyName": "zeppelin.livy.url", + "defaultValue": "http://localhost:8998", + "description": "The URL for Livy Server." + }, + "livy.spark.master": { + "propertyName": "livy.spark.master", + "defaultValue": "local[*]", + "description": "Spark master uri. ex) spark://masterhost:7077" + }, + "livy.spark.driver.cores": { + "propertyName": "livy.spark.driver.cores", + "defaultValue": "", + "description": "Driver cores. ex) 1, 2" + }, + "livy.spark.driver.memory": { + "propertyName": "livy.spark.driver.memory", + "defaultValue": "", + "description": "Driver memory. ex) 512m, 32g" + }, + "livy.spark.executor.instances": { + "propertyName": "livy.spark.executor.instances", + "defaultValue": "", + "description": "Executor instances. 
ex) 1, 4" + }, + "livy.spark.executor.cores": { + "propertyName": "livy.spark.executor.cores", + "defaultValue": "", + "description": "Num cores per executor. ex) 1, 4" + }, + "livy.spark.executor.memory": { + "propertyName": "livy.spark.executor.memory", + "defaultValue": "", + "description": "Executor memory per worker instance. ex) 512m, 32g" + }, + "livy.spark.dynamicAllocation.enabled": { + "propertyName": "livy.spark.dynamicAllocation.enabled", + "defaultValue": "", + "description": "Use dynamic resource allocation" + }, + "livy.spark.dynamicAllocation.cachedExecutorIdleTimeout": { + "propertyName": "livy.spark.dynamicAllocation.cachedExecutorIdleTimeout", + "defaultValue": "", + "description": "Remove an executor which has cached data blocks" + }, + "livy.spark.dynamicAllocation.minExecutors": { + "propertyName": "livy.spark.dynamicAllocation.minExecutors", + "defaultValue": "", + "description": "Lower bound for the number of executors if dynamic allocation is enabled." + }, + "livy.spark.dynamicAllocation.initialExecutors": { + "propertyName": "livy.spark.dynamicAllocation.initialExecutors", + "defaultValue": "", + "description": "Initial number of executors to run if dynamic allocation is enabled." + }, + "livy.spark.dynamicAllocation.maxExecutors": { + "propertyName": "livy.spark.dynamicAllocation.maxExecutors", + "defaultValue": "", + "description": "Upper bound for the number of executors if dynamic allocation is enabled." + } + } + }, + { + "group": "livy", + "name": "sql", + "className": "org.apache.zeppelin.livy.LivySparkSQLInterpreter", + "properties": { + "zeppelin.livy.spark.sql.maxResult": { + "envName": "ZEPPELIN_LIVY_MAXRESULT", + "propertyName": "zeppelin.livy.spark.sql.maxResult", + "defaultValue": "1000", + "description": "Max number of SparkSQL result to display." 
+ } + } + }, + { + "group": "livy", + "name": "pyspark", + "className": "org.apache.zeppelin.livy.LivyPySparkInterpreter", + "properties": { + } + }, + { + "group": "livy", + "name": "sparkr", + "className": "org.apache.zeppelin.livy.LivySparkRInterpreter", + "properties": { + } + } +] \ No newline at end of file
