Repository: zeppelin Updated Branches: refs/heads/master c717daf65 -> 89cf8262e
ZEPPELIN-1442. UDF cannot be found due to 2 instances of SparkSession being created ### What is this PR for? The issue is that we create 2 SparkSessions in zeppelin_pyspark.py (because we create the SQLContext first, which creates a SparkSession underneath). This causes 2 instances of SparkSession on the JVM side, which means we have 2 instances of Catalog as well, so a UDF registered in the SQLContext cannot be used in the SparkSession. This PR creates the SparkSession first and then assigns its internal SQLContext to sqlContext in pyspark. ### What type of PR is it? [Bug Fix] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-1442 ### How should this be tested? An integration test is added. ### Screenshots (if appropriate)  ### Questions: * Do the license files need updating? No * Are there breaking changes for older versions? No * Does this need documentation? No … Author: Jeff Zhang <[email protected]> Closes #1452 from zjffdu/ZEPPELIN-1442 and squashes the following commits: a15e3c6 [Jeff Zhang] fix unit test 93060b6 [Jeff Zhang] ZEPPELIN-1442. 
UDF can not be found due to 2 instances of SparkSession is created Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/89cf8262 Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/89cf8262 Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/89cf8262 Branch: refs/heads/master Commit: 89cf8262e6a740c267acad0c040d5d52675d6c00 Parents: c717daf Author: Jeff Zhang <[email protected]> Authored: Fri Sep 23 16:18:24 2016 +0800 Committer: Mina Lee <[email protected]> Committed: Tue Sep 27 10:37:28 2016 +0900 ---------------------------------------------------------------------- .../main/resources/python/zeppelin_pyspark.py | 6 ++--- .../zeppelin/rest/ZeppelinSparkClusterTest.java | 25 ++++++++++++++++++++ 2 files changed, 27 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zeppelin/blob/89cf8262/spark/src/main/resources/python/zeppelin_pyspark.py ---------------------------------------------------------------------- diff --git a/spark/src/main/resources/python/zeppelin_pyspark.py b/spark/src/main/resources/python/zeppelin_pyspark.py index 53465c2..49e60d4 100644 --- a/spark/src/main/resources/python/zeppelin_pyspark.py +++ b/spark/src/main/resources/python/zeppelin_pyspark.py @@ -219,14 +219,12 @@ jconf = intp.getSparkConf() conf = SparkConf(_jvm = gateway.jvm, _jconf = jconf) sc = SparkContext(jsc=jsc, gateway=gateway, conf=conf) if sparkVersion.isSpark2(): - sqlc = SQLContext(sparkContext=sc, jsqlContext=intp.getSQLContext()) + spark = SparkSession(sc, intp.getSparkSession()) + sqlc = spark._wrapped else: sqlc = SQLContext(sparkContext=sc, sqlContext=intp.getSQLContext()) sqlContext = sqlc -if sparkVersion.isSpark2(): - spark = SparkSession(sc, intp.getSparkSession()) - completion = PySparkCompletion(intp) z = PyZeppelinContext(intp.getZeppelinContext()) 
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/89cf8262/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java index 0255068..5084ae7 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java @@ -220,6 +220,18 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi { assertEquals(InterpreterResult.Type.TABLE, p.getResult().type()); // TODO (zjffdu), one more \n is appended, need to investigate why. assertEquals("age\tid\n20\t1\n\n", p.getResult().message()); + + // test udf + p = note.addParagraph(); + config = p.getConfig(); + config.put("enabled", true); + p.setConfig(config); + p.setText("%pyspark sqlContext.udf.register(\"f1\", lambda x: len(x))\n" + + "sqlContext.sql(\"select f1(\\\"abc\\\") as len\").collect()"); + note.run(p.getId()); + waitForFinish(p); + assertEquals(Status.FINISHED, p.getStatus()); + assertEquals("[Row(len=u'3')]\n", p.getResult().message()); } if (sparkVersion >= 20) { // run SparkSession test @@ -234,6 +246,19 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi { waitForFinish(p); assertEquals(Status.FINISHED, p.getStatus()); assertEquals("[Row(age=20, id=1)]\n", p.getResult().message()); + + // test udf + p = note.addParagraph(); + config = p.getConfig(); + config.put("enabled", true); + p.setConfig(config); + // use SQLContext to register UDF but use this UDF through SparkSession + p.setText("%pyspark sqlContext.udf.register(\"f1\", lambda x: len(x))\n" + + "spark.sql(\"select f1(\\\"abc\\\") as len\").collect()"); + note.run(p.getId()); + waitForFinish(p); + assertEquals(Status.FINISHED, 
p.getStatus()); + assertEquals("[Row(len=u'3')]\n", p.getResult().message()); } } ZeppelinServer.notebook.removeNote(note.getId(), null);
