Repository: zeppelin Updated Branches: refs/heads/branch-0.6 7c63bd6f2 -> 7db07f227
merge master into branch-0.6 Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/7db07f22 Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/7db07f22 Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/7db07f22 Branch: refs/heads/branch-0.6 Commit: 7db07f22742896c6de0691cacbd37dbe5b0b493e Parents: 7c63bd6 Author: Prabhjyot Singh <[email protected]> Authored: Thu Jun 30 15:41:01 2016 +0530 Committer: Prabhjyot Singh <[email protected]> Committed: Sat Jul 2 08:52:11 2016 +0530 ---------------------------------------------------------------------- .../org/apache/zeppelin/livy/LivyHelper.java | 4 +-- .../zeppelin/livy/LivySparkSQLInterpreter.java | 35 ++++++++++---------- .../src/main/resources/interpreter-setting.json | 5 +++ 3 files changed, 25 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7db07f22/livy/src/main/java/org/apache/zeppelin/livy/LivyHelper.java ---------------------------------------------------------------------- diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivyHelper.java b/livy/src/main/java/org/apache/zeppelin/livy/LivyHelper.java index 2c66fa9..ec77f1a 100644 --- a/livy/src/main/java/org/apache/zeppelin/livy/LivyHelper.java +++ b/livy/src/main/java/org/apache/zeppelin/livy/LivyHelper.java @@ -96,7 +96,7 @@ public class LivyHelper { }.getType()); if (jsonMap.get("state").equals("idle")) { break; - } else if (jsonMap.get("state").equals("error")) { + } else if (jsonMap.get("state").equals("error") || jsonMap.get("state").equals("dead")) { json = executeHTTP(property.getProperty("zeppelin.livy.url") + "/sessions/" + sessionId + "/log", "GET", null, @@ -124,7 +124,7 @@ public class LivyHelper { protected void initializeSpark(final InterpreterContext context, final Map<String, Integer> userSessionMap) throws Exception { - interpret("val sqlContext= new org.apache.spark.sql.SQLContext(sc)\n" + + interpret("val sqlContext = new org.apache.spark.sql.SQLContext(sc)\n" + "import sqlContext.implicits._", context, userSessionMap); } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7db07f22/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java ---------------------------------------------------------------------- diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java index 3c60204..22773df 100644 --- a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java +++ b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java @@ -35,20 +35,6 @@ import java.util.Properties; public class LivySparkSQLInterpreter extends Interpreter { Logger LOGGER = LoggerFactory.getLogger(LivySparkSQLInterpreter.class); - static String DEFAULT_MAX_RESULT = "1000"; - - static { - Interpreter.register( - "sql", - "livy", - LivySparkSQLInterpreter.class.getName(), - new InterpreterPropertyBuilder() - .add("zeppelin.livy.spark.maxResult", - DEFAULT_MAX_RESULT, - "Max number of SparkSQL result to display.") - .build() - ); - } protected Map<String, Integer> userSessionMap; private LivyHelper livyHelper; @@ -94,7 +80,7 @@ public class LivySparkSQLInterpreter extends Interpreter { line.replaceAll("\"", "\\\\\"") .replaceAll("\\n", " ") + "\").show(" + - property.get("zeppelin.livy.spark.maxResult") + ")", + property.get("zeppelin.livy.spark.sql.maxResult") + ")", interpreterContext, userSessionMap); if (res.code() == InterpreterResult.Code.SUCCESS) { @@ -137,6 +123,10 @@ public class LivySparkSQLInterpreter extends Interpreter { } } + public boolean concurrentSQL() { + return Boolean.parseBoolean(getProperty("zeppelin.livy.concurrentSQL")); + } + @Override public void cancel(InterpreterContext context) { livyHelper.cancelHTTP(context.getParagraphId()); @@ -154,8 +144,19 @@ public class LivySparkSQLInterpreter extends Interpreter { @Override public Scheduler getScheduler() { - return SchedulerFactory.singleton().createOrGetFIFOScheduler( - LivySparkInterpreter.class.getName() + this.hashCode()); + if (concurrentSQL()) { + int maxConcurrency = 10; + return SchedulerFactory.singleton().createOrGetParallelScheduler( + LivySparkInterpreter.class.getName() + this.hashCode(), maxConcurrency); + } else { + Interpreter intp = + getInterpreterInTheSameSessionByClassName(LivySparkInterpreter.class.getName()); + if (intp != null) { + return intp.getScheduler(); + } else { + return null; + } + } } @Override http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7db07f22/livy/src/main/resources/interpreter-setting.json ---------------------------------------------------------------------- diff --git a/livy/src/main/resources/interpreter-setting.json b/livy/src/main/resources/interpreter-setting.json index 7ae435f..e90216c 100644 --- a/livy/src/main/resources/interpreter-setting.json +++ b/livy/src/main/resources/interpreter-setting.json @@ -87,6 +87,11 @@ "propertyName": "zeppelin.livy.spark.sql.maxResult", "defaultValue": "1000", "description": "Max number of SparkSQL result to display." + }, + "zeppelin.livy.concurrentSQL": { + "propertyName": "zeppelin.livy.concurrentSQL", + "defaultValue": "false", + "description": "Execute multiple SQL concurrently if set true." } } },
