Repository: zeppelin
Updated Branches:
  refs/heads/branch-0.6 b0d99171b -> b91a7cdfc


ZEPPELIN-995 Change scheduler for JDBC interpreter to use concurrent execution

### What is this PR for?
Changed the scheduler from FIFO to Parallel in JdbcInterpreter. This is the 
default behaviour of HiveInterpreter. When we merge all JDBC-like interpreters 
into JDBC, we need to change the default behaviour of JdbcInterpreter.

### What type of PR is it?
[Feature]

### Todos
* [x] - Changed scheduler

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-995

### How should this be tested?
With `zeppelin.jdbc.concurrent.use` set to `true`, you can run multiple queries simultaneously.

### Screenshots (if appropriate)

### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No

Author: Jongyoul Lee <[email protected]>

Closes #1005 from jongyoul/ZEPPELIN-995 and squashes the following commits:

af360fa [Jongyoul Lee] Added option to choose which scheduler we use
3bda988 [Jongyoul Lee] Changed scheduler from FIFO to Parallels in 
JdbcInterpreter

(cherry picked from commit 5a4aacef25b0b54d151cfc7a3ea81cc312f6f655)
Signed-off-by: Jongyoul Lee <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/b91a7cdf
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/b91a7cdf
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/b91a7cdf

Branch: refs/heads/branch-0.6
Commit: b91a7cdfcc4159b903053e89a519e5cbbe0a3c49
Parents: b0d9917
Author: Jongyoul Lee <[email protected]>
Authored: Wed Jun 15 21:00:16 2016 +0900
Committer: Jongyoul Lee <[email protected]>
Committed: Mon Jun 20 13:41:31 2016 +0900

----------------------------------------------------------------------
 .../apache/zeppelin/jdbc/JDBCInterpreter.java   | 21 +++++++++++++--
 .../src/main/resources/interpreter-setting.json | 12 +++++++++
 .../zeppelin/jdbc/JDBCInterpreterTest.java      | 28 ++++++++++++++++++++
 3 files changed, 59 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/b91a7cdf/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
----------------------------------------------------------------------
diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java 
b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
index 5500ee0..e9cf9f8 100644
--- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
+++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
@@ -99,6 +99,9 @@ public class JDBCInterpreter extends Interpreter {
 
   static final String EMPTY_COLUMN_VALUE = "";
 
+  /** Property that selects the parallel scheduler when its value parses as true. */
+  private static final String CONCURRENT_EXECUTION_KEY = "zeppelin.jdbc.concurrent.use";
+  /** Property holding the configured number of concurrent executions. */
+  private static final String CONCURRENT_EXECUTION_COUNT =
+      "zeppelin.jdbc.concurrent.max_connection";
+
   private final HashMap<String, Properties> propertiesMap;
   private final Map<String, Statement> paragraphIdStatementMap;
 
@@ -434,8 +437,10 @@ public class JDBCInterpreter extends Interpreter {
 
   @Override
   public Scheduler getScheduler() {
-    return SchedulerFactory.singleton().createOrGetFIFOScheduler(
-        JDBCInterpreter.class.getName() + this.hashCode());
+    // The scheduler name is derived from the class name and this instance's hash code.
+    String schedulerName = JDBCInterpreter.class.getName() + this.hashCode();
+    if (isConcurrentExecution()) {
+      // Size the parallel scheduler from zeppelin.jdbc.concurrent.max_connection
+      // instead of a hard-coded 10, so the configured limit actually takes effect.
+      return SchedulerFactory.singleton().createOrGetParallelScheduler(
+          schedulerName, getMaxConcurrentConnection());
+    }
+    // Concurrent execution disabled: keep the original FIFO behaviour.
+    return SchedulerFactory.singleton().createOrGetFIFOScheduler(schedulerName);
   }
 
   @Override
@@ -454,5 +459,17 @@ public class JDBCInterpreter extends Interpreter {
     return Integer.valueOf(
         propertiesMap.get(COMMON_KEY).getProperty(MAX_LINE_KEY, 
MAX_LINE_DEFAULT));
   }
+
+  /**
+   * Tells whether concurrent execution is enabled.
+   *
+   * @return true when the zeppelin.jdbc.concurrent.use property parses as
+   *         boolean true (case-insensitive "true"); false otherwise, including
+   *         when the property is absent
+   */
+  boolean isConcurrentExecution() {
+    String flag = getProperty(CONCURRENT_EXECUTION_KEY);
+    return Boolean.parseBoolean(flag);
+  }
+
+  /**
+   * Reads the zeppelin.jdbc.concurrent.max_connection property as an integer.
+   *
+   * @return the parsed value, or 10 when reading or parsing the property fails
+   */
+  int getMaxConcurrentConnection() {
+    int maxConnection;
+    try {
+      maxConnection = Integer.parseInt(getProperty(CONCURRENT_EXECUTION_COUNT));
+    } catch (Exception ignored) {
+      // Best-effort: any failure (missing or non-numeric value) falls back to 10.
+      maxConnection = 10;
+    }
+    return maxConnection;
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/b91a7cdf/jdbc/src/main/resources/interpreter-setting.json
----------------------------------------------------------------------
diff --git a/jdbc/src/main/resources/interpreter-setting.json 
b/jdbc/src/main/resources/interpreter-setting.json
index 97b2c61..069880c 100644
--- a/jdbc/src/main/resources/interpreter-setting.json
+++ b/jdbc/src/main/resources/interpreter-setting.json
@@ -33,6 +33,18 @@
         "propertyName": "common.max_count",
         "defaultValue": "1000",
         "description": "Max number of SQL result to display."
+      },
+      "zeppelin.jdbc.concurrent.use": {
+        "envName": null,
+        "propertyName": "zeppelin.jdbc.concurrent.use",
+        "defaultValue": "true",
+        "description": "Use parallel scheduler"
+      },
+      "zeppelin.jdbc.concurrent.max_connection": {
+        "envName": null,
+        "propertyName": "zeppelin.jdbc.concurrent.max_connection",
+        "defaultValue": "10",
+        "description": "Number of concurrent execution"
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/b91a7cdf/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java
----------------------------------------------------------------------
diff --git 
a/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java 
b/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java
index 065f4ed..317dbcf 100644
--- a/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java
+++ b/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java
@@ -22,6 +22,8 @@ import static 
org.apache.zeppelin.jdbc.JDBCInterpreter.DEFAULT_PASSWORD;
 import static org.apache.zeppelin.jdbc.JDBCInterpreter.DEFAULT_USER;
 import static org.apache.zeppelin.jdbc.JDBCInterpreter.DEFAULT_URL;
 import static org.apache.zeppelin.jdbc.JDBCInterpreter.COMMON_MAX_LINE;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.nio.file.Files;
@@ -32,6 +34,9 @@ import java.util.Properties;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.jdbc.JDBCInterpreter;
+import org.apache.zeppelin.scheduler.FIFOScheduler;
+import org.apache.zeppelin.scheduler.ParallelScheduler;
+import org.apache.zeppelin.scheduler.Scheduler;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -200,4 +205,27 @@ public class JDBCInterpreterTest extends 
BasicJDBCTestCaseAdapter {
     assertEquals(InterpreterResult.Type.TABLE, interpreterResult.type());
     assertEquals("ID\tNAME\na\ta_name\n", interpreterResult.message());
   }
+
+  /** Checks that the concurrency properties drive which scheduler type is returned. */
+  @Test
+  public void concurrentSettingTest() {
+    // Concurrent execution enabled: expect the parallel scheduler.
+    Properties parallelProps = new Properties();
+    parallelProps.setProperty("zeppelin.jdbc.concurrent.use", "true");
+    parallelProps.setProperty("zeppelin.jdbc.concurrent.max_connection", "10");
+    JDBCInterpreter parallelInterpreter = new JDBCInterpreter(parallelProps);
+
+    assertTrue(parallelInterpreter.isConcurrentExecution());
+    assertEquals(10, parallelInterpreter.getMaxConcurrentConnection());
+
+    Scheduler parallelScheduler = parallelInterpreter.getScheduler();
+    assertTrue(parallelScheduler instanceof ParallelScheduler);
+
+    // Concurrent execution disabled: expect the FIFO scheduler.
+    Properties fifoProps = new Properties();
+    fifoProps.setProperty("zeppelin.jdbc.concurrent.use", "false");
+    JDBCInterpreter fifoInterpreter = new JDBCInterpreter(fifoProps);
+
+    assertFalse(fifoInterpreter.isConcurrentExecution());
+
+    Scheduler fifoScheduler = fifoInterpreter.getScheduler();
+    assertTrue(fifoScheduler instanceof FIFOScheduler);
+  }
 }

Reply via email to