Repository: zeppelin Updated Branches: refs/heads/master c82dd4ec6 -> 5a4aacef2
ZEPPELIN-995 Change scheduler for JDBC interpreter to use concurrent execution ### What is this PR for? Changed the scheduler in JdbcInterpreter from FIFO to parallel. This is the default behaviour of HiveInterpreter. When we merge all JDBC-like interpreters into JDBC, we need to change the default behaviour of JdbcInterpreter to match. ### What type of PR is it? [Feature] ### Todos * [x] - Changed scheduler ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-995 ### How should this be tested? Run multiple queries simultaneously and confirm that they execute concurrently. ### Screenshots (if appropriate) ### Questions: * Do the license files need an update? No * Are there breaking changes for older versions? No * Does this need documentation? No Author: Jongyoul Lee <[email protected]> Closes #1005 from jongyoul/ZEPPELIN-995 and squashes the following commits: af360fa [Jongyoul Lee] Added option to choose which scheduler we use 3bda988 [Jongyoul Lee] Changed scheduler from FIFO to Parallels in JdbcInterpreter Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/5a4aacef Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/5a4aacef Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/5a4aacef Branch: refs/heads/master Commit: 5a4aacef25b0b54d151cfc7a3ea81cc312f6f655 Parents: c82dd4e Author: Jongyoul Lee <[email protected]> Authored: Wed Jun 15 21:00:16 2016 +0900 Committer: Jongyoul Lee <[email protected]> Committed: Mon Jun 20 13:41:20 2016 +0900 ---------------------------------------------------------------------- .../apache/zeppelin/jdbc/JDBCInterpreter.java | 21 +++++++++++++-- .../src/main/resources/interpreter-setting.json | 12 +++++++++ .../zeppelin/jdbc/JDBCInterpreterTest.java | 28 ++++++++++++++++++++ 3 files changed, 59 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5a4aacef/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java ---------------------------------------------------------------------- diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index 5500ee0..e9cf9f8 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -99,6 +99,9 @@ public class JDBCInterpreter extends Interpreter { static final String EMPTY_COLUMN_VALUE = ""; + private final String CONCURRENT_EXECUTION_KEY = "zeppelin.jdbc.concurrent.use"; + private final String CONCURRENT_EXECUTION_COUNT = "zeppelin.jdbc.concurrent.max_connection"; + private final HashMap<String, Properties> propertiesMap; private final Map<String, Statement> paragraphIdStatementMap; @@ -434,8 +437,10 @@ public class JDBCInterpreter extends Interpreter { @Override public Scheduler getScheduler() { - return SchedulerFactory.singleton().createOrGetFIFOScheduler( - JDBCInterpreter.class.getName() + this.hashCode()); + String schedulerName = JDBCInterpreter.class.getName() + this.hashCode(); + return isConcurrentExecution() ? 
+ SchedulerFactory.singleton().createOrGetParallelScheduler(schedulerName, 10) + : SchedulerFactory.singleton().createOrGetFIFOScheduler(schedulerName); } @Override @@ -454,5 +459,17 @@ public class JDBCInterpreter extends Interpreter { return Integer.valueOf( propertiesMap.get(COMMON_KEY).getProperty(MAX_LINE_KEY, MAX_LINE_DEFAULT)); } + + boolean isConcurrentExecution() { + return Boolean.valueOf(getProperty(CONCURRENT_EXECUTION_KEY)); + } + + int getMaxConcurrentConnection() { + try { + return Integer.valueOf(getProperty(CONCURRENT_EXECUTION_COUNT)); + } catch (Exception e) { + return 10; + } + } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5a4aacef/jdbc/src/main/resources/interpreter-setting.json ---------------------------------------------------------------------- diff --git a/jdbc/src/main/resources/interpreter-setting.json b/jdbc/src/main/resources/interpreter-setting.json index 97b2c61..069880c 100644 --- a/jdbc/src/main/resources/interpreter-setting.json +++ b/jdbc/src/main/resources/interpreter-setting.json @@ -33,6 +33,18 @@ "propertyName": "common.max_count", "defaultValue": "1000", "description": "Max number of SQL result to display." 
+ }, + "zeppelin.jdbc.concurrent.use": { + "envName": null, + "propertyName": "zeppelin.jdbc.concurrent.use", + "defaultValue": "true", + "description": "Use parallel scheduler" + }, + "zeppelin.jdbc.concurrent.max_connection": { + "envName": null, + "propertyName": "zeppelin.jdbc.concurrent.max_connection", + "defaultValue": "10", + "description": "Number of concurrent execution" } } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5a4aacef/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java ---------------------------------------------------------------------- diff --git a/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java b/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java index 065f4ed..317dbcf 100644 --- a/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java +++ b/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java @@ -22,6 +22,8 @@ import static org.apache.zeppelin.jdbc.JDBCInterpreter.DEFAULT_PASSWORD; import static org.apache.zeppelin.jdbc.JDBCInterpreter.DEFAULT_USER; import static org.apache.zeppelin.jdbc.JDBCInterpreter.DEFAULT_URL; import static org.apache.zeppelin.jdbc.JDBCInterpreter.COMMON_MAX_LINE; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import java.io.IOException; import java.nio.file.Files; @@ -32,6 +34,9 @@ import java.util.Properties; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.jdbc.JDBCInterpreter; +import org.apache.zeppelin.scheduler.FIFOScheduler; +import org.apache.zeppelin.scheduler.ParallelScheduler; +import org.apache.zeppelin.scheduler.Scheduler; import org.junit.Before; import org.junit.Test; @@ -200,4 +205,27 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter { assertEquals(InterpreterResult.Type.TABLE, interpreterResult.type()); assertEquals("ID\tNAME\na\ta_name\n", 
interpreterResult.message()); } + + @Test + public void concurrentSettingTest() { + Properties properties = new Properties(); + properties.setProperty("zeppelin.jdbc.concurrent.use", "true"); + properties.setProperty("zeppelin.jdbc.concurrent.max_connection", "10"); + JDBCInterpreter jdbcInterpreter = new JDBCInterpreter(properties); + + assertTrue(jdbcInterpreter.isConcurrentExecution()); + assertEquals(10, jdbcInterpreter.getMaxConcurrentConnection()); + + Scheduler scheduler = jdbcInterpreter.getScheduler(); + assertTrue(scheduler instanceof ParallelScheduler); + + properties.clear(); + properties.setProperty("zeppelin.jdbc.concurrent.use", "false"); + jdbcInterpreter = new JDBCInterpreter(properties); + + assertFalse(jdbcInterpreter.isConcurrentExecution()); + + scheduler = jdbcInterpreter.getScheduler(); + assertTrue(scheduler instanceof FIFOScheduler); + } }
