Repository: zeppelin
Updated Branches:
  refs/heads/master d9abff613 -> 1ac0e2f00


ZEPPELIN-3563. Add pool to paragraph property that use spark interpreter

### What is this PR for?
Allow users to specify the scheduler pool when running Spark SQL concurrently.
e.g.
```
%spark.sql(pool=pool_1)

sql statement
```

### What type of PR is it?
[Feature]

### Todos
* [ ] - Task

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-3563

### How should this be tested?
* CI pass

### Screenshots (if appropriate)

### Questions:
* Do the license files need to be updated? No
* Are there breaking changes for older versions? No
* Does this need documentation? No

Author: Jeff Zhang <[email protected]>

Closes #3044 from zjffdu/ZEPPELIN-3563 and squashes the following commits:

255f5baf9 [Jeff Zhang] ZEPPELIN-3563. Add pool to paragraph property that use spark interpreter


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/1ac0e2f0
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/1ac0e2f0
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/1ac0e2f0

Branch: refs/heads/master
Commit: 1ac0e2f0086b7da5b7149562f2db8647c3b711c9
Parents: d9abff6
Author: Jeff Zhang <[email protected]>
Authored: Wed Jun 27 15:52:12 2018 +0800
Committer: Jeff Zhang <[email protected]>
Committed: Thu Jul 12 09:54:49 2018 +0800

----------------------------------------------------------------------
 docs/interpreter/spark.md                       | 20 ++++++++
 .../zeppelin/spark/IPySparkInterpreter.java     | 11 ++++-
 .../zeppelin/spark/NewSparkInterpreter.java     | 13 +++++-
 .../zeppelin/spark/OldSparkInterpreter.java     |  3 +-
 .../zeppelin/spark/PySparkInterpreter.java      |  9 +++-
 .../zeppelin/spark/SparkRInterpreter.java       | 13 ++++--
 .../zeppelin/spark/SparkSqlInterpreter.java     | 28 +++--------
 .../org/apache/zeppelin/spark/SparkVersion.java |  5 ++
 .../zeppelin/spark/SparkZeppelinContext.java    |  3 +-
 .../java/org/apache/zeppelin/spark/Utils.java   |  4 ++
 .../src/main/resources/interpreter-setting.json |  7 +++
 .../zeppelin/spark/NewSparkInterpreterTest.java | 27 +++++++++++
 .../spark/NewSparkSqlInterpreterTest.java       | 49 +++++++++++++++++++-
 .../src/test/resources/fairscheduler.xml        | 13 ++++++
 14 files changed, 170 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1ac0e2f0/docs/interpreter/spark.md
----------------------------------------------------------------------
diff --git a/docs/interpreter/spark.md b/docs/interpreter/spark.md
index 6775fbf..34f5bb6 100644
--- a/docs/interpreter/spark.md
+++ b/docs/interpreter/spark.md
@@ -121,6 +121,11 @@ You can also set other Spark properties which are not listed in the table. For a
     <td>Execute multiple SQL concurrently if set true.</td>
   </tr>
   <tr>
+    <td>zeppelin.spark.concurrentSQL.max</td>
+    <td>10</td>
+    <td>Max number of SQL concurrently executed</td>
+  </tr>
+  <tr>
     <td>zeppelin.spark.maxResult</td>
     <td>1000</td>
     <td>Max number of Spark SQL result to display.</td>
@@ -332,6 +337,21 @@ utilizing Zeppelin's built-in [Angular Display System](../usage/display_system/a
 
 <img class="img-responsive" src="{{BASE_PATH}}/assets/themes/zeppelin/img/docs-img/matplotlibAngularExample.gif" />
 
+## Running spark sql concurrently
+By default, each sql statement runs sequentially in `%spark.sql`, but you can run them concurrently with the following setup.
+
+1. Set `zeppelin.spark.concurrentSQL` to true to enable concurrent sql execution; underneath, zeppelin switches spark to the fair scheduler. Also set `zeppelin.spark.concurrentSQL.max` to control the max number of sql statements running concurrently.
+2. Configure pools by creating `fairscheduler.xml` under your `SPARK_CONF_DIR`; see the official spark doc [Configuring Pool Properties](http://spark.apache.org/docs/latest/job-scheduling.html#configuring-pool-properties).
+3. Set the pool via a paragraph property, e.g.
+
+```
+%spark(pool=pool1)
+
+sql statement
+```
+
+This feature is available for all versions of scala spark and pyspark. For sparkr, it is only available starting from spark 2.3.0.
+
 ## Interpreter setting option
 
 You can choose one of `shared`, `scoped` and `isolated` options when you configure Spark interpreter.
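A pool referenced in a paragraph property must exist in `fairscheduler.xml`; a minimal sketch, mirroring the test resource added later in this commit (pool name is an example):

```xml
<?xml version="1.0"?>
<allocations>
  <!-- jobs submitted with pool=pool1 share the cluster fairly -->
  <pool name="pool1">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>2</minShare>
  </pool>
</allocations>
```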

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1ac0e2f0/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
index b6eb014..1cc88b8 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
@@ -118,12 +118,21 @@ public class IPySparkInterpreter extends IPythonInterpreter {
   public InterpreterResult interpret(String st, InterpreterContext context) {
     InterpreterContext.set(context);
     String jobGroupId = Utils.buildJobGroupId(context);
-    String jobDesc = "Started by: " + Utils.getUserName(context.getAuthenticationInfo());
+    String jobDesc = Utils.buildJobDesc(context);
     String setJobGroupStmt = "sc.setJobGroup('" +  jobGroupId + "', '" + jobDesc + "')";
     InterpreterResult result = super.interpret(setJobGroupStmt, context);
     if (result.code().equals(InterpreterResult.Code.ERROR)) {
       return new InterpreterResult(InterpreterResult.Code.ERROR, "Fail to setJobGroup");
     }
+    String pool = "None";
+    if (context.getLocalProperties().containsKey("pool")) {
+      pool = "'" + context.getLocalProperties().get("pool") + "'";
+    }
+    String setPoolStmt = "sc.setLocalProperty('spark.scheduler.pool', " + pool + ")";
+    result = super.interpret(setPoolStmt, context);
+    if (result.code().equals(InterpreterResult.Code.ERROR)) {
+      return new InterpreterResult(InterpreterResult.Code.ERROR, "Fail to setPool");
+    }
     return super.interpret(st, context);
   }
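The statement this interpreter sends to the Python process can be sketched in plain Python (the helper name `build_set_pool_stmt` is illustrative, not part of the codebase):

```python
def build_set_pool_stmt(local_properties):
    """Mirror the Java logic above: quote the pool name when the
    paragraph sets one, otherwise emit Python's None to clear it."""
    pool = "None"
    if "pool" in local_properties:
        pool = "'" + local_properties["pool"] + "'"
    return "sc.setLocalProperty('spark.scheduler.pool', " + pool + ")"

print(build_set_pool_stmt({"pool": "pool1"}))
# → sc.setLocalProperty('spark.scheduler.pool', 'pool1')
```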
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1ac0e2f0/spark/interpreter/src/main/java/org/apache/zeppelin/spark/NewSparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/NewSparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/NewSparkInterpreter.java
index 1bde23f..17a257c 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/NewSparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/NewSparkInterpreter.java
@@ -86,9 +86,15 @@ public class NewSparkInterpreter extends AbstractSparkInterpreter {
         if (!StringUtils.isBlank(entry.getValue().toString())) {
           conf.set(entry.getKey().toString(), entry.getValue().toString());
         }
+        // zeppelin.spark.useHiveContext & zeppelin.spark.concurrentSQL are legacy zeppelin
+        // properties, convert them to spark properties here.
         if (entry.getKey().toString().equals("zeppelin.spark.useHiveContext")) {
           conf.set("spark.useHiveContext", entry.getValue().toString());
         }
+        if (entry.getKey().toString().equals("zeppelin.spark.concurrentSQL")
+            && entry.getValue().toString().equals("true")) {
+          conf.set("spark.scheduler.mode", "FAIR");
+        }
       }
       // use local mode for embedded spark mode when spark.master is not found
       conf.setIfMissing("spark.master", "local");
@@ -141,8 +147,11 @@ public class NewSparkInterpreter extends AbstractSparkInterpreter {
     z.setGui(context.getGui());
     z.setNoteGui(context.getNoteGui());
     z.setInterpreterContext(context);
-    String jobDesc = "Started by: " + Utils.getUserName(context.getAuthenticationInfo());
-    sc.setJobGroup(Utils.buildJobGroupId(context), jobDesc, false);
+    sc.setJobGroup(Utils.buildJobGroupId(context), Utils.buildJobDesc(context), false);
+    // set spark.scheduler.pool to null to clear the pool associated with this paragraph
+    // sc.setLocalProperty("spark.scheduler.pool", null) will clean the pool
+    sc.setLocalProperty("spark.scheduler.pool", context.getLocalProperties().get("pool"));
+
     return innerInterpreter.interpret(st, context);
   }
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1ac0e2f0/spark/interpreter/src/main/java/org/apache/zeppelin/spark/OldSparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/OldSparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/OldSparkInterpreter.java
index 4c2ec7c..b9a7868 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/OldSparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/OldSparkInterpreter.java
@@ -1041,8 +1041,7 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter {
     synchronized (this) {
       z.setGui(context.getGui());
       z.setNoteGui(context.getNoteGui());
-      String jobDesc = "Started by: " + Utils.getUserName(context.getAuthenticationInfo());
-      sc.setJobGroup(Utils.buildJobGroupId(context), jobDesc, false);
+      sc.setJobGroup(Utils.buildJobGroupId(context), Utils.buildJobDesc(context), false);
       InterpreterResult r = interpretInput(lines, context);
       sc.clearJobGroup();
       return r;

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1ac0e2f0/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
index 8d5ce70..2093dde 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
@@ -150,10 +150,17 @@ public class PySparkInterpreter extends PythonInterpreter {
   @Override
   protected void preCallPython(InterpreterContext context) {
     String jobGroup = Utils.buildJobGroupId(context);
-    String jobDesc = "Started by: " + Utils.getUserName(context.getAuthenticationInfo());
+    String jobDesc = Utils.buildJobDesc(context);
     callPython(new PythonInterpretRequest(
         String.format("if 'sc' in locals():\n\tsc.setJobGroup('%s', '%s')", jobGroup, jobDesc),
         false, false));
+
+    String pool = "None";
+    if (context.getLocalProperties().containsKey("pool")) {
+      pool = "'" + context.getLocalProperties().get("pool") + "'";
+    }
+    String setPoolStmt = "sc.setLocalProperty('spark.scheduler.pool', " + pool + ")";
+    callPython(new PythonInterpretRequest(setPoolStmt, false, false));
   }
 
   // Run python shell

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1ac0e2f0/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
index f1b1253..9bd5445 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
@@ -123,8 +123,7 @@ public class SparkRInterpreter extends Interpreter {
       throws InterpreterException {
 
     String jobGroup = Utils.buildJobGroupId(interpreterContext);
-    String jobDesc = "Started by: " +
-       Utils.getUserName(interpreterContext.getAuthenticationInfo());
+    String jobDesc = Utils.buildJobDesc(interpreterContext);
     sparkInterpreter.getSparkContext().setJobGroup(jobGroup, jobDesc, false);
 
     String imageWidth = getProperty("zeppelin.R.image.width", "100%");
@@ -156,7 +155,15 @@ public class SparkRInterpreter extends Interpreter {
           "\", \"" + jobDesc + "\", TRUE)";
     }
     lines = setJobGroup + "\n" + lines;
-
+    if (sparkInterpreter.getSparkVersion().newerThanEquals(SparkVersion.SPARK_2_3_0)) {
+      // setLocalProperty is only available from spark 2.3.0
+      String setPoolStmt = "setLocalProperty('spark.scheduler.pool', NULL)";
+      if (interpreterContext.getLocalProperties().containsKey("pool")) {
+        setPoolStmt = "setLocalProperty('spark.scheduler.pool', '" +
+            interpreterContext.getLocalProperties().get("pool") + "')";
+      }
+      lines = setPoolStmt + "\n" + lines;
+    }
     try {
       // render output with knitr
       if (rbackendDead.get()) {

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1ac0e2f0/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
index 4d2bed1..31e883a 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
@@ -44,19 +44,13 @@ import org.slf4j.LoggerFactory;
 public class SparkSqlInterpreter extends Interpreter {
   private Logger logger = LoggerFactory.getLogger(SparkSqlInterpreter.class);
 
-  public static final String MAX_RESULTS = "zeppelin.spark.maxResult";
-
-  AtomicInteger num = new AtomicInteger(0);
-
-  private int maxResult;
-
   public SparkSqlInterpreter(Properties property) {
     super(property);
   }
 
   @Override
   public void open() {
-    this.maxResult = Integer.parseInt(getProperty(MAX_RESULTS));
+
   }
 
   private SparkInterpreter getSparkInterpreter() throws InterpreterException {
@@ -88,25 +82,17 @@ public class SparkSqlInterpreter extends Interpreter {
   @Override
   public InterpreterResult interpret(String st, InterpreterContext context)
       throws InterpreterException {
-    SQLContext sqlc = null;
     SparkInterpreter sparkInterpreter = getSparkInterpreter();
-
     if (sparkInterpreter.isUnsupportedSparkVersion()) {
       return new InterpreterResult(Code.ERROR, "Spark "
          + sparkInterpreter.getSparkVersion().toString() + " is not supported");
     }
 
     sparkInterpreter.getZeppelinContext().setInterpreterContext(context);
-    sqlc = sparkInterpreter.getSQLContext();
+    SQLContext sqlc = sparkInterpreter.getSQLContext();
     SparkContext sc = sqlc.sparkContext();
-    if (concurrentSQL()) {
-      sc.setLocalProperty("spark.scheduler.pool", "fair");
-    } else {
-      sc.setLocalProperty("spark.scheduler.pool", null);
-    }
-
-    String jobDesc = "Started by: " + Utils.getUserName(context.getAuthenticationInfo());
-    sc.setJobGroup(Utils.buildJobGroupId(context), jobDesc, false);
+    sc.setLocalProperty("spark.scheduler.pool", context.getLocalProperties().get("pool"));
+    sc.setJobGroup(Utils.buildJobGroupId(context), Utils.buildJobDesc(context), false);
     Object rdd = null;
     try {
       // method signature of sqlc.sql() is changed
@@ -138,9 +124,7 @@ public class SparkSqlInterpreter extends Interpreter {
   @Override
   public void cancel(InterpreterContext context) throws InterpreterException {
     SparkInterpreter sparkInterpreter = getSparkInterpreter();
-    SQLContext sqlc = sparkInterpreter.getSQLContext();
-    SparkContext sc = sqlc.sparkContext();
-
+    SparkContext sc = sparkInterpreter.getSparkContext();
     sc.cancelJobGroup(Utils.buildJobGroupId(context));
   }
 
@@ -159,7 +143,7 @@ public class SparkSqlInterpreter extends Interpreter {
   @Override
   public Scheduler getScheduler() {
     if (concurrentSQL()) {
-      int maxConcurrency = 10;
+      int maxConcurrency = Integer.parseInt(getProperty("zeppelin.spark.concurrentSQL.max", "10"));
       return SchedulerFactory.singleton().createOrGetParallelScheduler(
           SparkSqlInterpreter.class.getName() + this.hashCode(), maxConcurrency);
     } else {
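The pool reset above relies on `SparkContext.setLocalProperty` treating a null value as "remove the property", so paragraphs without `pool` fall back to the default pool. A small Python model of that behavior (the `LocalProps` class is a made-up illustration, not Spark's API):

```python
class LocalProps:
    """Toy model of Spark's per-thread local properties:
    setting a key to None clears it, like setLocalProperty(key, null)."""

    def __init__(self):
        self._props = {}

    def set_local_property(self, key, value):
        if value is None:
            self._props.pop(key, None)  # null clears the pool assignment
        else:
            self._props[key] = value

    def get_local_property(self, key):
        return self._props.get(key)  # None when unset

props = LocalProps()
props.set_local_property("spark.scheduler.pool", "pool1")
props.set_local_property("spark.scheduler.pool", None)
print(props.get_local_property("spark.scheduler.pool"))  # → None
```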

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1ac0e2f0/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkVersion.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
index 5e412eb..3284986 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
@@ -34,6 +34,7 @@ public class SparkVersion {
   public static final SparkVersion SPARK_1_6_0 = SparkVersion.fromVersionString("1.6.0");
 
   public static final SparkVersion SPARK_2_0_0 = SparkVersion.fromVersionString("2.0.0");
+  public static final SparkVersion SPARK_2_3_0 = SparkVersion.fromVersionString("2.3.0");
   public static final SparkVersion SPARK_2_3_1 = SparkVersion.fromVersionString("2.3.1");
   public static final SparkVersion SPARK_2_4_0 = SparkVersion.fromVersionString("2.4.0");
 
@@ -109,6 +110,10 @@ public class SparkVersion {
     return this.olderThan(SPARK_1_3_0);
   }
 
+  public boolean isSpark2() {
+    return this.newerThanEquals(SPARK_2_0_0);
+  }
+
   public boolean isSecretSocketSupported() {
     return this.newerThanEquals(SPARK_2_3_1);
   }
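The new `SPARK_2_3_0` constant gates the SparkR pool support via `newerThanEquals`. Version comparison of this kind can be sketched with a simple tuple compare (illustrative only, not the Java implementation):

```python
def parse_version(s):
    # "2.3.0" -> (2, 3, 0); ignores any "-SNAPSHOT"-style suffix
    return tuple(int(p) for p in s.split("-")[0].split(".")[:3])

def newer_than_equals(a, b):
    # Tuples compare element-wise, matching major/minor/patch ordering
    return parse_version(a) >= parse_version(b)

print(newer_than_equals("2.3.1", "2.3.0"))  # → True
print(newer_than_equals("2.2.0", "2.3.0"))  # → False
```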

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1ac0e2f0/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkZeppelinContext.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkZeppelinContext.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkZeppelinContext.java
index 492a997..e89607f 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkZeppelinContext.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkZeppelinContext.java
@@ -167,8 +167,7 @@ public class SparkZeppelinContext extends BaseZeppelinContext {
 
     if (rows.length > maxResult) {
       msg.append("\n");
-      msg.append(ResultMessages.getExceedsLimitRowsMessage(maxResult,
-          SparkSqlInterpreter.MAX_RESULTS));
+      msg.append(ResultMessages.getExceedsLimitRowsMessage(maxResult, "zeppelin.spark.maxResult"));
     }
 
     sc.clearJobGroup();

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1ac0e2f0/spark/interpreter/src/main/java/org/apache/zeppelin/spark/Utils.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/Utils.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/Utils.java
index 82bf210..cd6c607 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/Utils.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/Utils.java
@@ -152,6 +152,10 @@ class Utils {
     return "zeppelin-" + context.getNoteId() + "-" + context.getParagraphId();
   }
 
+  public static String buildJobDesc(InterpreterContext context) {
+    return "Started by: " + getUserName(context.getAuthenticationInfo());
+  }
+
   public static String getNoteId(String jobgroupId) {
     int indexOf = jobgroupId.indexOf("-");
     int secondIndex = jobgroupId.indexOf("-", indexOf + 1);

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1ac0e2f0/spark/interpreter/src/main/resources/interpreter-setting.json
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/main/resources/interpreter-setting.json b/spark/interpreter/src/main/resources/interpreter-setting.json
index 8791ece..5746754 100644
--- a/spark/interpreter/src/main/resources/interpreter-setting.json
+++ b/spark/interpreter/src/main/resources/interpreter-setting.json
@@ -102,6 +102,13 @@
         "description": "Execute multiple SQL concurrently if set true.",
         "type": "checkbox"
       },
+      "zeppelin.spark.concurrentSQL.max": {
+        "envName": "ZEPPELIN_SPARK_CONCURRENTSQL_MAX",
+        "propertyName": "zeppelin.spark.concurrentSQL.max",
+        "defaultValue": 10,
+        "description": "Max number of SQL concurrently executed",
+        "type": "number"
+      },
       "zeppelin.spark.sql.stacktrace": {
         "envName": "ZEPPELIN_SPARK_SQL_STACKTRACE",
         "propertyName": "zeppelin.spark.sql.stacktrace",

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1ac0e2f0/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java
index 73bd52c..e9f85fe 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java
@@ -423,6 +423,33 @@ public class NewSparkInterpreterTest {
     assertEquals("hello world", output);
   }
 
+  @Test
+  public void testSchedulePool() throws InterpreterException {
+    Properties properties = new Properties();
+    properties.setProperty("spark.master", "local");
+    properties.setProperty("spark.app.name", "test");
+    properties.setProperty("zeppelin.spark.maxResult", "100");
+    properties.setProperty("zeppelin.spark.test", "true");
+    properties.setProperty("zeppelin.spark.useNew", "true");
+    properties.setProperty("spark.scheduler.mode", "FAIR");
+
+    interpreter = new SparkInterpreter(properties);
+    assertTrue(interpreter.getDelegation() instanceof NewSparkInterpreter);
+    interpreter.setInterpreterGroup(mock(InterpreterGroup.class));
+    interpreter.open();
+
+    InterpreterContext context = getInterpreterContext();
+    context.getLocalProperties().put("pool", "pool1");
+    InterpreterResult result = interpreter.interpret("sc.range(1, 10).sum", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals("pool1", interpreter.getSparkContext().getLocalProperty("spark.scheduler.pool"));
+
+    // pool is reset to null if the user doesn't specify it via paragraph properties
+    result = interpreter.interpret("sc.range(1, 10).sum", getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals(null, interpreter.getSparkContext().getLocalProperty("spark.scheduler.pool"));
+  }
+
   @After
   public void tearDown() throws InterpreterException {
     if (this.interpreter != null) {

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1ac0e2f0/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkSqlInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkSqlInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkSqlInterpreterTest.java
index 300388d..525a9a8 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkSqlInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkSqlInterpreterTest.java
@@ -48,10 +48,10 @@ public class NewSparkSqlInterpreterTest {
   @BeforeClass
   public static void setUp() throws Exception {
     Properties p = new Properties();
-    p.setProperty("spark.master", "local");
+    p.setProperty("spark.master", "local[4]");
     p.setProperty("spark.app.name", "test");
     p.setProperty("zeppelin.spark.maxResult", "10");
-    p.setProperty("zeppelin.spark.concurrentSQL", "false");
+    p.setProperty("zeppelin.spark.concurrentSQL", "true");
     p.setProperty("zeppelin.spark.sqlInterpreter.stacktrace", "false");
     p.setProperty("zeppelin.spark.useNew", "true");
     intpGroup = new InterpreterGroup();
@@ -179,4 +179,49 @@ public class NewSparkSqlInterpreterTest {
     assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
     assertTrue(ret.message().get(1).getData().contains("alert-warning"));
   }
+
+  @Test
+  public void testConcurrentSQL() throws InterpreterException, InterruptedException {
+    if (sparkInterpreter.getSparkVersion().isSpark2()) {
+      sparkInterpreter.interpret("spark.udf.register(\"sleep\", (e:Int) => {Thread.sleep(e*1000); e})", context);
+    } else {
+      sparkInterpreter.interpret("sqlContext.udf.register(\"sleep\", (e:Int) => {Thread.sleep(e*1000); e})", context);
+    }
+    }
+
+    Thread thread1 = new Thread() {
+      @Override
+      public void run() {
+        try {
+          InterpreterResult result = sqlInterpreter.interpret("select sleep(10)", context);
+          assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+        } catch (InterpreterException e) {
+          e.printStackTrace();
+        }
+      }
+    };
+
+    Thread thread2 = new Thread() {
+      @Override
+      public void run() {
+        try {
+          InterpreterResult result = sqlInterpreter.interpret("select sleep(10)", context);
+          assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+        } catch (InterpreterException e) {
+          e.printStackTrace();
+        }
+      }
+    };
+
+    // start running 2 spark sql, each would sleep 10 seconds, the total running time should
+    // be less than 20 seconds, which means they run concurrently.
+    long start = System.currentTimeMillis();
+    thread1.start();
+    thread2.start();
+    thread1.join();
+    thread2.join();
+    long end = System.currentTimeMillis();
+    assertTrue("running time must be less than 20 seconds", ((end - start)/1000) < 20);
+
+  }
+
 }
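The timing argument in `testConcurrentSQL` can be demonstrated standalone: two sleeps on separate threads finish in roughly one sleep's duration, not two (plain Python threads with short sleeps for illustration):

```python
import threading
import time

def work():
    time.sleep(0.2)  # stand-in for "select sleep(10)"

start = time.time()
threads = [threading.Thread(target=work) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
elapsed = time.time() - start

# Concurrent run takes ~0.2s; a sequential run would take ~0.4s
assert elapsed < 0.35
print("concurrent, elapsed = %.2fs" % elapsed)
```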

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1ac0e2f0/spark/interpreter/src/test/resources/fairscheduler.xml
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/test/resources/fairscheduler.xml b/spark/interpreter/src/test/resources/fairscheduler.xml
new file mode 100644
index 0000000..d163c08
--- /dev/null
+++ b/spark/interpreter/src/test/resources/fairscheduler.xml
@@ -0,0 +1,13 @@
+<?xml version="1.0"?>
+<allocations>
+    <pool name="pool1">
+        <schedulingMode>FAIR</schedulingMode>
+        <weight>1</weight>
+        <minShare>2</minShare>
+    </pool>
+    <pool name="pool2">
+        <schedulingMode>FIFO</schedulingMode>
+        <weight>2</weight>
+        <minShare>3</minShare>
+    </pool>
+</allocations>
\ No newline at end of file
