Make code compatible with spark-1.3

Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/f30e08f8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/f30e08f8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/f30e08f8

Branch: refs/heads/master
Commit: f30e08f8432dc6dbd772e6ea70edf0b099874503
Parents: 3058d6d
Author: Lee moon soo <[email protected]>
Authored: Sat Mar 14 03:41:06 2015 +0900
Committer: Lee moon soo <[email protected]>
Committed: Sat Mar 14 03:41:06 2015 +0900

----------------------------------------------------------------------
 .../nflabs/zeppelin/spark/SparkInterpreter.java | 58 ++++++++++++++++---
 .../zeppelin/spark/SparkSqlInterpreter.java     | 60 ++++++++++++++------
 .../nflabs/zeppelin/spark/ZeppelinContext.java  |  3 +-
 3 files changed, 95 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
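
This change keeps a single Zeppelin build compatible with Spark 1.1 through
1.3 by branching on SparkContext.version() and reaching version-specific
APIs through reflection rather than direct calls. A minimal sketch of the
version gate the hunks below rely on (the class name SparkVersionGate is
illustrative, not part of the commit):

    import org.apache.spark.SparkContext;

    // SparkContext.version() returns strings such as "1.2.1" or "1.3.0";
    // each API difference below is dispatched on the major.minor prefix.
    class SparkVersionGate {
      static boolean isSpark13(SparkContext sc) {
        return sc.version().startsWith("1.3");
      }
    }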


http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/f30e08f8/spark/src/main/java/com/nflabs/zeppelin/spark/SparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/com/nflabs/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/com/nflabs/zeppelin/spark/SparkInterpreter.java
index 4ba5cd2..47d5be0 100644
--- a/spark/src/main/java/com/nflabs/zeppelin/spark/SparkInterpreter.java
+++ b/spark/src/main/java/com/nflabs/zeppelin/spark/SparkInterpreter.java
@@ -14,6 +14,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 
+import org.apache.spark.HttpServer;
 import org.apache.spark.SparkConf;
 import org.apache.spark.SparkContext;
 import org.apache.spark.SparkEnv;
@@ -49,6 +50,7 @@ import scala.tools.nsc.settings.MutableSettings.PathSetting;
 
 import com.nflabs.zeppelin.interpreter.Interpreter;
 import com.nflabs.zeppelin.interpreter.InterpreterContext;
+import com.nflabs.zeppelin.interpreter.InterpreterException;
 import com.nflabs.zeppelin.interpreter.InterpreterGroup;
 import com.nflabs.zeppelin.interpreter.InterpreterPropertyBuilder;
 import com.nflabs.zeppelin.interpreter.InterpreterResult;
@@ -181,12 +183,34 @@ public class SparkInterpreter extends Interpreter {
 
     String execUri = System.getenv("SPARK_EXECUTOR_URI");
     String[] jars = SparkILoop.getAddedJars();
+
+    String classServerUri = null;
+
+    try { // in case of Spark 1.1.x, 1.2.x
+      Method classServer = interpreter.intp().getClass().getMethod("classServer");
+      HttpServer httpServer = (HttpServer) classServer.invoke(interpreter.intp());
+      classServerUri = httpServer.uri();
+    } catch (NoSuchMethodException | SecurityException | IllegalAccessException
+        | IllegalArgumentException | InvocationTargetException e) {
+      // continue
+    }
+
+    if (classServerUri == null) {
+      try { // for Spark 1.3.x
+        Method classServer = interpreter.intp().getClass().getMethod("classServerUri");
+        classServerUri = (String) classServer.invoke(interpreter.intp());
+      } catch (NoSuchMethodException | SecurityException | IllegalAccessException
+          | IllegalArgumentException | InvocationTargetException e) {
+        throw new InterpreterException(e);
+      }
+    }
+
     SparkConf conf =
         new SparkConf()
             .setMaster(getProperty("master"))
             .setAppName(getProperty("spark.app.name"))
             .setJars(jars)
-            .set("spark.repl.class.uri", 
interpreter.intp().classServer().uri());
+            .set("spark.repl.class.uri", classServerUri);
 
     if (execUri != null) {
       conf.set("spark.executor.uri", execUri);
@@ -344,7 +368,20 @@ public class SparkInterpreter extends Interpreter {
 
     z = new ZeppelinContext(sc, sqlc, getHiveContext(), null, dep, printStream);
 
-    this.interpreter.loadFiles(settings);
+    try {
+      if (sc.version().startsWith("1.1") || sc.version().startsWith("1.2")) {
+        Method loadFiles = this.interpreter.getClass().getMethod("loadFiles", Settings.class);
+        loadFiles.invoke(this.interpreter, settings);
+      } else if (sc.version().startsWith("1.3")) {
+        Method loadFiles = this.interpreter.getClass().getMethod(
+            "org$apache$spark$repl$SparkILoop$$loadFiles", Settings.class);
+        loadFiles.invoke(this.interpreter, settings);
+      }
+    } catch (NoSuchMethodException | SecurityException | IllegalAccessException
+        | IllegalArgumentException | InvocationTargetException e) {
+      throw new InterpreterException(e);
+    }
+
 
     intp.interpret("@transient var _binder = new java.util.HashMap[String, 
Object]()");
     binder = (Map<String, Object>) getValue("_binder");
@@ -363,7 +400,16 @@ public class SparkInterpreter extends Interpreter {
     intp.interpret("@transient val hiveContext = "
         + "_binder.get(\"hiveContext\").asInstanceOf[org.apache.spark.sql.hive.HiveContext]");
     intp.interpret("import org.apache.spark.SparkContext._");
-    intp.interpret("import sqlc._");
+
+    if (sc.version().startsWith("1.1")) {
+      intp.interpret("import sqlc._");
+    } else if (sc.version().startsWith("1.2")) {
+      intp.interpret("import sqlc._");
+    } else if (sc.version().startsWith("1.3")) {
+      intp.interpret("import sqlc.implicits._");
+      intp.interpret("import sqlc.sql");
+      intp.interpret("import org.apache.spark.sql.functions._");
+    }
 
     // add jar
     if (depInterpreter != null) {
@@ -421,10 +467,6 @@ public class SparkInterpreter extends Interpreter {
     return scala.collection.JavaConversions.asJavaList(ret.candidates());
   }
 
-  public void bindValue(String name, Object o) {
-    getResultCode(intp.bindValue(name, o));
-  }
-
   public Object getValue(String name) {
     Object ret = intp.valueOfTerm(name);
     if (ret instanceof None) {
@@ -539,6 +581,8 @@ public class SparkInterpreter extends Interpreter {
           progressInfo = getProgressFromStage_1_1x(sparkListener, job.finalStage());
         } else if (sc.version().startsWith("1.2")) {
           progressInfo = getProgressFromStage_1_1x(sparkListener, job.finalStage());
+        } else if (sc.version().startsWith("1.3")) {
+          progressInfo = getProgressFromStage_1_1x(sparkListener, job.finalStage());
         } else {
           continue;
         }
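
The hunk above is the heart of the change: Spark 1.1/1.2 expose the REPL
class server as classServer(), an HttpServer whose uri() method yields the
address, while Spark 1.3 replaced it with classServerUri(), which returns
the URI directly. A standalone sketch of that probe-and-fallback, with a
plain Object receiver standing in for interpreter.intp():

    import java.lang.reflect.Method;

    import org.apache.spark.HttpServer;

    // Illustrative sketch, not shipped by the commit: try the pre-1.3
    // accessor first, then fall back to the Spark 1.3 one.
    class ClassServerUriProbe {
      static String classServerUri(Object intp) throws Exception {
        try {
          // Spark 1.1.x / 1.2.x: classServer() returns an HttpServer
          Method classServer = intp.getClass().getMethod("classServer");
          HttpServer httpServer = (HttpServer) classServer.invoke(intp);
          return httpServer.uri();
        } catch (NoSuchMethodException e) {
          // Spark 1.3.x: classServerUri() returns the URI string directly
          Method classServerUri = intp.getClass().getMethod("classServerUri");
          return (String) classServerUri.invoke(intp);
        }
      }
    }

The same probe style drives the loadFiles call: apparently Spark 1.3 made
SparkILoop.loadFiles private, so the commit looks it up under the mangled
name org$apache$spark$repl$SparkILoop$$loadFiles that scalac emits for it.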

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/f30e08f8/spark/src/main/java/com/nflabs/zeppelin/spark/SparkSqlInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/com/nflabs/zeppelin/spark/SparkSqlInterpreter.java b/spark/src/main/java/com/nflabs/zeppelin/spark/SparkSqlInterpreter.java
index 3cdce03..6df4eec 100644
--- a/spark/src/main/java/com/nflabs/zeppelin/spark/SparkSqlInterpreter.java
+++ b/spark/src/main/java/com/nflabs/zeppelin/spark/SparkSqlInterpreter.java
@@ -12,9 +12,9 @@ import org.apache.spark.scheduler.ActiveJob;
 import org.apache.spark.scheduler.DAGScheduler;
 import org.apache.spark.scheduler.Stage;
 import org.apache.spark.sql.SQLContext;
-import org.apache.spark.sql.SchemaRDD;
+import org.apache.spark.sql.SQLContext.QueryExecution;
 import org.apache.spark.sql.catalyst.expressions.Attribute;
-import org.apache.spark.sql.catalyst.expressions.Row;
+//import org.apache.spark.sql.catalyst.expressions.Row;
 import org.apache.spark.ui.jobs.JobProgressListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -122,11 +122,15 @@ public class SparkSqlInterpreter extends Interpreter {
     }
 
     sc.setJobGroup(getJobGroup(context), "Zeppelin", false);
-    SchemaRDD rdd;
-    Row[] rows = null;
+
+    // SchemaRDD in Spark 1.1/1.2, DataFrame in Spark 1.3
+    Object rdd;
+    Object[] rows = null;
     try {
       rdd = sqlc.sql(st);
-      rows = rdd.take(maxResult + 1);
+
+      Method take = rdd.getClass().getMethod("take", int.class);
+      rows = (Object[]) take.invoke(rdd, maxResult + 1);
     } catch (Exception e) {
       logger.error("Error", e);
       sc.clearJobGroup();
@@ -134,10 +138,22 @@ public class SparkSqlInterpreter extends Interpreter {
     }
 
     String msg = null;
+
     // get field names
+    Method queryExecution;
+    QueryExecution qe;
+    try {
+      queryExecution = rdd.getClass().getMethod("queryExecution");
+      qe = (QueryExecution) queryExecution.invoke(rdd);
+    } catch (NoSuchMethodException | SecurityException | IllegalAccessException
+        | IllegalArgumentException | InvocationTargetException e) {
+      throw new InterpreterException(e);
+    }
+
     List<Attribute> columns =
         scala.collection.JavaConverters.asJavaListConverter(
-            rdd.queryExecution().analyzed().output()).asJava();
+            qe.analyzed().output()).asJava();
+
     for (Attribute col : columns) {
       if (msg == null) {
         msg = col.name();
@@ -145,26 +161,34 @@ public class SparkSqlInterpreter extends Interpreter {
         msg += "\t" + col.name();
       }
     }
+
     msg += "\n";
 
     // ArrayType, BinaryType, BooleanType, ByteType, DecimalType, DoubleType, DynamicType,
     // FloatType, FractionalType, IntegerType, IntegralType, LongType, MapType, NativeType,
     // NullType, NumericType, ShortType, StringType, StructType
 
-    for (int r = 0; r < maxResult && r < rows.length; r++) {
-      Row row = rows[r];
-
-      for (int i = 0; i < columns.size(); i++) {
-        if (!row.isNullAt(i)) {
-          msg += row.apply(i).toString();
-        } else {
-          msg += "null";
-        }
-        if (i != columns.size() - 1) {
-          msg += "\t";
+    try {
+      for (int r = 0; r < maxResult && r < rows.length; r++) {
+        Object row = rows[r];
+        Method isNullAt = row.getClass().getMethod("isNullAt", int.class);
+        Method apply = row.getClass().getMethod("apply", int.class);
+
+        for (int i = 0; i < columns.size(); i++) {
+          if (!(Boolean) isNullAt.invoke(row, i)) {
+            msg += apply.invoke(row, i).toString();
+          } else {
+            msg += "null";
+          }
+          if (i != columns.size() - 1) {
+            msg += "\t";
+          }
         }
+        msg += "\n";
       }
-      msg += "\n";
+    } catch (NoSuchMethodException | SecurityException | IllegalAccessException
+        | IllegalArgumentException | InvocationTargetException e) {
+      throw new InterpreterException(e);
     }
 
     if (rows.length > maxResult) {
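
Here the static SchemaRDD/Row types are widened to Object so the same class
file links against 1.1 through 1.3, with take(), isNullAt(), and apply()
resolved reflectively. One pitfall: Scala's take(n: Int) erases to take(int)
on the JVM, so the lookups must use int.class; getMethod("take",
Integer.class) would throw NoSuchMethodException at runtime. A condensed
sketch of the pattern (class and method names are illustrative):

    import java.lang.reflect.Method;

    // Works whether "rdd" is a SchemaRDD (Spark 1.1/1.2) or a
    // DataFrame (Spark 1.3).
    class ReflectiveRows {
      static Object[] take(Object rdd, int n) throws Exception {
        // Scala take(n: Int) compiles to take(int): look up with int.class
        Method take = rdd.getClass().getMethod("take", int.class);
        return (Object[]) take.invoke(rdd, n);
      }

      static String cell(Object row, int i) throws Exception {
        Method isNullAt = row.getClass().getMethod("isNullAt", int.class);
        Method apply = row.getClass().getMethod("apply", int.class);
        boolean isNull = (Boolean) isNullAt.invoke(row, i);
        return isNull ? "null" : String.valueOf(apply.invoke(row, i));
      }
    }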

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/f30e08f8/spark/src/main/java/com/nflabs/zeppelin/spark/ZeppelinContext.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/com/nflabs/zeppelin/spark/ZeppelinContext.java b/spark/src/main/java/com/nflabs/zeppelin/spark/ZeppelinContext.java
index e475de6..f85734c 100644
--- a/spark/src/main/java/com/nflabs/zeppelin/spark/ZeppelinContext.java
+++ b/spark/src/main/java/com/nflabs/zeppelin/spark/ZeppelinContext.java
@@ -10,7 +10,6 @@ import java.util.Iterator;
 
 import org.apache.spark.SparkContext;
 import org.apache.spark.sql.SQLContext;
-import org.apache.spark.sql.SchemaRDD;
 import org.apache.spark.sql.hive.HiveContext;
 
 import scala.Tuple2;
@@ -49,9 +48,11 @@ public class ZeppelinContext {
   public HiveContext hiveContext;
   private GUI gui;
 
+  /* spark-1.3
   public SchemaRDD sql(String sql) {
     return sqlContext.sql(sql);
   }
+  */
 
   /**
    * Load dependency for interpreter and runtime (driver).
