Repository: zeppelin
Updated Branches:
  refs/heads/master b96550329 -> b885f43e4


ZEPPELIN-1197. Should print output directly without invoking function print in 
pyspark interpreter

### What is this PR for?
Currently, users need to invoke print explicitly to make output appear in the
notebook. This behavior is neither natural nor consistent with other notebooks.
This PR makes the pyspark interpreter in Zeppelin behave the same as other
notebooks. There are 2 main changes:
* use single mode to compile the last statement, so that the evaluation result 
of the last statement will be printed to stdout, this is consistent with other 
notebooks (like jupyter)
* Make SparkOutputStream extends LogOutputStream so that we can see the output 
of inner process (Python/R), it is helpful for diagnosing.

### What type of PR is it?
[Bug Fix]

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-1197

### How should this be tested?
Tested it manually. Input the following text in pyspark paragraph,
```
1+1
sc.version
```
And get the following output
```
u'1.6.1'
```

### Questions:
* Do the license files need updating? No
* Are there breaking changes for older versions? No — users simply no longer 
need to call print explicitly.
* Does this needs documentation? Yes

Author: Jeff Zhang <zjf...@apache.org>

Closes #1232 from zjffdu/ZEPPELIN-1197 and squashes the following commits:

3771245 [Jeff Zhang] fix and add test
10182e6 [Jeff Zhang] ZEPPELIN-1197. Should print output directly without 
invoking function print in pyspark interpreter


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/b885f43e
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/b885f43e
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/b885f43e

Branch: refs/heads/master
Commit: b885f43e4c63a4fdd7f591f8286b788d6ed2d719
Parents: b965503
Author: Jeff Zhang <zjf...@apache.org>
Authored: Thu Jul 28 17:36:37 2016 +0800
Committer: Lee moon soo <m...@apache.org>
Committed: Wed Aug 3 08:36:16 2016 -0700

----------------------------------------------------------------------
 .../apache/zeppelin/spark/LogOutputStream.java  | 116 +++++++++++++++++++
 .../zeppelin/spark/PySparkInterpreter.java      |   2 +-
 .../apache/zeppelin/spark/SparkInterpreter.java |   2 +-
 .../zeppelin/spark/SparkOutputStream.java       |  19 ++-
 .../org/apache/zeppelin/spark/ZeppelinR.java    |   2 +-
 .../main/resources/python/zeppelin_pyspark.py   |  31 +++--
 .../zeppelin/integration/SparkParagraphIT.java  |  19 +++
 7 files changed, 176 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/b885f43e/spark/src/main/java/org/apache/zeppelin/spark/LogOutputStream.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/LogOutputStream.java 
b/spark/src/main/java/org/apache/zeppelin/spark/LogOutputStream.java
new file mode 100644
index 0000000..d941cd7
--- /dev/null
+++ b/spark/src/main/java/org/apache/zeppelin/spark/LogOutputStream.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.spark;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+
+/**
+ * Minor modification of LogOutputStream of apache commons exec.
+ * LogOutputStream of apache commons exec has one issue that method flush 
doesn't throw IOException,
+ * so that SparkOutputStream can not extend it correctly.
+ */
+public abstract class LogOutputStream extends OutputStream {
+  private static final int INTIAL_SIZE = 132;
+  private static final int CR = 13;
+  private static final int LF = 10;
+  private final ByteArrayOutputStream buffer;
+  private boolean skip;
+  private final int level;
+
+  public LogOutputStream() {
+    this(999);
+  }
+
+  public LogOutputStream(int level) {
+    this.buffer = new ByteArrayOutputStream(132);
+    this.skip = false;
+    this.level = level;
+  }
+
+  @Override
+  public void write(int cc) throws IOException {
+    byte c = (byte) cc;
+    if (c != 10 && c != 13) {
+      this.buffer.write(cc);
+    } else if (!this.skip) {
+      this.processBuffer();
+    }
+
+    this.skip = c == 13;
+  }
+
+  @Override
+  public void flush() throws IOException {
+    if (this.buffer.size() > 0) {
+      this.processBuffer();
+    }
+
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (this.buffer.size() > 0) {
+      this.processBuffer();
+    }
+
+    super.close();
+  }
+
+  public int getMessageLevel() {
+    return this.level;
+  }
+
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    int offset = off;
+    int blockStartOffset = off;
+
+    for (int remaining = len; remaining > 0; blockStartOffset = offset) {
+      while (remaining > 0 && b[offset] != 10 && b[offset] != 13) {
+        ++offset;
+        --remaining;
+      }
+
+      int blockLength = offset - blockStartOffset;
+      if (blockLength > 0) {
+        this.buffer.write(b, blockStartOffset, blockLength);
+      }
+
+      while (remaining > 0 && (b[offset] == 10 || b[offset] == 13)) {
+        this.write(b[offset]);
+        ++offset;
+        --remaining;
+      }
+    }
+
+  }
+
+  protected void processBuffer() {
+    this.processLine(this.buffer.toString());
+    this.buffer.reset();
+  }
+
+  protected void processLine(String line) {
+    this.processLine(line, this.level);
+  }
+
+  protected abstract void processLine(String var1, int var2);
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/b885f43e/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
----------------------------------------------------------------------
diff --git 
a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java 
b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
index f63f3d4..d4f45ea 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
@@ -179,7 +179,7 @@ public class PySparkInterpreter extends Interpreter 
implements ExecuteResultHand
     cmd.addArgument(Integer.toString(port), false);
     
cmd.addArgument(Integer.toString(getSparkInterpreter().getSparkVersion().toNumber()),
 false);
     executor = new DefaultExecutor();
-    outputStream = new SparkOutputStream();
+    outputStream = new SparkOutputStream(logger);
     PipedOutputStream ps = new PipedOutputStream();
     in = null;
     try {

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/b885f43e/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
----------------------------------------------------------------------
diff --git 
a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java 
b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
index 29c322d..879ed4a 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
@@ -124,7 +124,7 @@ public class SparkInterpreter extends Interpreter {
 
   public SparkInterpreter(Properties property) {
     super(property);
-    out = new SparkOutputStream();
+    out = new SparkOutputStream(logger);
   }
 
   public SparkInterpreter(Properties property, SparkContext sc) {

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/b885f43e/spark/src/main/java/org/apache/zeppelin/spark/SparkOutputStream.java
----------------------------------------------------------------------
diff --git 
a/spark/src/main/java/org/apache/zeppelin/spark/SparkOutputStream.java 
b/spark/src/main/java/org/apache/zeppelin/spark/SparkOutputStream.java
index 98a4090..e454994 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkOutputStream.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkOutputStream.java
@@ -17,17 +17,20 @@
 package org.apache.zeppelin.spark;
 
 import org.apache.zeppelin.interpreter.InterpreterOutput;
+import org.slf4j.Logger;
 
 import java.io.IOException;
-import java.io.OutputStream;
 
 /**
  * InterpreterOutput can be attached / detached.
  */
-public class SparkOutputStream extends OutputStream {
+public class SparkOutputStream extends LogOutputStream {
+
+  public static Logger logger;
   InterpreterOutput interpreterOutput;
 
-  public SparkOutputStream() {
+  public SparkOutputStream(Logger logger) {
+    this.logger = logger;
   }
 
   public InterpreterOutput getInterpreterOutput() {
@@ -40,6 +43,7 @@ public class SparkOutputStream extends OutputStream {
 
   @Override
   public void write(int b) throws IOException {
+    super.write(b);
     if (interpreterOutput != null) {
       interpreterOutput.write(b);
     }
@@ -47,6 +51,7 @@ public class SparkOutputStream extends OutputStream {
 
   @Override
   public void write(byte [] b) throws IOException {
+    super.write(b);
     if (interpreterOutput != null) {
       interpreterOutput.write(b);
     }
@@ -54,13 +59,20 @@ public class SparkOutputStream extends OutputStream {
 
   @Override
   public void write(byte [] b, int offset, int len) throws IOException {
+    super.write(b, offset, len);
     if (interpreterOutput != null) {
       interpreterOutput.write(b, offset, len);
     }
   }
 
   @Override
+  protected void processLine(String s, int i) {
+    logger.debug("Interpreter output:" + s);
+  }
+
+  @Override
   public void close() throws IOException {
+    super.close();
     if (interpreterOutput != null) {
       interpreterOutput.close();
     }
@@ -68,6 +80,7 @@ public class SparkOutputStream extends OutputStream {
 
   @Override
   public void flush() throws IOException {
+    super.flush();
     if (interpreterOutput != null) {
       interpreterOutput.flush();
     }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/b885f43e/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java 
b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java
index e0a47b7..2648833 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java
@@ -143,7 +143,7 @@ public class ZeppelinR implements ExecuteResultHandler {
     cmd.addArgument(Integer.toString(sparkVersion.toNumber()));
 
     executor = new DefaultExecutor();
-    outputStream = new SparkOutputStream();
+    outputStream = new SparkOutputStream(logger);
 
     input = new PipedOutputStream();
     PipedInputStream in = new PipedInputStream(input);

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/b885f43e/spark/src/main/resources/python/zeppelin_pyspark.py
----------------------------------------------------------------------
diff --git a/spark/src/main/resources/python/zeppelin_pyspark.py 
b/spark/src/main/resources/python/zeppelin_pyspark.py
index 0380afa..2e95c85 100644
--- a/spark/src/main/resources/python/zeppelin_pyspark.py
+++ b/spark/src/main/resources/python/zeppelin_pyspark.py
@@ -27,19 +27,20 @@ from pyspark.storagelevel import StorageLevel
 from pyspark.accumulators import Accumulator, AccumulatorParam
 from pyspark.broadcast import Broadcast
 from pyspark.serializers import MarshalSerializer, PickleSerializer
+import ast
 
 # for back compatibility
 from pyspark.sql import SQLContext, HiveContext, Row
 
 class Logger(object):
   def __init__(self):
-    self.out = ""
+    pass
 
   def write(self, message):
     intp.appendOutput(message)
 
   def reset(self):
-    self.out = ""
+    pass
 
   def flush(self):
     pass
@@ -230,7 +231,7 @@ while True :
   try:
     stmts = req.statements().split("\n")
     jobGroup = req.jobGroup()
-    final_code = None
+    final_code = []
 
     for s in stmts:
       if s == None:
@@ -241,15 +242,27 @@ while True :
       if len(s_stripped) == 0 or s_stripped.startswith("#"):
         continue
 
-      if final_code:
-        final_code += "\n" + s
-      else:
-        final_code = s
+      final_code.append(s)
 
     if final_code:
-      compiledCode = compile(final_code, "<string>", "exec")
+      # use exec mode to compile the statements except the last statement,
+      # so that the last statement's evaluation will be printed to stdout
       sc.setJobGroup(jobGroup, "Zeppelin")
-      eval(compiledCode)
+      code = compile('\n'.join(final_code), '<stdin>', 'exec', 
ast.PyCF_ONLY_AST, 1)
+      to_run_exec, to_run_single = code.body[:-1], code.body[-1:]
+
+      try:
+        for node in to_run_exec:
+          mod = ast.Module([node])
+          code = compile(mod, '<stdin>', 'exec')
+          exec(code)
+
+        for node in to_run_single:
+          mod = ast.Interactive([node])
+          code = compile(mod, '<stdin>', 'single')
+          exec(code)
+      except:
+        raise Execution(sys.exc_info())
 
     intp.setStatementsFinished("", False)
   except Py4JJavaError:

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/b885f43e/zeppelin-server/src/test/java/org/apache/zeppelin/integration/SparkParagraphIT.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-server/src/test/java/org/apache/zeppelin/integration/SparkParagraphIT.java
 
b/zeppelin-server/src/test/java/org/apache/zeppelin/integration/SparkParagraphIT.java
index 0ff0135..5009076 100644
--- 
a/zeppelin-server/src/test/java/org/apache/zeppelin/integration/SparkParagraphIT.java
+++ 
b/zeppelin-server/src/test/java/org/apache/zeppelin/integration/SparkParagraphIT.java
@@ -140,6 +140,25 @@ public class SparkParagraphIT extends AbstractZeppelinIT {
           paragraph1Result.getText().toString(), CoreMatchers.equalTo("test 
loop 0\ntest loop 1\ntest loop 2")
       );
 
+      // the last statement's evaluation result is printed
+      setTextOfParagraph(2, "%pyspark\\n" +
+              "sc.version\\n" +
+              "1+1");
+      runParagraph(2);
+      try {
+        waitForParagraph(2, "FINISHED");
+      } catch (TimeoutException e) {
+        waitForParagraph(2, "ERROR");
+        collector.checkThat("Paragraph from SparkParagraphIT of testPySpark 
status: ",
+                "ERROR", CoreMatchers.equalTo("FINISHED")
+        );
+      }
+      WebElement paragraph2Result = driver.findElement(By.xpath(
+              getParagraphXPath(2) + "//div[@class=\"tableDisplay\"]"));
+      collector.checkThat("Paragraph from SparkParagraphIT of testPySpark 
result: ",
+              paragraph2Result.getText().toString(), CoreMatchers.equalTo("2")
+      );
+
     } catch (Exception e) {
       handleException("Exception in SparkParagraphIT while testPySpark", e);
     }

Reply via email to