Repository: zeppelin Updated Branches: refs/heads/master 1e8559e65 -> c61f1fbce
ZEPPELIN-1411. UDF with pyspark not working - object has no attribute 'parseDataType' ### What is this PR for? The root cause is that the signature of SQLContext's constructor changed in Spark 2.0. Spark 1.6 ``` def __init__(self, sparkContext, sqlContext=None): ``` Spark 2.0 ``` def __init__(self, sparkContext, sparkSession=None, jsqlContext=None): ``` So we need to create SQLContext using named parameters; otherwise it would take intp.getSQLContext() as sparkSession, which causes the issue. ### What type of PR is it? [Bug Fix] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-1411 ### How should this be tested? Tested using the example code in ZEPPELIN-1411. ### Screenshots (if appropriate)  ### Questions: * Do the license files need an update? No * Are there breaking changes for older versions? No * Does this need documentation? No Author: Jeff Zhang <[email protected]> Closes #1404 from zjffdu/ZEPPELIN-1411 and squashes the following commits: 40b080a [Jeff Zhang] retry 4922de1 [Jeff Zhang] log more logging for travis CI diagnose 4fe033d [Jeff Zhang] add unit test 296c63f [Jeff Zhang] ZEPPELIN-1411. 
UDF with pyspark not working - object has no attribute 'parseDataType' Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/c61f1fbc Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/c61f1fbc Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/c61f1fbc Branch: refs/heads/master Commit: c61f1fbced7e184357c3fa37f0e16bf6ccc6ba3f Parents: 1e8559e Author: Jeff Zhang <[email protected]> Authored: Wed Sep 14 15:42:39 2016 +0800 Committer: Lee moon soo <[email protected]> Committed: Wed Sep 21 08:09:02 2016 -0700 ---------------------------------------------------------------------- .../apache/zeppelin/spark/SparkInterpreter.java | 16 +++++++--- .../main/resources/python/zeppelin_pyspark.py | 5 ++- .../zeppelin/rest/AbstractTestRestApi.java | 16 ++++++++-- .../zeppelin/rest/ZeppelinSparkClusterTest.java | 32 ++++++++++++++++++-- .../src/test/resources/log4j.properties | 2 +- 5 files changed, 59 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c61f1fbc/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java index 02d766f..44c2a74 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java @@ -118,7 +118,7 @@ public class SparkInterpreter extends Interpreter { private Map<String, Object> binder; private SparkVersion sparkVersion; - private File outputDir; // class outputdir for scala 2.11 + private static File outputDir; // class outputdir for scala 2.11 private Object classServer; // classserver for scala 2.11 @@ -603,8 +603,11 @@ public class SparkInterpreter 
extends Interpreter { sparkReplClassDir = System.getProperty("java.io.tmpdir"); } - outputDir = createTempDir(sparkReplClassDir); - + synchronized (sharedInterpreterLock) { + if (outputDir == null) { + outputDir = createTempDir(sparkReplClassDir); + } + } argList.add("-Yrepl-class-based"); argList.add("-Yrepl-outdir"); argList.add(outputDir.getAbsolutePath()); @@ -1307,7 +1310,12 @@ public class SparkInterpreter extends Interpreter { logger.info("Close interpreter"); if (numReferenceOfSparkContext.decrementAndGet() == 0) { - sc.stop(); + if (sparkSession != null) { + Utils.invokeMethod(sparkSession, "stop"); + } else if (sc != null){ + sc.stop(); + } + sparkSession = null; sc = null; if (classServer != null) { Utils.invokeMethod(classServer, "stop"); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c61f1fbc/spark/src/main/resources/python/zeppelin_pyspark.py ---------------------------------------------------------------------- diff --git a/spark/src/main/resources/python/zeppelin_pyspark.py b/spark/src/main/resources/python/zeppelin_pyspark.py index 3e6535f..53465c2 100644 --- a/spark/src/main/resources/python/zeppelin_pyspark.py +++ b/spark/src/main/resources/python/zeppelin_pyspark.py @@ -218,7 +218,10 @@ java_import(gateway.jvm, "scala.Tuple2") jconf = intp.getSparkConf() conf = SparkConf(_jvm = gateway.jvm, _jconf = jconf) sc = SparkContext(jsc=jsc, gateway=gateway, conf=conf) -sqlc = SQLContext(sc, intp.getSQLContext()) +if sparkVersion.isSpark2(): + sqlc = SQLContext(sparkContext=sc, jsqlContext=intp.getSQLContext()) +else: + sqlc = SQLContext(sparkContext=sc, sqlContext=intp.getSQLContext()) sqlContext = sqlc if sparkVersion.isSpark2(): http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c61f1fbc/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java 
b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java index 580e5a0..eb080fe 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java @@ -154,7 +154,7 @@ public abstract class AbstractTestRestApi { // set spark master and other properties sparkIntpSetting.getProperties().setProperty("master", "spark://" + getHostname() + ":7071"); sparkIntpSetting.getProperties().setProperty("spark.cores.max", "2"); - + sparkIntpSetting.getProperties().setProperty("zeppelin.spark.useHiveContext", "false"); // set spark home for pyspark sparkIntpSetting.getProperties().setProperty("spark.home", getSparkHome()); pySpark = true; @@ -171,10 +171,16 @@ public abstract class AbstractTestRestApi { String sparkHome = getSparkHome(); if (sparkHome != null) { - sparkIntpSetting.getProperties().setProperty("master", "spark://" + getHostname() + ":7071"); + if (System.getenv("SPARK_MASTER") != null) { + sparkIntpSetting.getProperties().setProperty("master", System.getenv("SPARK_MASTER")); + } else { + sparkIntpSetting.getProperties() + .setProperty("master", "spark://" + getHostname() + ":7071"); + } sparkIntpSetting.getProperties().setProperty("spark.cores.max", "2"); // set spark home for pyspark sparkIntpSetting.getProperties().setProperty("spark.home", sparkHome); + sparkIntpSetting.getProperties().setProperty("zeppelin.spark.useHiveContext", "false"); pySpark = true; sparkR = true; } @@ -194,7 +200,11 @@ public abstract class AbstractTestRestApi { } private static String getSparkHome() { - String sparkHome = getSparkHomeRecursively(new File(System.getProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName()))); + String sparkHome = System.getenv("SPARK_HOME"); + if (sparkHome != null) { + return sparkHome; + } + sparkHome = getSparkHomeRecursively(new 
File(System.getProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName()))); System.out.println("SPARK HOME detected " + sparkHome); return sparkHome; } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c61f1fbc/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java index 1250f9c..4e516db 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java @@ -135,11 +135,38 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi { config.put("enabled", true); p.setConfig(config); p.setText("%pyspark print(sc.parallelize(range(1, 11)).reduce(lambda a, b: a + b))"); -// p.getRepl("org.apache.zeppelin.spark.SparkInterpreter").open(); note.run(p.getId()); waitForFinish(p); assertEquals(Status.FINISHED, p.getStatus()); assertEquals("55\n", p.getResult().message()); + if (sparkVersion >= 13) { + // run sqlContext test + p = note.addParagraph(); + config = p.getConfig(); + config.put("enabled", true); + p.setConfig(config); + p.setText("%pyspark from pyspark.sql import Row\n" + + "df=sqlContext.createDataFrame([Row(id=1, age=20)])\n" + + "df.collect()"); + note.run(p.getId()); + waitForFinish(p); + assertEquals(Status.FINISHED, p.getStatus()); + assertEquals("[Row(age=20, id=1)]\n", p.getResult().message()); + } + if (sparkVersion >= 20) { + // run SparkSession test + p = note.addParagraph(); + config = p.getConfig(); + config.put("enabled", true); + p.setConfig(config); + p.setText("%pyspark from pyspark.sql import Row\n" + + "df=sqlContext.createDataFrame([Row(id=1, age=20)])\n" + + "df.collect()"); + note.run(p.getId()); + waitForFinish(p); + 
assertEquals(Status.FINISHED, p.getStatus()); + assertEquals("[Row(age=20, id=1)]\n", p.getResult().message()); + } } ZeppelinServer.notebook.removeNote(note.getId(), null); } @@ -166,7 +193,6 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi { p.setText("%pyspark\nfrom pyspark.sql.functions import *\n" + "print(" + sqlContextName + ".range(0, 10).withColumn('uniform', rand(seed=10) * 3.14).count())"); -// p.getRepl("org.apache.zeppelin.spark.SparkInterpreter").open(); note.run(p.getId()); waitForFinish(p); assertEquals(Status.FINISHED, p.getStatus()); @@ -257,6 +283,7 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi { assertEquals(Status.FINISHED, p1.getStatus()); assertEquals("2\n", p1.getResult().message()); } + ZeppelinServer.notebook.removeNote(note.getId(), null); } /** @@ -270,7 +297,6 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi { config.put("enabled", true); p.setConfig(config); p.setText("%spark print(sc.version)"); -// p.getRepl("org.apache.zeppelin.spark.SparkInterpreter").open(); note.run(p.getId()); waitForFinish(p); assertEquals(Status.FINISHED, p.getStatus()); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c61f1fbc/zeppelin-server/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/test/resources/log4j.properties b/zeppelin-server/src/test/resources/log4j.properties index 376ce00..5007390 100644 --- a/zeppelin-server/src/test/resources/log4j.properties +++ b/zeppelin-server/src/test/resources/log4j.properties @@ -43,4 +43,4 @@ log4j.logger.DataNucleus.Datastore=ERROR # Log all JDBC parameters log4j.logger.org.hibernate.type=ALL - +log4j.logger.org.apache.zeppelin.interpreter.remote.RemoteInterpreter=DEBUG
