Repository: zeppelin Updated Branches: refs/heads/master d9abff613 -> 1ac0e2f00
ZEPPELIN-3563. Add pool to paragraph property that use spark interpreter

### What is this PR for?
Allow the user to specify the scheduler pool when running Spark SQL concurrently, e.g.

```
%spark.sql(pool=pool_1)
sql statement
```

### What type of PR is it?
[Feature]

### Todos
* [ ] - Task

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-3563

### How should this be tested?
* CI pass

### Screenshots (if appropriate)

### Questions:
* Do the license files need to be updated? No
* Are there breaking changes for older versions? No
* Does this need documentation? No

Author: Jeff Zhang <[email protected]>

Closes #3044 from zjffdu/ZEPPELIN-3563 and squashes the following commits:

255f5baf9 [Jeff Zhang] ZEPPELIN-3563. Add pool to paragraph property that use spark interpreter

Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/1ac0e2f0
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/1ac0e2f0
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/1ac0e2f0

Branch: refs/heads/master
Commit: 1ac0e2f0086b7da5b7149562f2db8647c3b711c9
Parents: d9abff6
Author: Jeff Zhang <[email protected]>
Authored: Wed Jun 27 15:52:12 2018 +0800
Committer: Jeff Zhang <[email protected]>
Committed: Thu Jul 12 09:54:49 2018 +0800

----------------------------------------------------------------------
 docs/interpreter/spark.md                       | 20 ++++++++
 .../zeppelin/spark/IPySparkInterpreter.java     | 11 ++++-
 .../zeppelin/spark/NewSparkInterpreter.java     | 13 +++++-
 .../zeppelin/spark/OldSparkInterpreter.java     |  3 +-
 .../zeppelin/spark/PySparkInterpreter.java      |  9 +++-
 .../zeppelin/spark/SparkRInterpreter.java       | 13 ++++--
 .../zeppelin/spark/SparkSqlInterpreter.java     | 28 +++--------
 .../org/apache/zeppelin/spark/SparkVersion.java |  5 ++
 .../zeppelin/spark/SparkZeppelinContext.java    |  3 +-
 .../java/org/apache/zeppelin/spark/Utils.java   |  4 ++
 .../src/main/resources/interpreter-setting.json |  7 +++
 .../zeppelin/spark/NewSparkInterpreterTest.java | 27 +++++++++++
 .../spark/NewSparkSqlInterpreterTest.java       | 49 +++++++++++++++++++-
 .../src/test/resources/fairscheduler.xml        | 13 ++++++
 14 files changed, 170 insertions(+), 35 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1ac0e2f0/docs/interpreter/spark.md
----------------------------------------------------------------------
diff --git a/docs/interpreter/spark.md b/docs/interpreter/spark.md
index 6775fbf..34f5bb6 100644
--- a/docs/interpreter/spark.md
+++ b/docs/interpreter/spark.md
@@ -121,6 +121,11 @@ You can also set other Spark properties which are not listed in the table. For a
     <td>Execute multiple SQL concurrently if set true.</td>
   </tr>
   <tr>
+    <td>zeppelin.spark.concurrentSQL.max</td>
+    <td>10</td>
+    <td>Max number of SQL concurrently executed</td>
+  </tr>
+  <tr>
     <td>zeppelin.spark.maxResult</td>
     <td>1000</td>
     <td>Max number of Spark SQL result to display.</td>
@@ -332,6 +337,21 @@ utilizing Zeppelin's built-in [Angular Display System](../usage/display_system/a
 <img class="img-responsive" src="{{BASE_PATH}}/assets/themes/zeppelin/img/docs-img/matplotlibAngularExample.gif" />

+## Running Spark SQL concurrently
+By default, each SQL statement in `%spark.sql` runs sequentially. You can run them concurrently with the following setup.
+
+1. Set `zeppelin.spark.concurrentSQL` to true to enable concurrent SQL execution; under the hood Zeppelin switches Spark to the fair scheduler. Also set `zeppelin.spark.concurrentSQL.max` to control the maximum number of SQL statements running concurrently.
+2. Configure pools by creating `fairscheduler.xml` under your `SPARK_CONF_DIR`; see the official Spark doc [Configuring Pool Properties](http://spark.apache.org/docs/latest/job-scheduling.html#configuring-pool-properties).
+3. Set the pool via a paragraph property, e.g.
+
+```
+%spark(pool=pool1)
+
+sql statement
+```
+
+This feature is available for all versions of Scala Spark and PySpark. For SparkR, it is only available starting from Spark 2.3.0.
+
 ## Interpreter setting option

 You can choose one of `shared`, `scoped` and `isolated` options when you configure Spark interpreter.

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1ac0e2f0/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
index b6eb014..1cc88b8 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
@@ -118,12 +118,21 @@ public class IPySparkInterpreter extends IPythonInterpreter {
   public InterpreterResult interpret(String st, InterpreterContext context) {
     InterpreterContext.set(context);
     String jobGroupId = Utils.buildJobGroupId(context);
-    String jobDesc = "Started by: " + Utils.getUserName(context.getAuthenticationInfo());
+    String jobDesc = Utils.buildJobDesc(context);
     String setJobGroupStmt = "sc.setJobGroup('" + jobGroupId + "', '" + jobDesc + "')";
     InterpreterResult result = super.interpret(setJobGroupStmt, context);
     if (result.code().equals(InterpreterResult.Code.ERROR)) {
       return new InterpreterResult(InterpreterResult.Code.ERROR, "Fail to setJobGroup");
     }
+    String pool = "None";
+    if (context.getLocalProperties().containsKey("pool")) {
+      pool = "'" + context.getLocalProperties().get("pool") + "'";
+    }
+    String setPoolStmt = "sc.setLocalProperty('spark.scheduler.pool', " + pool + ")";
+    result = super.interpret(setPoolStmt, context);
+    if (result.code().equals(InterpreterResult.Code.ERROR)) {
+      return new InterpreterResult(InterpreterResult.Code.ERROR, "Fail to setPool");
+    }
     return super.interpret(st, context);
   }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1ac0e2f0/spark/interpreter/src/main/java/org/apache/zeppelin/spark/NewSparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/NewSparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/NewSparkInterpreter.java
index 1bde23f..17a257c 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/NewSparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/NewSparkInterpreter.java
@@ -86,9 +86,15 @@ public class NewSparkInterpreter extends AbstractSparkInterpreter {
       if (!StringUtils.isBlank(entry.getValue().toString())) {
         conf.set(entry.getKey().toString(), entry.getValue().toString());
       }
+      // zeppelin.spark.useHiveContext & zeppelin.spark.concurrentSQL are legacy zeppelin
+      // properties, convert them to spark properties here.
if (entry.getKey().toString().equals("zeppelin.spark.useHiveContext")) { conf.set("spark.useHiveContext", entry.getValue().toString()); } + if (entry.getKey().toString().equals("zeppelin.spark.concurrentSQL") + && entry.getValue().toString().equals("true")) { + conf.set("spark.scheduler.mode", "FAIR"); + } } // use local mode for embedded spark mode when spark.master is not found conf.setIfMissing("spark.master", "local"); @@ -141,8 +147,11 @@ public class NewSparkInterpreter extends AbstractSparkInterpreter { z.setGui(context.getGui()); z.setNoteGui(context.getNoteGui()); z.setInterpreterContext(context); - String jobDesc = "Started by: " + Utils.getUserName(context.getAuthenticationInfo()); - sc.setJobGroup(Utils.buildJobGroupId(context), jobDesc, false); + sc.setJobGroup(Utils.buildJobGroupId(context), Utils.buildJobDesc(context), false); + // set spark.scheduler.pool to null to clear the pool assosiated with this paragraph + // sc.setLocalProperty("spark.scheduler.pool", null) will clean the pool + sc.setLocalProperty("spark.scheduler.pool", context.getLocalProperties().get("pool")); + return innerInterpreter.interpret(st, context); } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1ac0e2f0/spark/interpreter/src/main/java/org/apache/zeppelin/spark/OldSparkInterpreter.java ---------------------------------------------------------------------- diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/OldSparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/OldSparkInterpreter.java index 4c2ec7c..b9a7868 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/OldSparkInterpreter.java +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/OldSparkInterpreter.java @@ -1041,8 +1041,7 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter { synchronized (this) { z.setGui(context.getGui()); z.setNoteGui(context.getNoteGui()); - String jobDesc = "Started by: " + Utils.getUserName(context.getAuthenticationInfo()); - sc.setJobGroup(Utils.buildJobGroupId(context), jobDesc, false); + sc.setJobGroup(Utils.buildJobGroupId(context), Utils.buildJobDesc(context), false); InterpreterResult r = interpretInput(lines, context); sc.clearJobGroup(); return r; http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1ac0e2f0/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java ---------------------------------------------------------------------- diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java index 8d5ce70..2093dde 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java @@ -150,10 +150,17 @@ public class PySparkInterpreter extends PythonInterpreter { @Override protected void preCallPython(InterpreterContext context) { String jobGroup = Utils.buildJobGroupId(context); - String jobDesc = "Started by: " + Utils.getUserName(context.getAuthenticationInfo()); + String jobDesc = Utils.buildJobDesc(context); callPython(new PythonInterpretRequest( String.format("if 'sc' in locals():\n\tsc.setJobGroup('%s', '%s')", jobGroup, jobDesc), false, false)); + + String pool = "None"; + if (context.getLocalProperties().containsKey("pool")) { + pool = "'" + context.getLocalProperties().get("pool") + "'"; + } + String setPoolStmt = 
"sc.setLocalProperty('spark.scheduler.pool', " + pool + ")"; + callPython(new PythonInterpretRequest(setPoolStmt, false, false)); } // Run python shell http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1ac0e2f0/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java ---------------------------------------------------------------------- diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java index f1b1253..9bd5445 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java @@ -123,8 +123,7 @@ public class SparkRInterpreter extends Interpreter { throws InterpreterException { String jobGroup = Utils.buildJobGroupId(interpreterContext); - String jobDesc = "Started by: " + - Utils.getUserName(interpreterContext.getAuthenticationInfo()); + String jobDesc = Utils.buildJobDesc(interpreterContext); sparkInterpreter.getSparkContext().setJobGroup(jobGroup, jobDesc, false); String imageWidth = getProperty("zeppelin.R.image.width", "100%"); @@ -156,7 +155,15 @@ public class SparkRInterpreter extends Interpreter { "\", \"" + jobDesc + "\", TRUE)"; } lines = setJobGroup + "\n" + lines; - + if (sparkInterpreter.getSparkVersion().newerThanEquals(SparkVersion.SPARK_2_3_0)) { + // setLocalProperty is only available from spark 2.3.0 + String setPoolStmt = "setLocalProperty('spark.scheduler.pool', NULL)"; + if (interpreterContext.getLocalProperties().containsKey("pool")) { + setPoolStmt = "setLocalProperty('spark.scheduler.pool', '" + + interpreterContext.getLocalProperties().get("pool") + "')"; + } + lines = setPoolStmt + "\n" + lines; + } try { // render output with knitr if (rbackendDead.get()) { http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1ac0e2f0/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java ---------------------------------------------------------------------- diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java index 4d2bed1..31e883a 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java @@ -44,19 +44,13 @@ import org.slf4j.LoggerFactory; public class SparkSqlInterpreter extends Interpreter { private Logger logger = LoggerFactory.getLogger(SparkSqlInterpreter.class); - public static final String MAX_RESULTS = "zeppelin.spark.maxResult"; - - AtomicInteger num = new AtomicInteger(0); - - private int maxResult; - public SparkSqlInterpreter(Properties property) { super(property); } @Override public void open() { - this.maxResult = Integer.parseInt(getProperty(MAX_RESULTS)); + } private SparkInterpreter getSparkInterpreter() throws InterpreterException { @@ -88,25 +82,17 @@ public class SparkSqlInterpreter extends Interpreter { @Override public InterpreterResult interpret(String st, InterpreterContext context) throws InterpreterException { - SQLContext sqlc = null; SparkInterpreter sparkInterpreter = getSparkInterpreter(); - if (sparkInterpreter.isUnsupportedSparkVersion()) { return new InterpreterResult(Code.ERROR, "Spark " + sparkInterpreter.getSparkVersion().toString() + " is not supported"); } 
     sparkInterpreter.getZeppelinContext().setInterpreterContext(context);
-    sqlc = sparkInterpreter.getSQLContext();
+    SQLContext sqlc = sparkInterpreter.getSQLContext();
     SparkContext sc = sqlc.sparkContext();
-    if (concurrentSQL()) {
-      sc.setLocalProperty("spark.scheduler.pool", "fair");
-    } else {
-      sc.setLocalProperty("spark.scheduler.pool", null);
-    }
-
-    String jobDesc = "Started by: " + Utils.getUserName(context.getAuthenticationInfo());
-    sc.setJobGroup(Utils.buildJobGroupId(context), jobDesc, false);
+    sc.setLocalProperty("spark.scheduler.pool", context.getLocalProperties().get("pool"));
+    sc.setJobGroup(Utils.buildJobGroupId(context), Utils.buildJobDesc(context), false);
     Object rdd = null;
     try {
       // method signature of sqlc.sql() is changed
@@ -138,9 +124,7 @@
   public void cancel(InterpreterContext context) throws InterpreterException {
     SparkInterpreter sparkInterpreter = getSparkInterpreter();
-    SQLContext sqlc = sparkInterpreter.getSQLContext();
-    SparkContext sc = sqlc.sparkContext();
-
+    SparkContext sc = sparkInterpreter.getSparkContext();
     sc.cancelJobGroup(Utils.buildJobGroupId(context));
   }
@@ -159,7 +143,7 @@
   @Override
   public Scheduler getScheduler() {
     if (concurrentSQL()) {
-      int maxConcurrency = 10;
+      int maxConcurrency = Integer.parseInt(getProperty("zeppelin.spark.concurrentSQL.max", "10"));
       return SchedulerFactory.singleton().createOrGetParallelScheduler(
           SparkSqlInterpreter.class.getName() + this.hashCode(), maxConcurrency);
     } else {

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1ac0e2f0/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkVersion.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
index 5e412eb..3284986 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
@@ -34,6 +34,7 @@ public class SparkVersion {
   public static final SparkVersion SPARK_1_6_0 = SparkVersion.fromVersionString("1.6.0");
   public static final SparkVersion SPARK_2_0_0 = SparkVersion.fromVersionString("2.0.0");
+  public static final SparkVersion SPARK_2_3_0 = SparkVersion.fromVersionString("2.3.0");
   public static final SparkVersion SPARK_2_3_1 = SparkVersion.fromVersionString("2.3.1");
   public static final SparkVersion SPARK_2_4_0 = SparkVersion.fromVersionString("2.4.0");
@@ -109,6 +110,10 @@
     return this.olderThan(SPARK_1_3_0);
   }

+  public boolean isSpark2() {
+    return this.newerThanEquals(SPARK_2_0_0);
+  }
+
   public boolean isSecretSocketSupported() {
     return this.newerThanEquals(SPARK_2_3_1);
   }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1ac0e2f0/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkZeppelinContext.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkZeppelinContext.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkZeppelinContext.java
index 492a997..e89607f 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkZeppelinContext.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkZeppelinContext.java
@@ -167,8 +167,7 @@ public class 
SparkZeppelinContext extends BaseZeppelinContext { if (rows.length > maxResult) { msg.append("\n"); - msg.append(ResultMessages.getExceedsLimitRowsMessage(maxResult, - SparkSqlInterpreter.MAX_RESULTS)); + msg.append(ResultMessages.getExceedsLimitRowsMessage(maxResult, "zeppelin.spark.maxResult")); } sc.clearJobGroup(); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1ac0e2f0/spark/interpreter/src/main/java/org/apache/zeppelin/spark/Utils.java ---------------------------------------------------------------------- diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/Utils.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/Utils.java index 82bf210..cd6c607 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/Utils.java +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/Utils.java @@ -152,6 +152,10 @@ class Utils { return "zeppelin-" + context.getNoteId() + "-" + context.getParagraphId(); } + public static String buildJobDesc(InterpreterContext context) { + return "Started by: " + getUserName(context.getAuthenticationInfo()); + } + public static String getNoteId(String jobgroupId) { int indexOf = jobgroupId.indexOf("-"); int secondIndex = jobgroupId.indexOf("-", indexOf + 1); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1ac0e2f0/spark/interpreter/src/main/resources/interpreter-setting.json ---------------------------------------------------------------------- diff --git a/spark/interpreter/src/main/resources/interpreter-setting.json b/spark/interpreter/src/main/resources/interpreter-setting.json index 8791ece..5746754 100644 --- a/spark/interpreter/src/main/resources/interpreter-setting.json +++ b/spark/interpreter/src/main/resources/interpreter-setting.json @@ -102,6 +102,13 @@ "description": "Execute multiple SQL concurrently if set true.", "type": "checkbox" }, + "zeppelin.spark.concurrentSQL.max": { + "envName": "ZEPPELIN_SPARK_CONCURRENTSQL_MAX", + "propertyName": "zeppelin.spark.concurrentSQL.max", + "defaultValue": 10, + "description": "Max number of SQL concurrently executed", + "type": "number" + }, "zeppelin.spark.sql.stacktrace": { "envName": "ZEPPELIN_SPARK_SQL_STACKTRACE", "propertyName": "zeppelin.spark.sql.stacktrace", http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1ac0e2f0/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java ---------------------------------------------------------------------- diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java index 73bd52c..e9f85fe 100644 --- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java +++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java @@ -423,6 +423,33 @@ public class NewSparkInterpreterTest { assertEquals("hello world", output); } + @Test + public void testSchedulePool() throws InterpreterException { + Properties properties = new Properties(); + properties.setProperty("spark.master", "local"); + properties.setProperty("spark.app.name", "test"); + properties.setProperty("zeppelin.spark.maxResult", "100"); + properties.setProperty("zeppelin.spark.test", "true"); + properties.setProperty("zeppelin.spark.useNew", "true"); + properties.setProperty("spark.scheduler.mode", "FAIR"); + + interpreter = new SparkInterpreter(properties); + assertTrue(interpreter.getDelegation() instanceof NewSparkInterpreter); + 
interpreter.setInterpreterGroup(mock(InterpreterGroup.class)); + interpreter.open(); + + InterpreterContext context = getInterpreterContext(); + context.getLocalProperties().put("pool", "pool1"); + InterpreterResult result = interpreter.interpret("sc.range(1, 10).sum", context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertEquals("pool1", interpreter.getSparkContext().getLocalProperty("spark.scheduler.pool")); + + // pool is reset to null if user don't specify it via paragraph properties + result = interpreter.interpret("sc.range(1, 10).sum", getInterpreterContext()); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertEquals(null, interpreter.getSparkContext().getLocalProperty("spark.scheduler.pool")); + } + @After public void tearDown() throws InterpreterException { if (this.interpreter != null) { http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1ac0e2f0/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkSqlInterpreterTest.java ---------------------------------------------------------------------- diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkSqlInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkSqlInterpreterTest.java index 300388d..525a9a8 100644 --- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkSqlInterpreterTest.java +++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkSqlInterpreterTest.java @@ -48,10 +48,10 @@ public class NewSparkSqlInterpreterTest { @BeforeClass public static void setUp() throws Exception { Properties p = new Properties(); - p.setProperty("spark.master", "local"); + p.setProperty("spark.master", "local[4]"); p.setProperty("spark.app.name", "test"); p.setProperty("zeppelin.spark.maxResult", "10"); - p.setProperty("zeppelin.spark.concurrentSQL", "false"); + p.setProperty("zeppelin.spark.concurrentSQL", "true"); p.setProperty("zeppelin.spark.sqlInterpreter.stacktrace", "false"); p.setProperty("zeppelin.spark.useNew", "true"); intpGroup = new InterpreterGroup(); @@ -179,4 +179,49 @@ public class NewSparkSqlInterpreterTest { assertEquals(InterpreterResult.Code.SUCCESS, ret.code()); assertTrue(ret.message().get(1).getData().contains("alert-warning")); } + + @Test + public void testConcurrentSQL() throws InterpreterException, InterruptedException { + if (sparkInterpreter.getSparkVersion().isSpark2()) { + sparkInterpreter.interpret("spark.udf.register(\"sleep\", (e:Int) => {Thread.sleep(e*1000); e})", context); + } else { + sparkInterpreter.interpret("sqlContext.udf.register(\"sleep\", (e:Int) => {Thread.sleep(e*1000); e})", context); + } + + Thread thread1 = new Thread() { + @Override + public void run() { + try { + InterpreterResult result = sqlInterpreter.interpret("select sleep(10)", context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + } catch (InterpreterException e) { + e.printStackTrace(); + } + } + }; + + Thread thread2 = new Thread() { + @Override + public void run() { + try { + InterpreterResult result = sqlInterpreter.interpret("select sleep(10)", context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + } catch (InterpreterException e) { + e.printStackTrace(); + } + } + }; + + // start running 2 spark sql, each would sleep 10 seconds, the totally running time should + // be less than 20 seconds, which means they run concurrently. 
+ long start = System.currentTimeMillis(); + thread1.start(); + thread2.start(); + thread1.join(); + thread2.join(); + long end = System.currentTimeMillis(); + assertTrue("running time must be less than 20 seconds", ((end - start)/1000) < 20); + + } + } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1ac0e2f0/spark/interpreter/src/test/resources/fairscheduler.xml ---------------------------------------------------------------------- diff --git a/spark/interpreter/src/test/resources/fairscheduler.xml b/spark/interpreter/src/test/resources/fairscheduler.xml new file mode 100644 index 0000000..d163c08 --- /dev/null +++ b/spark/interpreter/src/test/resources/fairscheduler.xml @@ -0,0 +1,13 @@ +<?xml version="1.0"?> +<allocations> + <pool name="pool1"> + <schedulingMode>FAIR</schedulingMode> + <weight>1</weight> + <minShare>2</minShare> + </pool> + <pool name="pool2"> + <schedulingMode>FIFO</schedulingMode> + <weight>2</weight> + <minShare>3</minShare> + </pool> +</allocations> \ No newline at end of file
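
For reference, the scheduling mechanism this commit builds on is Spark's thread-local `spark.scheduler.pool` property together with `spark.scheduler.mode=FAIR`, which is what the interpreters above set per paragraph. Below is a minimal, self-contained Scala sketch of the same idea outside Zeppelin; the object name, temp view, and queries are illustrative only, and the `pool1`/`pool2` names are assumed to be defined in a `fairscheduler.xml` like the test resource above (without one, Spark simply creates the pools with default settings).

```
import org.apache.spark.sql.SparkSession

object FairPoolExample {
  def main(args: Array[String]): Unit = {
    // FAIR scheduling must be enabled; zeppelin.spark.concurrentSQL=true now does this for the Spark interpreter.
    val spark = SparkSession.builder()
      .master("local[4]")
      .appName("fair-pool-example")
      .config("spark.scheduler.mode", "FAIR")
      .getOrCreate()
    val sc = spark.sparkContext

    // Each thread picks its pool via the thread-local property, mirroring
    // sc.setLocalProperty("spark.scheduler.pool", ...) in the interpreters of this commit.
    def runInPool(pool: String, sql: String): Thread = {
      val t = new Thread(new Runnable {
        override def run(): Unit = {
          sc.setLocalProperty("spark.scheduler.pool", pool)
          spark.sql(sql).collect()
          // setting the property back to null clears the pool for this thread
          sc.setLocalProperty("spark.scheduler.pool", null)
        }
      })
      t.start()
      t
    }

    spark.range(0, 100).createOrReplaceTempView("t")
    val threads = Seq(
      runInPool("pool1", "select count(*) from t"),
      runInPool("pool2", "select sum(id) from t"))
    threads.foreach(_.join())
    spark.stop()
  }
}
```

In a notebook the same effect comes from setting `zeppelin.spark.concurrentSQL` to true plus the `pool` paragraph property (e.g. `%spark.sql(pool=pool_1)`), so concurrently running paragraphs land in their own fair-scheduler pools.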
