Repository: zeppelin Updated Branches: refs/heads/master c1935e1e8 -> 52b3cbff7
[ZEPPELIN-1260] R interpreter doesn't work with Spark 2.0 ### What is this PR for? This PR fixes R interpreter doesn't work with Spark 2.0 ### What type of PR is it? Bug Fix ### Todos * [x] - Create and inject SparkSession into SparkRInterpreter ### What is the Jira issue? https://issues.apache.org/jira/browse/ZEPPELIN-1260 ### How should this be tested? Run Zeppelin with Spark 2.0 and run following codes and see if it returns `[1] 3` ``` %r localDF <- data.frame(name=c("a", "b", "c"), age=c(19, 23, 18)) df <- createDataFrame(spark, localDF) count(df) ``` ### Screenshots (if appropriate)  ### Questions: * Does the licenses files need update? no * Is there breaking changes for older versions? no * Does this needs documentation? no Author: Lee moon soo <[email protected]> Closes #1259 from Leemoonsoo/ZEPPELIN-1260 and squashes the following commits: b3df11f [Lee moon soo] inject sqlContext as well 02822ac [Lee moon soo] Change indent 6bd1128 [Lee moon soo] Create and inject spark session into sparkr interpreter Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/52b3cbff Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/52b3cbff Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/52b3cbff Branch: refs/heads/master Commit: 52b3cbff76e9e1d684443e442673d4468f327108 Parents: c1935e1 Author: Lee moon soo <[email protected]> Authored: Tue Aug 2 07:59:36 2016 -0500 Committer: Mina Lee <[email protected]> Committed: Wed Aug 3 11:54:02 2016 +0900 ---------------------------------------------------------------------- .../java/org/apache/zeppelin/spark/SparkRInterpreter.java | 10 ++++++++-- .../main/java/org/apache/zeppelin/spark/ZeppelinR.java | 6 +++++- .../java/org/apache/zeppelin/spark/ZeppelinRContext.java | 8 ++++++++ spark/src/main/resources/R/zeppelin_sparkr.R | 5 +++++ .../apache/zeppelin/rest/ZeppelinSparkClusterTest.java | 7 +++++-- 5 files changed, 31 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zeppelin/blob/52b3cbff/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java index 8329641..5598f09 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java @@ -21,6 +21,7 @@ import static org.apache.zeppelin.spark.ZeppelinRDisplay.render; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.spark.SparkContext; import org.apache.spark.SparkRBackend; import org.apache.zeppelin.interpreter.*; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; @@ -70,11 +71,16 @@ public class SparkRInterpreter extends Interpreter { int port = SparkRBackend.port(); SparkInterpreter sparkInterpreter = getSparkInterpreter(); - ZeppelinRContext.setSparkContext(sparkInterpreter.getSparkContext()); + SparkContext sc = sparkInterpreter.getSparkContext(); + SparkVersion sparkVersion = new SparkVersion(sc.version()); + ZeppelinRContext.setSparkContext(sc); + if (Utils.isSpark2()) { + ZeppelinRContext.setSparkSession(sparkInterpreter.getSparkSession()); + } ZeppelinRContext.setSqlContext(sparkInterpreter.getSQLContext()); ZeppelinRContext.setZepplinContext(sparkInterpreter.getZeppelinContext()); - zeppelinR = new ZeppelinR(rCmdPath, sparkRLibPath, port); + zeppelinR = new ZeppelinR(rCmdPath, sparkRLibPath, port, sparkVersion); try { zeppelinR.open(); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/zeppelin/blob/52b3cbff/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java index 0ff0740..e0a47b7 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java @@ -36,6 +36,7 @@ import java.util.Map; public class ZeppelinR implements ExecuteResultHandler { Logger logger = LoggerFactory.getLogger(ZeppelinR.class); private final String rCmdPath; + private final SparkVersion sparkVersion; private DefaultExecutor executor; private SparkOutputStream outputStream; private PipedOutputStream input; @@ -107,9 +108,11 @@ public class ZeppelinR implements ExecuteResultHandler { * @param rCmdPath R repl commandline path * @param libPath sparkr library path */ - public ZeppelinR(String rCmdPath, String libPath, int sparkRBackendPort) { + public ZeppelinR(String rCmdPath, String libPath, int sparkRBackendPort, + SparkVersion sparkVersion) { this.rCmdPath = rCmdPath; this.libPath = libPath; + this.sparkVersion = sparkVersion; this.port = sparkRBackendPort; try { File scriptFile = File.createTempFile("zeppelin_sparkr-", ".R"); @@ -137,6 +140,7 @@ public class ZeppelinR implements ExecuteResultHandler { cmd.addArgument(Integer.toString(hashCode())); cmd.addArgument(Integer.toString(port)); cmd.addArgument(libPath); + cmd.addArgument(Integer.toString(sparkVersion.toNumber())); executor = new DefaultExecutor(); outputStream = new SparkOutputStream(); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/52b3cbff/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinRContext.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinRContext.java b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinRContext.java index 82c320d..9ad156e 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinRContext.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinRContext.java @@ -27,6 +27,7 @@ public class ZeppelinRContext { private static SparkContext sparkContext; private static SQLContext sqlContext; private static ZeppelinContext zeppelinContext; + private static Object sparkSession; public static void setSparkContext(SparkContext sparkContext) { ZeppelinRContext.sparkContext = sparkContext; @@ -40,6 +41,10 @@ public class ZeppelinRContext { ZeppelinRContext.sqlContext = sqlContext; } + public static void setSparkSession(Object sparkSession) { + ZeppelinRContext.sparkSession = sparkSession; + } + public static SparkContext getSparkContext() { return sparkContext; } @@ -52,4 +57,7 @@ public class ZeppelinRContext { return zeppelinContext; } + public static Object getSparkSession() { + return sparkSession; + } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/52b3cbff/spark/src/main/resources/R/zeppelin_sparkr.R ---------------------------------------------------------------------- diff --git a/spark/src/main/resources/R/zeppelin_sparkr.R b/spark/src/main/resources/R/zeppelin_sparkr.R index fe2a16b..d951774 100644 --- a/spark/src/main/resources/R/zeppelin_sparkr.R +++ b/spark/src/main/resources/R/zeppelin_sparkr.R @@ -21,6 +21,7 @@ args <- commandArgs(trailingOnly = TRUE) hashCode <- as.integer(args[1]) port <- as.integer(args[2]) libPath <- args[3] +version <- as.integer(args[4]) rm(args) print(paste("Port ", toString(port))) @@ -41,6 +42,10 @@ assign(".scStartTime", as.integer(Sys.time()), envir = SparkR:::.sparkREnv) # setup spark env assign(".sc", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getSparkContext"), envir = SparkR:::.sparkREnv) assign("sc", get(".sc", envir = SparkR:::.sparkREnv), envir=.GlobalEnv) +if (version >= 200) { + assign(".sparkRsession", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getSparkSession"), envir = SparkR:::.sparkREnv) + assign("spark", get(".sparkRsession", envir = SparkR:::.sparkREnv), envir = .GlobalEnv) +} assign(".sqlc", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getSqlContext"), envir = SparkR:::.sparkREnv) assign("sqlContext", get(".sqlc", envir = SparkR:::.sparkREnv), envir = .GlobalEnv) assign(".zeppelinContext", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getZeppelinContext"), envir = .GlobalEnv) http://git-wip-us.apache.org/repos/asf/zeppelin/blob/52b3cbff/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java index a65ccbc..61dc6d1 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java @@ -100,13 +100,16 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi { } } - // run markdown paragraph, again + String sqlContextName = "sqlContext"; + if (sparkVersion >= 20) { + sqlContextName = "spark"; + } Paragraph p = note.addParagraph(); Map config = p.getConfig(); config.put("enabled", true); p.setConfig(config); p.setText("%r localDF <- data.frame(name=c(\"a\", \"b\", \"c\"), age=c(19, 23, 18))\n" + - "df <- createDataFrame(sqlContext, localDF)\n" + + "df <- createDataFrame(" + sqlContextName + ", localDF)\n" + "count(df)" ); note.run(p.getId());
