Repository: zeppelin Updated Branches: refs/heads/branch-0.6 533022417 -> bd750988a
ZEPPELIN-1442. UDF can not be found due to 2 instances of SparkSession is created ### What is this PR for? The issue is that we create 2 SparkSession in zeppelin_pyspark.py (Because we create SQLContext first which will create SparkSession underlying). This cause 2 instances of SparkSession in JVM side and this means we have 2 instances of Catalog as well. So udf registered in SQLContext can not be used in SparkSession. This PR will create SparkSession first and then assign its internal SQLContext to sqlContext in pyspark. ### What type of PR is it? [Bug Fix] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-1442 ### How should this be tested? Integration test is added. ### Screenshots (if appropriate)  ### Questions: * Does the licenses files need update? No * Is there breaking changes for older versions? No * Does this needs documentation? No … Author: Jeff Zhang <[email protected]> Closes #1452 from zjffdu/ZEPPELIN-1442 and squashes the following commits: a15e3c6 [Jeff Zhang] fix unit test 93060b6 [Jeff Zhang] ZEPPELIN-1442. 
UDF can not be found due to 2 instances of SparkSession is created (cherry picked from commit 89cf8262e6a740c267acad0c040d5d52675d6c00) Signed-off-by: Mina Lee <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/bd750988 Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/bd750988 Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/bd750988 Branch: refs/heads/branch-0.6 Commit: bd750988a64072ffc55503ffe86ef8f231904d2a Parents: 5330224 Author: Jeff Zhang <[email protected]> Authored: Fri Sep 23 16:18:24 2016 +0800 Committer: Mina Lee <[email protected]> Committed: Tue Sep 27 10:37:45 2016 +0900 ---------------------------------------------------------------------- .../main/resources/python/zeppelin_pyspark.py | 6 ++--- .../zeppelin/rest/ZeppelinSparkClusterTest.java | 25 ++++++++++++++++++++ 2 files changed, 27 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zeppelin/blob/bd750988/spark/src/main/resources/python/zeppelin_pyspark.py ---------------------------------------------------------------------- diff --git a/spark/src/main/resources/python/zeppelin_pyspark.py b/spark/src/main/resources/python/zeppelin_pyspark.py index 53465c2..49e60d4 100644 --- a/spark/src/main/resources/python/zeppelin_pyspark.py +++ b/spark/src/main/resources/python/zeppelin_pyspark.py @@ -219,14 +219,12 @@ jconf = intp.getSparkConf() conf = SparkConf(_jvm = gateway.jvm, _jconf = jconf) sc = SparkContext(jsc=jsc, gateway=gateway, conf=conf) if sparkVersion.isSpark2(): - sqlc = SQLContext(sparkContext=sc, jsqlContext=intp.getSQLContext()) + spark = SparkSession(sc, intp.getSparkSession()) + sqlc = spark._wrapped else: sqlc = SQLContext(sparkContext=sc, sqlContext=intp.getSQLContext()) sqlContext = sqlc -if sparkVersion.isSpark2(): - spark = SparkSession(sc, intp.getSparkSession()) - completion = 
PySparkCompletion(intp) z = PyZeppelinContext(intp.getZeppelinContext()) http://git-wip-us.apache.org/repos/asf/zeppelin/blob/bd750988/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java index 6227e0b..7767a12 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java @@ -220,6 +220,18 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi { assertEquals(InterpreterResult.Type.TABLE, p.getResult().type()); // TODO (zjffdu), one more \n is appended, need to investigate why. assertEquals("age\tid\n20\t1\n\n", p.getResult().message()); + + // test udf + p = note.addParagraph(); + config = p.getConfig(); + config.put("enabled", true); + p.setConfig(config); + p.setText("%pyspark sqlContext.udf.register(\"f1\", lambda x: len(x))\n" + + "sqlContext.sql(\"select f1(\\\"abc\\\") as len\").collect()"); + note.run(p.getId()); + waitForFinish(p); + assertEquals(Status.FINISHED, p.getStatus()); + assertEquals("[Row(len=u'3')]\n", p.getResult().message()); } if (sparkVersion >= 20) { // run SparkSession test @@ -234,6 +246,19 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi { waitForFinish(p); assertEquals(Status.FINISHED, p.getStatus()); assertEquals("[Row(age=20, id=1)]\n", p.getResult().message()); + + // test udf + p = note.addParagraph(); + config = p.getConfig(); + config.put("enabled", true); + p.setConfig(config); + // use SQLContext to register UDF but use this UDF through SparkSession + p.setText("%pyspark sqlContext.udf.register(\"f1\", lambda x: len(x))\n" + + "spark.sql(\"select f1(\\\"abc\\\") as len\").collect()"); + 
note.run(p.getId()); + waitForFinish(p); + assertEquals(Status.FINISHED, p.getStatus()); + assertEquals("[Row(len=u'3')]\n", p.getResult().message()); } } ZeppelinServer.notebook.removeNote(note.id(), null);
