Repository: zeppelin
Updated Branches:
  refs/heads/branch-0.6 533022417 -> bd750988a


ZEPPELIN-1442. UDF can not be found because 2 instances of SparkSession are
created

### What is this PR for?
The issue is that we create 2 SparkSessions in zeppelin_pyspark.py (because we
create the SQLContext first, which creates a SparkSession underneath). This causes
2 instances of SparkSession on the JVM side, which means we have 2 instances of
Catalog as well. So a UDF registered in the SQLContext cannot be used through the
SparkSession. This PR creates the SparkSession first and then assigns its
internal SQLContext to sqlContext in pyspark.

### What type of PR is it?
[Bug Fix]

### Todos
* [ ] - Task

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-1442

### How should this be tested?
Integration test is added.

### Screenshots (if appropriate)
![image](https://cloud.githubusercontent.com/assets/164491/18774832/7f270de4-818f-11e6-9e4f-c4def4353e5c.png)

### Questions:
* Does the licenses files need update? No
* Is there breaking changes for older versions? No
* Does this needs documentation? No

…

Author: Jeff Zhang <[email protected]>

Closes #1452 from zjffdu/ZEPPELIN-1442 and squashes the following commits:

a15e3c6 [Jeff Zhang] fix unit test
93060b6 [Jeff Zhang] ZEPPELIN-1442. UDF can not be found due to 2 instances of 
SparkSession is created

(cherry picked from commit 89cf8262e6a740c267acad0c040d5d52675d6c00)
Signed-off-by: Mina Lee <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/bd750988
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/bd750988
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/bd750988

Branch: refs/heads/branch-0.6
Commit: bd750988a64072ffc55503ffe86ef8f231904d2a
Parents: 5330224
Author: Jeff Zhang <[email protected]>
Authored: Fri Sep 23 16:18:24 2016 +0800
Committer: Mina Lee <[email protected]>
Committed: Tue Sep 27 10:37:45 2016 +0900

----------------------------------------------------------------------
 .../main/resources/python/zeppelin_pyspark.py   |  6 ++---
 .../zeppelin/rest/ZeppelinSparkClusterTest.java | 25 ++++++++++++++++++++
 2 files changed, 27 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/bd750988/spark/src/main/resources/python/zeppelin_pyspark.py
----------------------------------------------------------------------
diff --git a/spark/src/main/resources/python/zeppelin_pyspark.py 
b/spark/src/main/resources/python/zeppelin_pyspark.py
index 53465c2..49e60d4 100644
--- a/spark/src/main/resources/python/zeppelin_pyspark.py
+++ b/spark/src/main/resources/python/zeppelin_pyspark.py
@@ -219,14 +219,12 @@ jconf = intp.getSparkConf()
 conf = SparkConf(_jvm = gateway.jvm, _jconf = jconf)
 sc = SparkContext(jsc=jsc, gateway=gateway, conf=conf)
 if sparkVersion.isSpark2():
-  sqlc = SQLContext(sparkContext=sc, jsqlContext=intp.getSQLContext())
+  spark = SparkSession(sc, intp.getSparkSession())
+  sqlc = spark._wrapped
 else:
   sqlc = SQLContext(sparkContext=sc, sqlContext=intp.getSQLContext())
 sqlContext = sqlc
 
-if sparkVersion.isSpark2():
-  spark = SparkSession(sc, intp.getSparkSession())
-
 completion = PySparkCompletion(intp)
 z = PyZeppelinContext(intp.getZeppelinContext())
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/bd750988/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
 
b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
index 6227e0b..7767a12 100644
--- 
a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
+++ 
b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
@@ -220,6 +220,18 @@ public class ZeppelinSparkClusterTest extends 
AbstractTestRestApi {
                 assertEquals(InterpreterResult.Type.TABLE, 
p.getResult().type());
                 // TODO (zjffdu), one more \n is appended, need to investigate 
why.
                 assertEquals("age\tid\n20\t1\n\n", p.getResult().message());
+
+                // test udf
+                p = note.addParagraph();
+                config = p.getConfig();
+                config.put("enabled", true);
+                p.setConfig(config);
+                p.setText("%pyspark sqlContext.udf.register(\"f1\", lambda x: 
len(x))\n" +
+                       "sqlContext.sql(\"select f1(\\\"abc\\\") as 
len\").collect()");
+                note.run(p.getId());
+                waitForFinish(p);
+                assertEquals(Status.FINISHED, p.getStatus());
+                assertEquals("[Row(len=u'3')]\n", p.getResult().message());
             }
             if (sparkVersion >= 20) {
                 // run SparkSession test
@@ -234,6 +246,19 @@ public class ZeppelinSparkClusterTest extends 
AbstractTestRestApi {
                 waitForFinish(p);
                 assertEquals(Status.FINISHED, p.getStatus());
                 assertEquals("[Row(age=20, id=1)]\n", p.getResult().message());
+
+                // test udf
+                p = note.addParagraph();
+                config = p.getConfig();
+                config.put("enabled", true);
+                p.setConfig(config);
+                // use SQLContext to register UDF but use this UDF through 
SparkSession
+                p.setText("%pyspark sqlContext.udf.register(\"f1\", lambda x: 
len(x))\n" +
+                        "spark.sql(\"select f1(\\\"abc\\\") as 
len\").collect()");
+                note.run(p.getId());
+                waitForFinish(p);
+                assertEquals(Status.FINISHED, p.getStatus());
+                assertEquals("[Row(len=u'3')]\n", p.getResult().message());
             }
         }
         ZeppelinServer.notebook.removeNote(note.id(), null);

Reply via email to