Repository: zeppelin
Updated Branches:
refs/heads/master 2dc26f27b -> c717daf65
ZEPPELIN-1427. Scala z.show() doesn't work on v.0.6.1
### What is this PR for?
`ZeppelinContext.show` doesn't work for spark 1.6. The root cause is that
`Dataset` is also available in spark 1.6, so the following line will be false
when cls is `Dataset` while o is `Dataframe` in spark 1.6
```
if (cls.isInstance(o)) {
```
This PR create a list of supported class and make it a member of
`ZeppelinContext `so that we don't need to create it every time.
### What type of PR is it?
[Bug Fix]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-1427
### How should this be tested?
Tested it manually on spark 1.6 using the following sample code
```
z.show(sqlContext.sql("show tables"))
```
### Screenshots (if appropriate)

### Questions:
* Does the licenses files need update? No
* Is there breaking changes for older versions? No
* Does this needs documentation? No
Author: Jeff Zhang <[email protected]>
Closes #1440 from zjffdu/ZEPPELIN-1427 and squashes the following commits:
62dbcad [Jeff Zhang] add unit test
a7ba67d [Jeff Zhang] ZEPPELIN-1427. Scala z.show() doesn't work on v.0.6.1
Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/c717daf6
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/c717daf6
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/c717daf6
Branch: refs/heads/master
Commit: c717daf6556c708a771ac89efd36b26ef2988c71
Parents: 2dc26f2
Author: Jeff Zhang <[email protected]>
Authored: Thu Sep 22 11:25:24 2016 +0800
Committer: Mina Lee <[email protected]>
Committed: Mon Sep 26 10:37:48 2016 +0900
----------------------------------------------------------------------
.../apache/zeppelin/spark/ZeppelinContext.java | 54 ++++++++--------
.../remote/RemoteInterpreterServer.java | 2 +
.../zeppelin/rest/ZeppelinSparkClusterTest.java | 68 ++++++++++++++++++++
3 files changed, 98 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c717daf6/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java
b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java
index 7bccbac..7465756 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java
@@ -24,6 +24,7 @@ import static
scala.collection.JavaConversions.collectionAsScalaIterable;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
@@ -55,6 +56,7 @@ public class ZeppelinContext {
private SparkDependencyResolver dep;
private InterpreterContext interpreterContext;
private int maxResult;
+ private List<Class> supportedClasses;
public ZeppelinContext(SparkContext sc, SQLContext sql,
InterpreterContext interpreterContext,
@@ -65,6 +67,25 @@ public class ZeppelinContext {
this.interpreterContext = interpreterContext;
this.dep = dep;
this.maxResult = maxResult;
+ this.supportedClasses = new ArrayList<>();
+ try {
+
supportedClasses.add(this.getClass().forName("org.apache.spark.sql.Dataset"));
+ } catch (ClassNotFoundException e) {
+ }
+
+ try {
+
supportedClasses.add(this.getClass().forName("org.apache.spark.sql.DataFrame"));
+ } catch (ClassNotFoundException e) {
+ }
+
+ try {
+
supportedClasses.add(this.getClass().forName("org.apache.spark.sql.SchemaRDD"));
+ } catch (ClassNotFoundException e) {
+ }
+
+ if (supportedClasses.isEmpty()) {
+ throw new InterpreterException("Can not road Dataset/DataFrame/SchemaRDD
class");
+ }
}
public SparkContext sc;
@@ -161,33 +182,8 @@ public class ZeppelinContext {
@ZeppelinApi
public void show(Object o, int maxResult) {
- Class cls = null;
try {
- cls = this.getClass().forName("org.apache.spark.sql.Dataset");
- } catch (ClassNotFoundException e) {
- }
-
- if (cls == null) {
- try {
- cls = this.getClass().forName("org.apache.spark.sql.DataFrame");
- } catch (ClassNotFoundException e) {
- }
- }
-
- if (cls == null) {
- try {
- cls = this.getClass().forName("org.apache.spark.sql.SchemaRDD");
- } catch (ClassNotFoundException e) {
- }
- }
-
- if (cls == null) {
- throw new InterpreterException("Can not road Dataset/DataFrame/SchemaRDD
class");
- }
-
-
- try {
- if (cls.isInstance(o)) {
+ if (supportedClasses.contains(o.getClass())) {
interpreterContext.out.write(showDF(sc, interpreterContext, o,
maxResult));
} else {
interpreterContext.out.write(o.toString());
@@ -210,6 +206,12 @@ public class ZeppelinContext {
sc.setJobGroup(jobGroup, "Zeppelin", false);
try {
+ // convert it to DataFrame if it is Dataset, as we will iterate all the
records
+ // and assume it is type Row.
+ if
(df.getClass().getCanonicalName().equals("org.apache.spark.sql.Dataset")) {
+ Method convertToDFMethod = df.getClass().getMethod("toDF");
+ df = convertToDFMethod.invoke(df);
+ }
take = df.getClass().getMethod("take", int.class);
rows = (Object[]) take.invoke(df, maxResult + 1);
} catch (NoSuchMethodException | SecurityException | IllegalAccessException
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c717daf6/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
----------------------------------------------------------------------
diff --git
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index 7ddb928..8344366 100644
---
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -504,11 +504,13 @@ public class RemoteInterpreterServer
return new InterpreterOutput(new InterpreterOutputListener() {
@Override
public void onAppend(InterpreterOutput out, byte[] line) {
+ logger.debug("Output Append:" + new String(line));
eventClient.onInterpreterOutputAppend(noteId, paragraphId, new
String(line));
}
@Override
public void onUpdate(InterpreterOutput out, byte[] output) {
+ logger.debug("Output Update:" + new String(output));
eventClient.onInterpreterOutputUpdate(noteId, paragraphId, new
String(output));
}
});
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c717daf6/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
----------------------------------------------------------------------
diff --git
a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
index 4e516db..0255068 100644
---
a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
+++
b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
@@ -17,6 +17,7 @@
package org.apache.zeppelin.rest;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
@@ -24,6 +25,7 @@ import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils;
+import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterSetting;
import org.apache.zeppelin.notebook.Note;
import org.apache.zeppelin.notebook.Paragraph;
@@ -83,6 +85,57 @@ public class ZeppelinSparkClusterTest extends
AbstractTestRestApi {
}
@Test
+ public void sparkSQLTest() throws IOException {
+ // create new note
+ Note note = ZeppelinServer.notebook.createNote(null);
+ int sparkVersion = getSparkVersionNumber(note);
+ // DataFrame API is available from spark 1.3
+ if (sparkVersion >= 13) {
+ // test basic dataframe api
+ Paragraph p = note.addParagraph();
+ Map config = p.getConfig();
+ config.put("enabled", true);
+ p.setConfig(config);
+ p.setText("%spark val
df=sqlContext.createDataFrame(Seq((\"hello\",20)))\n" +
+ "df.collect()");
+ note.run(p.getId());
+ waitForFinish(p);
+ assertEquals(Status.FINISHED, p.getStatus());
+ assertTrue(p.getResult().message().contains(
+ "Array[org.apache.spark.sql.Row] = Array([hello,20])"));
+
+ // test display DataFrame
+ p = note.addParagraph();
+ config = p.getConfig();
+ config.put("enabled", true);
+ p.setConfig(config);
+ p.setText("%spark val
df=sqlContext.createDataFrame(Seq((\"hello\",20)))\n" +
+ "z.show(df)");
+ note.run(p.getId());
+ waitForFinish(p);
+ assertEquals(Status.FINISHED, p.getStatus());
+ assertEquals(InterpreterResult.Type.TABLE, p.getResult().type());
+ assertEquals("_1\t_2\nhello\t20\n", p.getResult().message());
+
+ // test display DataSet
+ if (sparkVersion >= 20) {
+ p = note.addParagraph();
+ config = p.getConfig();
+ config.put("enabled", true);
+ p.setConfig(config);
+ p.setText("%spark val
ds=spark.createDataset(Seq((\"hello\",20)))\n" +
+ "z.show(ds)");
+ note.run(p.getId());
+ waitForFinish(p);
+ assertEquals(Status.FINISHED, p.getStatus());
+ assertEquals(InterpreterResult.Type.TABLE,
p.getResult().type());
+ assertEquals("_1\t_2\nhello\t20\n", p.getResult().message());
+ }
+ ZeppelinServer.notebook.removeNote(note.getId(), null);
+ }
+ }
+
+ @Test
public void sparkRTest() throws IOException {
// create new note
Note note = ZeppelinServer.notebook.createNote(null);
@@ -152,6 +205,21 @@ public class ZeppelinSparkClusterTest extends
AbstractTestRestApi {
waitForFinish(p);
assertEquals(Status.FINISHED, p.getStatus());
assertEquals("[Row(age=20, id=1)]\n", p.getResult().message());
+
+ // test display Dataframe
+ p = note.addParagraph();
+ config = p.getConfig();
+ config.put("enabled", true);
+ p.setConfig(config);
+ p.setText("%pyspark from pyspark.sql import Row\n" +
+ "df=sqlContext.createDataFrame([Row(id=1, age=20)])\n"
+
+ "z.show(df)");
+ note.run(p.getId());
+ waitForFinish(p);
+ assertEquals(Status.FINISHED, p.getStatus());
+ assertEquals(InterpreterResult.Type.TABLE,
p.getResult().type());
+ // TODO (zjffdu), one more \n is appended, need to investigate
why.
+ assertEquals("age\tid\n20\t1\n\n", p.getResult().message());
}
if (sparkVersion >= 20) {
// run SparkSession test