Handle multi-line indented block
Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/99ebeaef
Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/99ebeaef
Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/99ebeaef

Branch: refs/heads/master
Commit: 99ebeaef0c0b7753d12e6f9f0257e1613f15ddf5
Parents: 97956e9
Author: Lee moon soo <[email protected]>
Authored: Fri Mar 13 21:42:54 2015 +0900
Committer: Lee moon soo <[email protected]>
Committed: Fri Mar 13 21:42:54 2015 +0900

----------------------------------------------------------------------
 .../zeppelin/spark/PySparkInterpreter.java    |  5 -----
 .../main/resources/python/zeppelin_pyspark.py | 23 ++++++++++++++++----
 2 files changed, 19 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/99ebeaef/spark/src/main/java/com/nflabs/zeppelin/spark/PySparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/com/nflabs/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/com/nflabs/zeppelin/spark/PySparkInterpreter.java
index cfd66a9..5e277cd 100644
--- a/spark/src/main/java/com/nflabs/zeppelin/spark/PySparkInterpreter.java
+++ b/spark/src/main/java/com/nflabs/zeppelin/spark/PySparkInterpreter.java
@@ -149,11 +149,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
     pythonPath += getSparkHome() + "/python/lib/py4j-0.8.2.1-src.zip:" +
         getSparkHome() + "/python";
 
-//    Map<String, String> newEnv = new HashMap<String, String>(env);
-//    newEnv.put("PYTHONPATH", pythonPath);
-
     env.put("PYTHONPATH", pythonPath);
-    //EnvironmentUtils.addVariableToEnvironment(env, "PYTHONPATH="+pythonPath);
 
     executor.execute(cmd, env, this);
     pythonscriptRunning = true;
@@ -260,7 +256,6 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
     synchronized (statementFinishedNotifier) {
       while (statementOutput == null) {
         try {
-          logger.info("wait for output");
           statementFinishedNotifier.wait(1000);
         } catch (InterruptedException e) {
         }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/99ebeaef/spark/src/main/resources/python/zeppelin_pyspark.py
----------------------------------------------------------------------
diff --git a/spark/src/main/resources/python/zeppelin_pyspark.py b/spark/src/main/resources/python/zeppelin_pyspark.py
index 5d956d9..eecaf1f 100644
--- a/spark/src/main/resources/python/zeppelin_pyspark.py
+++ b/spark/src/main/resources/python/zeppelin_pyspark.py
@@ -1,7 +1,6 @@
 import sys, getopt
 
 from py4j.java_gateway import java_import, JavaGateway, GatewayClient
-
 from pyspark.conf import SparkConf
 from pyspark.context import SparkContext
 from pyspark.rdd import RDD
@@ -55,6 +54,7 @@ output = Logger()
 sys.stdout = output
 sys.stderr = output
 
+
 while True :
   req = intp.getStatements()
   try:
@@ -62,6 +62,7 @@ while True :
     jobGroup = req.jobGroup()
     single = None
     incomplete = None
+    compiledCode = None
 
     for s in stmts:
       if s == None or len(s.strip()) == 0:
@@ -71,15 +72,24 @@ while True :
       if s.strip().startswith("#"):
         continue
 
+      if s[0] != " " and s[0] != "\t":
+        if incomplete != None:
+          raise incomplete
+
+        if compiledCode != None:
+          sc.setJobGroup(jobGroup, "Zeppelin")
+          eval(compiledCode)
+          compiledCode = None
+          single = None
+          incomplete = None
+
       if single == None:
         single = s
       else:
         single += "\n" + s
 
       try :
-        sc.setJobGroup(jobGroup, "Zeppelin")
-        eval(compile(single, "<String>", "single"))
-        single = ""
+        compiledCode = compile(single, "<string>", "single")
         incomplete = None
       except SyntaxError as e:
         if str(e).startswith("unexpected EOF while parsing") :
@@ -93,8 +103,13 @@ while True :
     if incomplete != None:
       raise incomplete
 
+    if compiledCode != None:
+      sc.setJobGroup(jobGroup, "Zeppelin")
+      eval(compiledCode)
+
     intp.setStatementsFinished(output.get(), False)
   except:
     intp.setStatementsFinished(str(sys.exc_info()), True)
 
   output.reset()
+
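
----------------------------------------------------------------------

The change decouples compilation from execution in the zeppelin_pyspark.py
REPL loop: each top-level block that compiles cleanly is held in compiledCode
and only eval()'d once the next unindented line proves the block cannot grow
any further, so an indented body ("for ...:" followed by its statements) runs
as one unit. Below is a minimal, self-contained sketch of that control flow;
run_lines and its ns namespace argument are illustrative names, not Zeppelin
APIs. It also simplifies one detail: where the patch matches the exact message
"unexpected EOF while parsing" (which varies across Python versions), the
sketch treats any SyntaxError as a possibly still-open block and re-raises it
if the block never completes.

def run_lines(lines, ns):
    # Mirrors the loop in zeppelin_pyspark.py: 'single' accumulates the
    # source of the current block, 'compiled_code' holds a block that
    # compiled cleanly but has not run yet, and 'incomplete' remembers
    # the SyntaxError of a block that is still open.
    single = None
    compiled_code = None
    incomplete = None

    for s in lines:
        if s is None or len(s.strip()) == 0:
            continue
        if s.strip().startswith("#"):
            continue

        if s[0] != " " and s[0] != "\t":
            # A new top-level statement begins: the previous block can no
            # longer grow, so it is now safe to execute it.
            if incomplete is not None:
                raise incomplete
            if compiled_code is not None:
                eval(compiled_code, ns)
                compiled_code = None
                single = None

        single = s if single is None else single + "\n" + s
        try:
            # The trailing "\n" sidesteps an old CPython quirk where
            # "single" mode rejected compound statements without one.
            compiled_code = compile(single + "\n", "<string>", "single")
            incomplete = None
        except SyntaxError as e:
            incomplete = e  # block may still be open; keep reading lines

    if incomplete is not None:
        raise incomplete
    if compiled_code is not None:
        eval(compiled_code, ns)  # run the trailing block

ns = {}
run_lines(["total = 0",
           "for i in range(4):",
           "    total += i",
           "print(total)"], ns)  # prints 6

Fed that paragraph, the loop executes three blocks and keeps the indented for
body together instead of eval()ing it line by line, which is the failure the
commit title refers to. The heuristic is purely line-based, so a top-level
"else:" or "except:" clause is still split from its header; the patch shares
that limitation.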
