Repository: zeppelin Updated Branches: refs/heads/master 307049a9e -> 439b76c2c
ZEPPELIN-1425. sparkr.zip is not distributed to executors ### What is this PR for? sparkr.zip is not distributed to the executors, so any SparkR job that requires an R daemon on an executor will fail. This PR adds sparkr.zip to `spark.yarn.dist.archives`. ### What type of PR is it? [Bug Fix] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-1425 ### How should this be tested? Run the following code ``` %spark.r df <- createDataFrame(sqlContext, mtcars) showDF(df) ``` ### Screenshots (if appropriate)  ### Questions: * Do the license files need an update? No * Are there breaking changes for older versions? No * Does this need documentation? No Author: Jeff Zhang <[email protected]> Closes #1423 from zjffdu/ZEPPELIN-1425 and squashes the following commits: 145a8dc [Jeff Zhang] ZEPPELIN-1425. sparkr.zip is not distributed to executors Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/439b76c2 Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/439b76c2 Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/439b76c2 Branch: refs/heads/master Commit: 439b76c2c9ce363ab5993e49da22d04437f6dc76 Parents: 307049a Author: Jeff Zhang <[email protected]> Authored: Mon Sep 12 09:51:06 2016 +0800 Committer: Lee moon soo <[email protected]> Committed: Wed Sep 14 18:29:09 2016 -0700 ---------------------------------------------------------------------- .../apache/zeppelin/spark/SparkInterpreter.java | 31 ++++++++++++++++++++ 1 file changed, 31 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zeppelin/blob/439b76c2/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java 
b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java index 9a54912..02d766f 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java @@ -329,6 +329,7 @@ public class SparkInterpreter extends Interpreter { } setupConfForPySpark(conf); + setupConfForSparkR(conf); Class SparkSession = Utils.findClass("org.apache.spark.sql.SparkSession"); Object builder = Utils.invokeStaticMethod(SparkSession, "builder"); Utils.invokeMethod(builder, "config", new Class[]{ SparkConf.class }, new Object[]{ conf }); @@ -443,6 +444,7 @@ public class SparkInterpreter extends Interpreter { } } setupConfForPySpark(conf); + setupConfForSparkR(conf); SparkContext sparkContext = new SparkContext(conf); return sparkContext; } @@ -494,6 +496,35 @@ public class SparkInterpreter extends Interpreter { } } + private void setupConfForSparkR(SparkConf conf) { + String sparkRBasePath = new InterpreterProperty("SPARK_HOME", null, null, null).getValue(); + File sparkRPath; + if (null == sparkRBasePath) { + sparkRBasePath = + new InterpreterProperty("ZEPPELIN_HOME", "zeppelin.home", "../", null).getValue(); + sparkRPath = new File(sparkRBasePath, + "interpreter" + File.separator + "spark" + File.separator + "R"); + } else { + sparkRPath = new File(sparkRBasePath, "R" + File.separator + "lib"); + } + + sparkRPath = new File(sparkRPath, "sparkr.zip"); + if (sparkRPath.exists() && sparkRPath.isFile()) { + String archives = null; + if (conf.contains("spark.yarn.dist.archives")) { + archives = conf.get("spark.yarn.dist.archives"); + } + if (archives != null) { + archives = archives + "," + sparkRPath + "#sparkr"; + } else { + archives = sparkRPath + "#sparkr"; + } + conf.set("spark.yarn.dist.archives", archives); + } else { + logger.warn("sparkr.zip is not found, sparkr may not work."); + } + } + static final String toString(Object o) { return (o instanceof String) ? (String) o : ""; }
