Repository: zeppelin Updated Branches: refs/heads/master b96550329 -> b885f43e4
ZEPPELIN-1197. Should print output directly without invoking function print in pyspark interpreter ### What is this PR for? For now, users need to invoke print to have the output displayed in the notebook. This behavior is neither natural nor consistent with other notebooks. This PR is to make the pyspark interpreter in zeppelin behave the same as other notebooks. There are 2 main changes: * Use single mode to compile the last statement, so that the evaluation result of the last statement will be printed to stdout; this is consistent with other notebooks (like jupyter) * Make SparkOutputStream extend LogOutputStream so that we can see the output of the inner process (Python/R), which is helpful for diagnosing. ### What type of PR is it? [Bug Fix] ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-1197 ### How should this be tested? Tested it manually. Input the following text in a pyspark paragraph, ``` 1+1 sc.version ``` And get the following output ``` u'1.6.1' ``` ### Questions: * Do the license files need updating? No * Are there breaking changes for older versions? Users don't need to call print explicitly. * Does this need documentation? Yes Author: Jeff Zhang <zjf...@apache.org> Closes #1232 from zjffdu/ZEPPELIN-1197 and squashes the following commits: 3771245 [Jeff Zhang] fix and add test 10182e6 [Jeff Zhang] ZEPPELIN-1197. 
Should print output directly without invoking function print in pyspark interpreter Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/b885f43e Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/b885f43e Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/b885f43e Branch: refs/heads/master Commit: b885f43e4c63a4fdd7f591f8286b788d6ed2d719 Parents: b965503 Author: Jeff Zhang <zjf...@apache.org> Authored: Thu Jul 28 17:36:37 2016 +0800 Committer: Lee moon soo <m...@apache.org> Committed: Wed Aug 3 08:36:16 2016 -0700 ---------------------------------------------------------------------- .../apache/zeppelin/spark/LogOutputStream.java | 116 +++++++++++++++++++ .../zeppelin/spark/PySparkInterpreter.java | 2 +- .../apache/zeppelin/spark/SparkInterpreter.java | 2 +- .../zeppelin/spark/SparkOutputStream.java | 19 ++- .../org/apache/zeppelin/spark/ZeppelinR.java | 2 +- .../main/resources/python/zeppelin_pyspark.py | 31 +++-- .../zeppelin/integration/SparkParagraphIT.java | 19 +++ 7 files changed, 176 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zeppelin/blob/b885f43e/spark/src/main/java/org/apache/zeppelin/spark/LogOutputStream.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/LogOutputStream.java b/spark/src/main/java/org/apache/zeppelin/spark/LogOutputStream.java new file mode 100644 index 0000000..d941cd7 --- /dev/null +++ b/spark/src/main/java/org/apache/zeppelin/spark/LogOutputStream.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.spark; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; + + +/** + * Minor modification of LogOutputStream of apache commons exec. + * LogOutputStream of apache commons exec has one issue that method flush doesn't throw IOException, + * so that SparkOutputStream can not extend it correctly. + */ +public abstract class LogOutputStream extends OutputStream { + private static final int INTIAL_SIZE = 132; + private static final int CR = 13; + private static final int LF = 10; + private final ByteArrayOutputStream buffer; + private boolean skip; + private final int level; + + public LogOutputStream() { + this(999); + } + + public LogOutputStream(int level) { + this.buffer = new ByteArrayOutputStream(132); + this.skip = false; + this.level = level; + } + + @Override + public void write(int cc) throws IOException { + byte c = (byte) cc; + if (c != 10 && c != 13) { + this.buffer.write(cc); + } else if (!this.skip) { + this.processBuffer(); + } + + this.skip = c == 13; + } + + @Override + public void flush() throws IOException { + if (this.buffer.size() > 0) { + this.processBuffer(); + } + + } + + @Override + public void close() throws IOException { + if (this.buffer.size() > 0) { + this.processBuffer(); + } + + super.close(); + } + + public int getMessageLevel() { + return this.level; + } + + @Override + public 
void write(byte[] b, int off, int len) throws IOException { + int offset = off; + int blockStartOffset = off; + + for (int remaining = len; remaining > 0; blockStartOffset = offset) { + while (remaining > 0 && b[offset] != 10 && b[offset] != 13) { + ++offset; + --remaining; + } + + int blockLength = offset - blockStartOffset; + if (blockLength > 0) { + this.buffer.write(b, blockStartOffset, blockLength); + } + + while (remaining > 0 && (b[offset] == 10 || b[offset] == 13)) { + this.write(b[offset]); + ++offset; + --remaining; + } + } + + } + + protected void processBuffer() { + this.processLine(this.buffer.toString()); + this.buffer.reset(); + } + + protected void processLine(String line) { + this.processLine(line, this.level); + } + + protected abstract void processLine(String var1, int var2); +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/b885f43e/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java index f63f3d4..d4f45ea 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java @@ -179,7 +179,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand cmd.addArgument(Integer.toString(port), false); cmd.addArgument(Integer.toString(getSparkInterpreter().getSparkVersion().toNumber()), false); executor = new DefaultExecutor(); - outputStream = new SparkOutputStream(); + outputStream = new SparkOutputStream(logger); PipedOutputStream ps = new PipedOutputStream(); in = null; try { http://git-wip-us.apache.org/repos/asf/zeppelin/blob/b885f43e/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java ---------------------------------------------------------------------- diff --git 
a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java index 29c322d..879ed4a 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java @@ -124,7 +124,7 @@ public class SparkInterpreter extends Interpreter { public SparkInterpreter(Properties property) { super(property); - out = new SparkOutputStream(); + out = new SparkOutputStream(logger); } public SparkInterpreter(Properties property, SparkContext sc) { http://git-wip-us.apache.org/repos/asf/zeppelin/blob/b885f43e/spark/src/main/java/org/apache/zeppelin/spark/SparkOutputStream.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkOutputStream.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkOutputStream.java index 98a4090..e454994 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/SparkOutputStream.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkOutputStream.java @@ -17,17 +17,20 @@ package org.apache.zeppelin.spark; import org.apache.zeppelin.interpreter.InterpreterOutput; +import org.slf4j.Logger; import java.io.IOException; -import java.io.OutputStream; /** * InterpreterOutput can be attached / detached. 
*/ -public class SparkOutputStream extends OutputStream { +public class SparkOutputStream extends LogOutputStream { + + public static Logger logger; InterpreterOutput interpreterOutput; - public SparkOutputStream() { + public SparkOutputStream(Logger logger) { + this.logger = logger; } public InterpreterOutput getInterpreterOutput() { @@ -40,6 +43,7 @@ public class SparkOutputStream extends OutputStream { @Override public void write(int b) throws IOException { + super.write(b); if (interpreterOutput != null) { interpreterOutput.write(b); } @@ -47,6 +51,7 @@ public class SparkOutputStream extends OutputStream { @Override public void write(byte [] b) throws IOException { + super.write(b); if (interpreterOutput != null) { interpreterOutput.write(b); } @@ -54,13 +59,20 @@ public class SparkOutputStream extends OutputStream { @Override public void write(byte [] b, int offset, int len) throws IOException { + super.write(b, offset, len); if (interpreterOutput != null) { interpreterOutput.write(b, offset, len); } } @Override + protected void processLine(String s, int i) { + logger.debug("Interpreter output:" + s); + } + + @Override public void close() throws IOException { + super.close(); if (interpreterOutput != null) { interpreterOutput.close(); } @@ -68,6 +80,7 @@ public class SparkOutputStream extends OutputStream { @Override public void flush() throws IOException { + super.flush(); if (interpreterOutput != null) { interpreterOutput.flush(); } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/b885f43e/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java index e0a47b7..2648833 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java @@ -143,7 +143,7 @@ public class 
ZeppelinR implements ExecuteResultHandler { cmd.addArgument(Integer.toString(sparkVersion.toNumber())); executor = new DefaultExecutor(); - outputStream = new SparkOutputStream(); + outputStream = new SparkOutputStream(logger); input = new PipedOutputStream(); PipedInputStream in = new PipedInputStream(input); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/b885f43e/spark/src/main/resources/python/zeppelin_pyspark.py ---------------------------------------------------------------------- diff --git a/spark/src/main/resources/python/zeppelin_pyspark.py b/spark/src/main/resources/python/zeppelin_pyspark.py index 0380afa..2e95c85 100644 --- a/spark/src/main/resources/python/zeppelin_pyspark.py +++ b/spark/src/main/resources/python/zeppelin_pyspark.py @@ -27,19 +27,20 @@ from pyspark.storagelevel import StorageLevel from pyspark.accumulators import Accumulator, AccumulatorParam from pyspark.broadcast import Broadcast from pyspark.serializers import MarshalSerializer, PickleSerializer +import ast # for back compatibility from pyspark.sql import SQLContext, HiveContext, Row class Logger(object): def __init__(self): - self.out = "" + pass def write(self, message): intp.appendOutput(message) def reset(self): - self.out = "" + pass def flush(self): pass @@ -230,7 +231,7 @@ while True : try: stmts = req.statements().split("\n") jobGroup = req.jobGroup() - final_code = None + final_code = [] for s in stmts: if s == None: @@ -241,15 +242,27 @@ while True : if len(s_stripped) == 0 or s_stripped.startswith("#"): continue - if final_code: - final_code += "\n" + s - else: - final_code = s + final_code.append(s) if final_code: - compiledCode = compile(final_code, "<string>", "exec") + # use exec mode to compile the statements except the last statement, + # so that the last statement's evaluation will be printed to stdout sc.setJobGroup(jobGroup, "Zeppelin") - eval(compiledCode) + code = compile('\n'.join(final_code), '<stdin>', 'exec', ast.PyCF_ONLY_AST, 1) + to_run_exec, 
to_run_single = code.body[:-1], code.body[-1:] + + try: + for node in to_run_exec: + mod = ast.Module([node]) + code = compile(mod, '<stdin>', 'exec') + exec(code) + + for node in to_run_single: + mod = ast.Interactive([node]) + code = compile(mod, '<stdin>', 'single') + exec(code) + except: + raise Execution(sys.exc_info()) intp.setStatementsFinished("", False) except Py4JJavaError: http://git-wip-us.apache.org/repos/asf/zeppelin/blob/b885f43e/zeppelin-server/src/test/java/org/apache/zeppelin/integration/SparkParagraphIT.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/integration/SparkParagraphIT.java b/zeppelin-server/src/test/java/org/apache/zeppelin/integration/SparkParagraphIT.java index 0ff0135..5009076 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/integration/SparkParagraphIT.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/integration/SparkParagraphIT.java @@ -140,6 +140,25 @@ public class SparkParagraphIT extends AbstractZeppelinIT { paragraph1Result.getText().toString(), CoreMatchers.equalTo("test loop 0\ntest loop 1\ntest loop 2") ); + // the last statement's evaluation result is printed + setTextOfParagraph(2, "%pyspark\\n" + + "sc.version\\n" + + "1+1"); + runParagraph(2); + try { + waitForParagraph(2, "FINISHED"); + } catch (TimeoutException e) { + waitForParagraph(2, "ERROR"); + collector.checkThat("Paragraph from SparkParagraphIT of testPySpark status: ", + "ERROR", CoreMatchers.equalTo("FINISHED") + ); + } + WebElement paragraph2Result = driver.findElement(By.xpath( + getParagraphXPath(2) + "//div[@class=\"tableDisplay\"]")); + collector.checkThat("Paragraph from SparkParagraphIT of testPySpark result: ", + paragraph2Result.getText().toString(), CoreMatchers.equalTo("2") + ); + } catch (Exception e) { handleException("Exception in SparkParagraphIT while testPySpark", e); }