Repository: zeppelin
Updated Branches:
  refs/heads/master c717daf65 -> 89cf8262e


ZEPPELIN-1442. UDF cannot be found because 2 instances of SparkSession are created

### What is this PR for?
The issue is that zeppelin_pyspark.py creates two SparkSession instances: it creates the 
SQLContext first, which implicitly creates a SparkSession underneath. That leaves two 
SparkSession instances on the JVM side, and therefore two Catalog instances as well, so a 
UDF registered through the SQLContext cannot be found from the SparkSession. This PR 
creates the SparkSession first and then assigns its internal SQLContext to sqlContext in 
pyspark, so both handles share the same underlying session.
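To make the catalog-sharing point concrete, here is a rough sketch in plain local PySpark 2.x. This is not Zeppelin's actual py4j bootstrap code; `newSession()` merely stands in for the second, implicitly created session, and the app name is made up:

```python
# Sketch only: plain local PySpark 2.x, not Zeppelin's py4j-backed bootstrap.
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("zeppelin-1442-sketch").getOrCreate()

# Failure mode (analogy): two sessions mean two catalogs, so a UDF registered
# in one session is not visible from the other.
other = spark.newSession()                       # separate function registry
spark.udf.register("f1", lambda x: len(x))
spark.sql("select f1('abc') as len").collect()   # [Row(len=u'3')] on Python 2
# other.sql("select f1('abc')")                  # would fail: undefined function 'f1'

# The fix in this PR follows the same idea: derive sqlContext from the single
# SparkSession so both handles share one underlying session and one catalog.
sqlContext = spark._wrapped
sqlContext.udf.register("f2", lambda x: len(x))
spark.sql("select f2('abc') as len").collect()   # visible: shared catalog
```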

### What type of PR is it?
[Bug Fix]

### Todos
* [ ] - Task

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-1442

### How should this be tested?
An integration test is added.
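The new test paragraphs boil down to the following `%pyspark` snippet (`sqlContext` and `spark` are the objects Zeppelin injects into the interpreter; `spark` exists only on Spark 2.x):

```python
# Register the UDF through sqlContext, then call it through the SparkSession.
sqlContext.udf.register("f1", lambda x: len(x))
spark.sql("select f1(\"abc\") as len").collect()   # expected: [Row(len=u'3')]
```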

### Screenshots (if appropriate)
![image](https://cloud.githubusercontent.com/assets/164491/18774832/7f270de4-818f-11e6-9e4f-c4def4353e5c.png)

### Questions:
* Do the license files need to be updated? No
* Are there breaking changes for older versions? No
* Does this need documentation? No

…

Author: Jeff Zhang <[email protected]>

Closes #1452 from zjffdu/ZEPPELIN-1442 and squashes the following commits:

a15e3c6 [Jeff Zhang] fix unit test
93060b6 [Jeff Zhang] ZEPPELIN-1442. UDF cannot be found because 2 instances of SparkSession are created


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/89cf8262
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/89cf8262
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/89cf8262

Branch: refs/heads/master
Commit: 89cf8262e6a740c267acad0c040d5d52675d6c00
Parents: c717daf
Author: Jeff Zhang <[email protected]>
Authored: Fri Sep 23 16:18:24 2016 +0800
Committer: Mina Lee <[email protected]>
Committed: Tue Sep 27 10:37:28 2016 +0900

----------------------------------------------------------------------
 .../main/resources/python/zeppelin_pyspark.py   |  6 ++---
 .../zeppelin/rest/ZeppelinSparkClusterTest.java | 25 ++++++++++++++++++++
 2 files changed, 27 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/89cf8262/spark/src/main/resources/python/zeppelin_pyspark.py
----------------------------------------------------------------------
diff --git a/spark/src/main/resources/python/zeppelin_pyspark.py b/spark/src/main/resources/python/zeppelin_pyspark.py
index 53465c2..49e60d4 100644
--- a/spark/src/main/resources/python/zeppelin_pyspark.py
+++ b/spark/src/main/resources/python/zeppelin_pyspark.py
@@ -219,14 +219,12 @@ jconf = intp.getSparkConf()
 conf = SparkConf(_jvm = gateway.jvm, _jconf = jconf)
 sc = SparkContext(jsc=jsc, gateway=gateway, conf=conf)
 if sparkVersion.isSpark2():
-  sqlc = SQLContext(sparkContext=sc, jsqlContext=intp.getSQLContext())
+  spark = SparkSession(sc, intp.getSparkSession())
+  sqlc = spark._wrapped
 else:
   sqlc = SQLContext(sparkContext=sc, sqlContext=intp.getSQLContext())
 sqlContext = sqlc
 
-if sparkVersion.isSpark2():
-  spark = SparkSession(sc, intp.getSparkSession())
-
 completion = PySparkCompletion(intp)
 z = PyZeppelinContext(intp.getZeppelinContext())
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/89cf8262/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
index 0255068..5084ae7 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
@@ -220,6 +220,18 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
                 assertEquals(InterpreterResult.Type.TABLE, p.getResult().type());
                 // TODO (zjffdu), one more \n is appended, need to investigate why.
                 assertEquals("age\tid\n20\t1\n\n", p.getResult().message());
+
+                // test udf
+                p = note.addParagraph();
+                config = p.getConfig();
+                config.put("enabled", true);
+                p.setConfig(config);
+                p.setText("%pyspark sqlContext.udf.register(\"f1\", lambda x: len(x))\n" +
+                       "sqlContext.sql(\"select f1(\\\"abc\\\") as len\").collect()");
+                note.run(p.getId());
+                waitForFinish(p);
+                assertEquals(Status.FINISHED, p.getStatus());
+                assertEquals("[Row(len=u'3')]\n", p.getResult().message());
             }
             if (sparkVersion >= 20) {
                 // run SparkSession test
@@ -234,6 +246,19 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
                 waitForFinish(p);
                 assertEquals(Status.FINISHED, p.getStatus());
                 assertEquals("[Row(age=20, id=1)]\n", p.getResult().message());
+
+                // test udf
+                p = note.addParagraph();
+                config = p.getConfig();
+                config.put("enabled", true);
+                p.setConfig(config);
+                // use SQLContext to register UDF but use this UDF through SparkSession
+                p.setText("%pyspark sqlContext.udf.register(\"f1\", lambda x: len(x))\n" +
+                        "spark.sql(\"select f1(\\\"abc\\\") as len\").collect()");
+                note.run(p.getId());
+                waitForFinish(p);
+                assertEquals(Status.FINISHED, p.getStatus());
+                assertEquals("[Row(len=u'3')]\n", p.getResult().message());
             }
         }
         ZeppelinServer.notebook.removeNote(note.getId(), null);
