Repository: zeppelin
Updated Branches:
  refs/heads/branch-0.6 c1d0870ae -> d8291b81f


ZEPPELIN-1411. UDF with pyspark not working - object has no attribute 
'parseDataType'

The root cause is that SQLContext's constructor signature changed in Spark 2.0.
Spark 1.6
```
def __init__(self, sparkContext, sqlContext=None):
```
Spark 2.0
```
def __init__(self, sparkContext, sparkSession=None, jsqlContext=None):
```
So we need to create the SQLContext using named parameters; otherwise it would take 
intp.getSQLContext() as sparkSession, which causes the issue.

[Bug Fix]

* [ ] - Task

* https://issues.apache.org/jira/browse/ZEPPELIN-1411

Tested using the example code in ZEPPELIN-1411.

![image](https://cloud.githubusercontent.com/assets/164491/18260139/9bd702c0-741d-11e6-8b23-946c38a794c3.png)

* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No

Author: Jeff Zhang <zjf...@apache.org>

Closes #1404 from zjffdu/ZEPPELIN-1411 and squashes the following commits:

40b080a [Jeff Zhang] retry
4922de1 [Jeff Zhang] add more logging for Travis CI diagnosis
4fe033d [Jeff Zhang] add unit test
296c63f [Jeff Zhang] ZEPPELIN-1411. UDF with pyspark not working - object has 
no attribute 'parseDataType'

(cherry picked from commit c61f1fbced7e184357c3fa37f0e16bf6ccc6ba3f)
Signed-off-by: Lee moon soo <m...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/d8291b81
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/d8291b81
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/d8291b81

Branch: refs/heads/branch-0.6
Commit: d8291b81fc765e80746fa47fcde87ce61eace37e
Parents: c1d0870
Author: Jeff Zhang <zjf...@apache.org>
Authored: Wed Sep 14 15:42:39 2016 +0800
Committer: Lee moon soo <m...@apache.org>
Committed: Wed Sep 21 08:15:41 2016 -0700

----------------------------------------------------------------------
 .../apache/zeppelin/spark/SparkInterpreter.java | 16 +++++++---
 .../main/resources/python/zeppelin_pyspark.py   |  5 ++-
 .../zeppelin/rest/AbstractTestRestApi.java      | 16 ++++++++--
 .../zeppelin/rest/ZeppelinSparkClusterTest.java | 32 ++++++++++++++++++--
 .../src/test/resources/log4j.properties         |  2 +-
 5 files changed, 60 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d8291b81/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
----------------------------------------------------------------------
diff --git 
a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java 
b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
index 2449478..7da6eee 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
@@ -115,7 +115,7 @@ public class SparkInterpreter extends Interpreter {
 
   private Map<String, Object> binder;
   private SparkVersion sparkVersion;
-  private File outputDir;          // class outputdir for scala 2.11
+  private static File outputDir;          // class outputdir for scala 2.11
   private Object classServer;      // classserver for scala 2.11
 
 
@@ -620,8 +620,11 @@ public class SparkInterpreter extends Interpreter {
         sparkReplClassDir = System.getProperty("java.io.tmpdir");
       }
 
-      outputDir = createTempDir(sparkReplClassDir);
-
+      synchronized (sharedInterpreterLock) {
+        if (outputDir == null) {
+          outputDir = createTempDir(sparkReplClassDir);
+        }
+      }
       argList.add("-Yrepl-class-based");
       argList.add("-Yrepl-outdir");
       argList.add(outputDir.getAbsolutePath());
@@ -1300,7 +1303,12 @@ public class SparkInterpreter extends Interpreter {
     logger.info("Close interpreter");
 
     if (numReferenceOfSparkContext.decrementAndGet() == 0) {
-      sc.stop();
+      if (sparkSession != null) {
+        Utils.invokeMethod(sparkSession, "stop");
+      } else if (sc != null){
+        sc.stop();
+      }
+      sparkSession = null;
       sc = null;
       if (classServer != null) {
         Utils.invokeMethod(classServer, "stop");

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d8291b81/spark/src/main/resources/python/zeppelin_pyspark.py
----------------------------------------------------------------------
diff --git a/spark/src/main/resources/python/zeppelin_pyspark.py 
b/spark/src/main/resources/python/zeppelin_pyspark.py
index 3e6535f..53465c2 100644
--- a/spark/src/main/resources/python/zeppelin_pyspark.py
+++ b/spark/src/main/resources/python/zeppelin_pyspark.py
@@ -218,7 +218,10 @@ java_import(gateway.jvm, "scala.Tuple2")
 jconf = intp.getSparkConf()
 conf = SparkConf(_jvm = gateway.jvm, _jconf = jconf)
 sc = SparkContext(jsc=jsc, gateway=gateway, conf=conf)
-sqlc = SQLContext(sc, intp.getSQLContext())
+if sparkVersion.isSpark2():
+  sqlc = SQLContext(sparkContext=sc, jsqlContext=intp.getSQLContext())
+else:
+  sqlc = SQLContext(sparkContext=sc, sqlContext=intp.getSQLContext())
 sqlContext = sqlc
 
 if sparkVersion.isSpark2():

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d8291b81/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
 
b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
index ebfdcc3..371211a 100644
--- 
a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
+++ 
b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
@@ -158,7 +158,7 @@ public abstract class AbstractTestRestApi {
         // set spark master and other properties
         sparkIntpSetting.getProperties().setProperty("master", "spark://" + 
getHostname() + ":7071");
         sparkIntpSetting.getProperties().setProperty("spark.cores.max", "2");
-
+        
sparkIntpSetting.getProperties().setProperty("zeppelin.spark.useHiveContext", 
"false");
         // set spark home for pyspark
         sparkIntpSetting.getProperties().setProperty("spark.home", 
getSparkHome());
         pySpark = true;
@@ -175,8 +175,16 @@ public abstract class AbstractTestRestApi {
 
         String sparkHome = getSparkHome();
         if (sparkHome != null) {
+          if (System.getenv("SPARK_MASTER") != null) {
+            sparkIntpSetting.getProperties().setProperty("master", 
System.getenv("SPARK_MASTER"));
+          } else {
+            sparkIntpSetting.getProperties()
+                    .setProperty("master", "spark://" + getHostname() + 
":7071");
+          }
+          sparkIntpSetting.getProperties().setProperty("spark.cores.max", "2");
           // set spark home for pyspark
           sparkIntpSetting.getProperties().setProperty("spark.home", 
sparkHome);
+          
sparkIntpSetting.getProperties().setProperty("zeppelin.spark.useHiveContext", 
"false");
           pySpark = true;
           sparkR = true;
         }
@@ -196,7 +204,11 @@ public abstract class AbstractTestRestApi {
   }
 
   private static String getSparkHome() {
-    String sparkHome = getSparkHomeRecursively(new 
File(System.getProperty("user.dir")));
+    String sparkHome = System.getenv("SPARK_HOME");
+    if (sparkHome != null) {
+      return sparkHome;
+    }
+    sparkHome = getSparkHomeRecursively(new 
File(System.getProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName())));
     System.out.println("SPARK HOME detected " + sparkHome);
     return sparkHome;
   }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d8291b81/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
 
b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
index cd132c7..76b65ee 100644
--- 
a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
+++ 
b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
@@ -135,11 +135,38 @@ public class ZeppelinSparkClusterTest extends 
AbstractTestRestApi {
             config.put("enabled", true);
             p.setConfig(config);
             p.setText("%pyspark print(sc.parallelize(range(1, 
11)).reduce(lambda a, b: a + b))");
-//            p.getRepl("org.apache.zeppelin.spark.SparkInterpreter").open();
             note.run(p.getId());
             waitForFinish(p);
             assertEquals(Status.FINISHED, p.getStatus());
             assertEquals("55\n", p.getResult().message());
+            if (sparkVersion >= 13) {
+                // run sqlContext test
+                p = note.addParagraph();
+                config = p.getConfig();
+                config.put("enabled", true);
+                p.setConfig(config);
+                p.setText("%pyspark from pyspark.sql import Row\n" +
+                        "df=sqlContext.createDataFrame([Row(id=1, age=20)])\n" 
+
+                        "df.collect()");
+                note.run(p.getId());
+                waitForFinish(p);
+                assertEquals(Status.FINISHED, p.getStatus());
+                assertEquals("[Row(age=20, id=1)]\n", p.getResult().message());
+            }
+            if (sparkVersion >= 20) {
+                // run SparkSession test
+                p = note.addParagraph();
+                config = p.getConfig();
+                config.put("enabled", true);
+                p.setConfig(config);
+                p.setText("%pyspark from pyspark.sql import Row\n" +
+                        "df=sqlContext.createDataFrame([Row(id=1, age=20)])\n" 
+
+                        "df.collect()");
+                note.run(p.getId());
+                waitForFinish(p);
+                assertEquals(Status.FINISHED, p.getStatus());
+                assertEquals("[Row(age=20, id=1)]\n", p.getResult().message());
+            }
         }
         ZeppelinServer.notebook.removeNote(note.id(), null);
     }
@@ -166,7 +193,6 @@ public class ZeppelinSparkClusterTest extends 
AbstractTestRestApi {
 
             p.setText("%pyspark\nfrom pyspark.sql.functions import *\n"
                     + "print(" + sqlContextName + ".range(0, 
10).withColumn('uniform', rand(seed=10) * 3.14).count())");
-//            p.getRepl("org.apache.zeppelin.spark.SparkInterpreter").open();
             note.run(p.getId());
             waitForFinish(p);
             assertEquals(Status.FINISHED, p.getStatus());
@@ -257,6 +283,7 @@ public class ZeppelinSparkClusterTest extends 
AbstractTestRestApi {
             assertEquals(Status.FINISHED, p1.getStatus());
             assertEquals("2\n", p1.getResult().message());
         }
+        ZeppelinServer.notebook.removeNote(note.getId(), null);
     }
 
     /**
@@ -270,7 +297,6 @@ public class ZeppelinSparkClusterTest extends 
AbstractTestRestApi {
         config.put("enabled", true);
         p.setConfig(config);
         p.setText("%spark print(sc.version)");
-//        p.getRepl("org.apache.zeppelin.spark.SparkInterpreter").open();
         note.run(p.getId());
         waitForFinish(p);
         assertEquals(Status.FINISHED, p.getStatus());

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d8291b81/zeppelin-server/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/test/resources/log4j.properties 
b/zeppelin-server/src/test/resources/log4j.properties
index 376ce00..5007390 100644
--- a/zeppelin-server/src/test/resources/log4j.properties
+++ b/zeppelin-server/src/test/resources/log4j.properties
@@ -43,4 +43,4 @@ log4j.logger.DataNucleus.Datastore=ERROR
 # Log all JDBC parameters
 log4j.logger.org.hibernate.type=ALL
 
-
+log4j.logger.org.apache.zeppelin.interpreter.remote.RemoteInterpreter=DEBUG

Reply via email to