Repository: zeppelin
Updated Branches:
  refs/heads/master e575e812f -> d20dbce30


[Zeppelin-1090][Hot Fix] LivySparkSQLInterpreter doesn't work in FIFO.

### What is this PR for?
LivySparkSQLInterpreter should run in FIFO order with LivySparkInterpreter, just as
SparkSqlInterpreter does with SparkInterpreter.

### What type of PR is it?
[Hot Fix]

### Todos
* [x] - LivySparkSQLInterpreter should work in FIFO just like 
SparkSqlInterpreter
* [x] - add the zeppelin.livy.concurrentSQL property to the interpreter settings file

### What is the Jira issue?
* [Zeppelin-1090](https://issues.apache.org/jira/browse/ZEPPELIN-1090)

### How should this be tested?
In a notebook, create two paragraphs. Set the content of the first to:

```
%livy
Thread.sleep(10000)
```

and the content of the other to:

```
%livy.sql
show tables
```

The second paragraph should not be executed until the first one has finished.

### Screenshots (if appropriate)

### Questions:
* Do the license files need an update? n/a
* Are there breaking changes for older versions? n/a
* Does this need documentation? n/a

Author: Prabhjyot Singh <[email protected]>

Closes #1109 from prabhjyotsingh/ZEPPELIN-1090 and squashes the following 
commits:

a2f863e [Prabhjyot Singh] check for both dead and error cases
fe2ec72 [Prabhjyot Singh] LivySparkSQLInterpreter should work in FIFO just like 
SparkSqlInterpreter
2f1c64c [Prabhjyot Singh] add in property file zeppelin.livy.concurrentSQL


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/d20dbce3
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/d20dbce3
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/d20dbce3

Branch: refs/heads/master
Commit: d20dbce30b3f016be0dc08b8c40eda8990ba1526
Parents: e575e81
Author: Prabhjyot Singh <[email protected]>
Authored: Thu Jun 30 15:41:01 2016 +0530
Committer: Prabhjyot Singh <[email protected]>
Committed: Sat Jul 2 08:49:51 2016 +0530

----------------------------------------------------------------------
 .../org/apache/zeppelin/livy/LivyHelper.java    |  4 ++--
 .../zeppelin/livy/LivySparkSQLInterpreter.java  | 21 +++++++++++++++++---
 .../src/main/resources/interpreter-setting.json |  5 +++++
 3 files changed, 25 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d20dbce3/livy/src/main/java/org/apache/zeppelin/livy/LivyHelper.java
----------------------------------------------------------------------
diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivyHelper.java 
b/livy/src/main/java/org/apache/zeppelin/livy/LivyHelper.java
index 2c66fa9..ec77f1a 100644
--- a/livy/src/main/java/org/apache/zeppelin/livy/LivyHelper.java
+++ b/livy/src/main/java/org/apache/zeppelin/livy/LivyHelper.java
@@ -96,7 +96,7 @@ public class LivyHelper {
               }.getType());
           if (jsonMap.get("state").equals("idle")) {
             break;
-          } else if (jsonMap.get("state").equals("error")) {
+          } else if (jsonMap.get("state").equals("error") || 
jsonMap.get("state").equals("dead")) {
             json = executeHTTP(property.getProperty("zeppelin.livy.url") + 
"/sessions/" +
                     sessionId + "/log",
                 "GET", null,
@@ -124,7 +124,7 @@ public class LivyHelper {
 
   protected void initializeSpark(final InterpreterContext context,
                                  final Map<String, Integer> userSessionMap) 
throws Exception {
-    interpret("val sqlContext= new org.apache.spark.sql.SQLContext(sc)\n" +
+    interpret("val sqlContext = new org.apache.spark.sql.SQLContext(sc)\n" +
         "import sqlContext.implicits._", context, userSessionMap);
   }
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d20dbce3/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java
----------------------------------------------------------------------
diff --git 
a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java 
b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java
index 806d7aa..22773df 100644
--- a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java
+++ b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java
@@ -80,7 +80,7 @@ public class LivySparkSQLInterpreter extends Interpreter {
               line.replaceAll("\"", "\\\\\"")
                   .replaceAll("\\n", " ")
               + "\").show(" +
-              property.get("livy.spark.sql.maxResult") + ")",
+              property.get("zeppelin.livy.spark.sql.maxResult") + ")",
           interpreterContext, userSessionMap);
 
       if (res.code() == InterpreterResult.Code.SUCCESS) {
@@ -123,6 +123,10 @@ public class LivySparkSQLInterpreter extends Interpreter {
     }
   }
 
+  public boolean concurrentSQL() {
+    return Boolean.parseBoolean(getProperty("zeppelin.livy.concurrentSQL"));
+  }
+
   @Override
   public void cancel(InterpreterContext context) {
     livyHelper.cancelHTTP(context.getParagraphId());
@@ -140,8 +144,19 @@ public class LivySparkSQLInterpreter extends Interpreter {
 
   @Override
   public Scheduler getScheduler() {
-    return SchedulerFactory.singleton().createOrGetFIFOScheduler(
-        LivySparkInterpreter.class.getName() + this.hashCode());
+    if (concurrentSQL()) {
+      int maxConcurrency = 10;
+      return SchedulerFactory.singleton().createOrGetParallelScheduler(
+          LivySparkInterpreter.class.getName() + this.hashCode(), 
maxConcurrency);
+    } else {
+      Interpreter intp =
+          
getInterpreterInTheSameSessionByClassName(LivySparkInterpreter.class.getName());
+      if (intp != null) {
+        return intp.getScheduler();
+      } else {
+        return null;
+      }
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d20dbce3/livy/src/main/resources/interpreter-setting.json
----------------------------------------------------------------------
diff --git a/livy/src/main/resources/interpreter-setting.json 
b/livy/src/main/resources/interpreter-setting.json
index 232bcb6..468e9d9 100644
--- a/livy/src/main/resources/interpreter-setting.json
+++ b/livy/src/main/resources/interpreter-setting.json
@@ -88,6 +88,11 @@
         "propertyName": "zeppelin.livy.spark.sql.maxResult",
         "defaultValue": "1000",
         "description": "Max number of SparkSQL result to display."
+      },
+      "zeppelin.livy.concurrentSQL": {
+        "propertyName": "zeppelin.livy.concurrentSQL",
+        "defaultValue": "false",
+        "description": "Execute multiple SQL concurrently if set true."
       }
     }
   },

Reply via email to