Add shared sqlContext in pyspark. sqlc
Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/8db2240b Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/8db2240b Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/8db2240b Branch: refs/heads/master Commit: 8db2240b6c1497683e2c45129b1659acedad909d Parents: 391c008 Author: Lee moon soo <[email protected]> Authored: Wed Mar 11 16:39:50 2015 +0900 Committer: Lee moon soo <[email protected]> Committed: Wed Mar 11 16:39:50 2015 +0900 ---------------------------------------------------------------------- .../com/nflabs/zeppelin/spark/PySparkInterpreter.java | 11 ++++++++++- spark/src/main/resources/python/zeppelin_pyspark.py | 2 +- .../interpreter/remote/RemoteInterpreterServer.java | 3 ++- 3 files changed, 13 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8db2240b/spark/src/main/java/com/nflabs/zeppelin/spark/PySparkInterpreter.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/com/nflabs/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/com/nflabs/zeppelin/spark/PySparkInterpreter.java index 26d52d1..9cc1e4c 100644 --- a/spark/src/main/java/com/nflabs/zeppelin/spark/PySparkInterpreter.java +++ b/spark/src/main/java/com/nflabs/zeppelin/spark/PySparkInterpreter.java @@ -23,6 +23,7 @@ import org.apache.commons.exec.PumpStreamHandler; import org.apache.commons.exec.environment.EnvironmentUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.SQLContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -193,7 +194,6 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand synchronized (statementSetNotifier) { while (_statements == null) { try { - logger.info("wait for statements"); statementSetNotifier.wait(1000); } catch (InterruptedException e) { } @@ -331,6 +331,15 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand } } + public SQLContext getSQLContext() { + SparkInterpreter intp = getSparkInterpreter(); + if (intp == null) { + return null; + } else { + return intp.getSQLContext(); + } + } + @Override public void onProcessComplete(int exitValue) { http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8db2240b/spark/src/main/resources/python/zeppelin_pyspark.py ---------------------------------------------------------------------- diff --git a/spark/src/main/resources/python/zeppelin_pyspark.py b/spark/src/main/resources/python/zeppelin_pyspark.py index ea4eee1..d68e53f 100644 --- a/spark/src/main/resources/python/zeppelin_pyspark.py +++ b/spark/src/main/resources/python/zeppelin_pyspark.py @@ -34,7 +34,7 @@ jsc = intp.getJavaSparkContext() jconf = intp.getSparkConf() conf = SparkConf(_jvm = gateway.jvm, _jconf = jconf) sc = SparkContext(jsc=jsc, gateway=gateway, conf=conf) - +sqlc = SQLContext(sc, intp.getSQLContext()) class Logger(object): def __init__(self): http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8db2240b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterServer.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterServer.java index b8fd7ce..266d6fc 100644 --- a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -113,7 +113,8 @@ public class RemoteInterpreterServer repl.setClassloaderUrls(new URL[]{}); synchronized (interpreterGroup) { - interpreterGroup.add(new ClassloaderInterpreter(repl, cl)); + interpreterGroup.add(new LazyOpenInterpreter( + new ClassloaderInterpreter(repl, cl))); } logger.info("Instantiate interpreter {}", className);
