Repository: zeppelin
Updated Branches:
  refs/heads/branch-0.6 7c63bd6f2 -> 7db07f227


merge master into branch-0.6


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/7db07f22
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/7db07f22
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/7db07f22

Branch: refs/heads/branch-0.6
Commit: 7db07f22742896c6de0691cacbd37dbe5b0b493e
Parents: 7c63bd6
Author: Prabhjyot Singh <[email protected]>
Authored: Thu Jun 30 15:41:01 2016 +0530
Committer: Prabhjyot Singh <[email protected]>
Committed: Sat Jul 2 08:52:11 2016 +0530

----------------------------------------------------------------------
 .../org/apache/zeppelin/livy/LivyHelper.java    |  4 +--
 .../zeppelin/livy/LivySparkSQLInterpreter.java  | 35 ++++++++++----------
 .../src/main/resources/interpreter-setting.json |  5 +++
 3 files changed, 25 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7db07f22/livy/src/main/java/org/apache/zeppelin/livy/LivyHelper.java
----------------------------------------------------------------------
diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivyHelper.java 
b/livy/src/main/java/org/apache/zeppelin/livy/LivyHelper.java
index 2c66fa9..ec77f1a 100644
--- a/livy/src/main/java/org/apache/zeppelin/livy/LivyHelper.java
+++ b/livy/src/main/java/org/apache/zeppelin/livy/LivyHelper.java
@@ -96,7 +96,7 @@ public class LivyHelper {
               }.getType());
           if (jsonMap.get("state").equals("idle")) {
             break;
-          } else if (jsonMap.get("state").equals("error")) {
+          } else if (jsonMap.get("state").equals("error") || 
jsonMap.get("state").equals("dead")) {
             json = executeHTTP(property.getProperty("zeppelin.livy.url") + 
"/sessions/" +
                     sessionId + "/log",
                 "GET", null,
@@ -124,7 +124,7 @@ public class LivyHelper {
 
   protected void initializeSpark(final InterpreterContext context,
                                  final Map<String, Integer> userSessionMap) 
throws Exception {
-    interpret("val sqlContext= new org.apache.spark.sql.SQLContext(sc)\n" +
+    interpret("val sqlContext = new org.apache.spark.sql.SQLContext(sc)\n" +
         "import sqlContext.implicits._", context, userSessionMap);
   }
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7db07f22/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java
----------------------------------------------------------------------
diff --git 
a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java 
b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java
index 3c60204..22773df 100644
--- a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java
+++ b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java
@@ -35,20 +35,6 @@ import java.util.Properties;
 public class LivySparkSQLInterpreter extends Interpreter {
 
   Logger LOGGER = LoggerFactory.getLogger(LivySparkSQLInterpreter.class);
-  static String DEFAULT_MAX_RESULT = "1000";
-
-  static {
-    Interpreter.register(
-        "sql",
-        "livy",
-        LivySparkSQLInterpreter.class.getName(),
-        new InterpreterPropertyBuilder()
-            .add("zeppelin.livy.spark.maxResult",
-                DEFAULT_MAX_RESULT,
-                "Max number of SparkSQL result to display.")
-            .build()
-    );
-  }
 
   protected Map<String, Integer> userSessionMap;
   private LivyHelper livyHelper;
@@ -94,7 +80,7 @@ public class LivySparkSQLInterpreter extends Interpreter {
               line.replaceAll("\"", "\\\\\"")
                   .replaceAll("\\n", " ")
               + "\").show(" +
-              property.get("zeppelin.livy.spark.maxResult") + ")",
+              property.get("zeppelin.livy.spark.sql.maxResult") + ")",
           interpreterContext, userSessionMap);
 
       if (res.code() == InterpreterResult.Code.SUCCESS) {
@@ -137,6 +123,10 @@ public class LivySparkSQLInterpreter extends Interpreter {
     }
   }
 
+  public boolean concurrentSQL() {
+    return Boolean.parseBoolean(getProperty("zeppelin.livy.concurrentSQL"));
+  }
+
   @Override
   public void cancel(InterpreterContext context) {
     livyHelper.cancelHTTP(context.getParagraphId());
@@ -154,8 +144,19 @@ public class LivySparkSQLInterpreter extends Interpreter {
 
   @Override
   public Scheduler getScheduler() {
-    return SchedulerFactory.singleton().createOrGetFIFOScheduler(
-        LivySparkInterpreter.class.getName() + this.hashCode());
+    if (concurrentSQL()) {
+      int maxConcurrency = 10;
+      return SchedulerFactory.singleton().createOrGetParallelScheduler(
+          LivySparkInterpreter.class.getName() + this.hashCode(), 
maxConcurrency);
+    } else {
+      Interpreter intp =
+          
getInterpreterInTheSameSessionByClassName(LivySparkInterpreter.class.getName());
+      if (intp != null) {
+        return intp.getScheduler();
+      } else {
+        return null;
+      }
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7db07f22/livy/src/main/resources/interpreter-setting.json
----------------------------------------------------------------------
diff --git a/livy/src/main/resources/interpreter-setting.json 
b/livy/src/main/resources/interpreter-setting.json
index 7ae435f..e90216c 100644
--- a/livy/src/main/resources/interpreter-setting.json
+++ b/livy/src/main/resources/interpreter-setting.json
@@ -87,6 +87,11 @@
         "propertyName": "zeppelin.livy.spark.sql.maxResult",
         "defaultValue": "1000",
         "description": "Max number of SparkSQL result to display."
+      },
+      "zeppelin.livy.concurrentSQL": {
+        "propertyName": "zeppelin.livy.concurrentSQL",
+        "defaultValue": "false",
+        "description": "Execute multiple SQL concurrently if set true."
       }
     }
   },

Reply via email to