Repository: zeppelin
Updated Branches:
  refs/heads/master 70a3629d0 -> 321921862
[ZEPPELIN-2591] Show user info in spark job description

### What is this PR for?
Show user info in the Spark job description in the Spark UI. With this info in the Spark UI, we will be able to find which user's job took the most time, which user's job is currently consuming resources, etc.

### What type of PR is it?
Improvement

### What is the Jira issue?
https://issues.apache.org/jira/browse/ZEPPELIN-2591

### How should this be tested?
Run a Spark job (Scala, Python, SQL, R) and check the Jobs page of the Spark UI. The job should display the username in its description.

### Screenshots (if appropriate)
<img width="1032" alt="screen shot 2017-05-27 at 15 41 30" src="https://cloud.githubusercontent.com/assets/5082742/26520572/00924702-42f3-11e7-8938-5a4b875d6c5d.png">
Check the Description column.

### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No

Author: Karup <[email protected]>

Closes #2369 from karuppayya/ZEPPELIN-2591 and squashes the following commits:

569f660 [Karup] code cleanup
47cf5ed [Karup] Code clean up
2cc9c08 [Karup] Add description to jobs
22d850d [Karup] Populate user info

Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/32192186
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/32192186
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/32192186

Branch: refs/heads/master
Commit: 3219218620e795769e6f65287f134b6a43e9c010
Parents: 70a3629
Author: Karup <[email protected]>
Authored: Sat May 27 15:44:53 2017 +0530
Committer: Lee moon soo <[email protected]>
Committed: Wed Jun 7 10:18:40 2017 -0700

----------------------------------------------------------------------
 .../org/apache/zeppelin/spark/PySparkInterpreter.java | 14 +++++++++++---
 .../org/apache/zeppelin/spark/SparkInterpreter.java   |  6 ++++--
 .../org/apache/zeppelin/spark/SparkRInterpreter.java  |  9 ++++++---
 .../apache/zeppelin/spark/SparkSqlInterpreter.java    |  3 ++-
 .../main/java/org/apache/zeppelin/spark/Utils.java    | 12 ++++++++++++
 spark/src/main/resources/python/zeppelin_pyspark.py   |  3 ++-
 6 files changed, 37 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
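For background, the mechanism this patch relies on throughout is SparkContext.setJobGroup(groupId, description, interruptOnCancel): whatever description is set on the driver thread is shown in the Description column of the Spark UI Jobs page for every job triggered on that thread. Below is a minimal standalone sketch (not part of the patch) of that behavior; the jobGroup and jobDesc literals are placeholders for what Zeppelin builds via Utils.buildJobGroupId(context) and Utils.getUserName(...).

    import java.util.Arrays;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;

    public class JobDescriptionDemo {
      public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(
            new SparkConf().setAppName("desc-demo").setMaster("local[2]"));
        // Placeholder values; Zeppelin derives these from the interpreter context.
        String jobGroup = "zeppelin-note1-paragraph1";
        String jobDesc = "Started by: anonymous";
        // Jobs triggered on this thread carry jobDesc, which the Spark UI
        // renders in the Jobs page's Description column.
        sc.setJobGroup(jobGroup, jobDesc, false);
        sc.parallelize(Arrays.asList(1, 2, 3)).count();  // any action starts a job
        sc.clearJobGroup();
        sc.stop();
      }
    }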
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/32192186/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
index b4e434f..28910b2 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
@@ -276,10 +276,13 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHandler
   public class PythonInterpretRequest {
     public String statements;
     public String jobGroup;
+    public String jobDescription;

-    public PythonInterpretRequest(String statements, String jobGroup) {
+    public PythonInterpretRequest(String statements, String jobGroup,
+        String jobDescription) {
       this.statements = statements;
       this.jobGroup = jobGroup;
+      this.jobDescription = jobDescription;
     }

     public String statements() {
@@ -289,6 +292,10 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHandler
     public String jobGroup() {
       return jobGroup;
     }
+
+    public String jobDescription() {
+      return jobDescription;
+    }
   }

   Integer statementSetNotifier = new Integer(0);

@@ -395,10 +402,11 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHandler
       return new InterpreterResult(Code.ERROR, errorMessage);
     }
     String jobGroup = Utils.buildJobGroupId(context);
+    String jobDesc = "Started by: " + Utils.getUserName(context.getAuthenticationInfo());
     SparkZeppelinContext __zeppelin__ = sparkInterpreter.getZeppelinContext();
     __zeppelin__.setInterpreterContext(context);
     __zeppelin__.setGui(context.getGui());
-    pythonInterpretRequest = new PythonInterpretRequest(st, jobGroup);
+    pythonInterpretRequest = new PythonInterpretRequest(st, jobGroup, jobDesc);
     statementOutput = null;

     synchronized (statementSetNotifier) {
@@ -476,7 +484,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHandler
       return new LinkedList<>();
     }

-    pythonInterpretRequest = new PythonInterpretRequest(completionCommand, "");
+    pythonInterpretRequest = new PythonInterpretRequest(completionCommand, "", "");
     statementOutput = null;

     synchronized (statementSetNotifier) {

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/32192186/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
index f757c21..490e33f 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
@@ -36,7 +36,6 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.spark.SparkConf;
 import org.apache.spark.SparkContext;
 import org.apache.spark.SparkEnv;
-
 import org.apache.spark.SecurityManager;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.repl.SparkILoop;
@@ -58,6 +57,7 @@ import org.apache.zeppelin.scheduler.Scheduler;
 import org.apache.zeppelin.scheduler.SchedulerFactory;
 import org.apache.zeppelin.spark.dep.SparkDependencyContext;
 import org.apache.zeppelin.spark.dep.SparkDependencyResolver;
+import org.apache.zeppelin.user.AuthenticationInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -171,6 +171,7 @@ public class SparkInterpreter extends Interpreter {
         String jobUrl = getJobUrl(jobId);
         String noteId = Utils.getNoteId(jobGroupId);
         String paragraphId = Utils.getParagraphId(jobGroupId);
+
         if (jobUrl != null && noteId != null && paragraphId != null) {
           RemoteEventClientWrapper eventClient = BaseZeppelinContext.getEventClient();
           Map<String, String> infos = new java.util.HashMap<>();
@@ -1182,7 +1183,8 @@ public class SparkInterpreter extends Interpreter {
   public InterpreterResult interpret(String[] lines, InterpreterContext context) {
     synchronized (this) {
       z.setGui(context.getGui());
-      sc.setJobGroup(Utils.buildJobGroupId(context), "Zeppelin", false);
+      String jobDesc = "Started by: " + Utils.getUserName(context.getAuthenticationInfo());
+      sc.setJobGroup(Utils.buildJobGroupId(context), jobDesc, false);
       InterpreterResult r = interpretInput(lines, context);
       sc.clearJobGroup();
       return r;

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/32192186/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
index 606c8a0..ca52f79 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
@@ -21,6 +21,7 @@ import static org.apache.zeppelin.spark.ZeppelinRDisplay.render;

 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+
 import org.apache.spark.SparkContext;
 import org.apache.spark.SparkRBackend;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -114,7 +115,9 @@ public class SparkRInterpreter extends Interpreter {
     }

     String jobGroup = Utils.buildJobGroupId(interpreterContext);
-    sparkInterpreter.getSparkContext().setJobGroup(jobGroup, "Zeppelin", false);
+    String jobDesc = "Started by: " +
+        Utils.getUserName(interpreterContext.getAuthenticationInfo());
+    sparkInterpreter.getSparkContext().setJobGroup(jobGroup, jobDesc, false);

     String imageWidth = getProperty("zeppelin.R.image.width");

@@ -139,10 +142,10 @@ public class SparkRInterpreter extends Interpreter {
     // assign setJobGroup to dummy__, otherwise it would print NULL for this statement
     if (Utils.isSpark2()) {
       setJobGroup = "dummy__ <- setJobGroup(\"" + jobGroup +
-          "\", \"zeppelin sparkR job group description\", TRUE)";
+          "\", \"" + jobDesc + "\", TRUE)";
     } else if (getSparkInterpreter().getSparkVersion().newerThanEquals(SparkVersion.SPARK_1_5_0)) {
       setJobGroup = "dummy__ <- setJobGroup(sc, \"" + jobGroup +
-          "\", \"zeppelin sparkR job group description\", TRUE)";
+          "\", \"" + jobDesc + "\", TRUE)";
     }
     logger.debug("set JobGroup:" + setJobGroup);
     lines = setJobGroup + "\n" + lines;

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/32192186/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
index d9e7563..134a65f 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
@@ -105,7 +105,8 @@ public class SparkSqlInterpreter extends Interpreter {
       sc.setLocalProperty("spark.scheduler.pool", null);
     }

-    sc.setJobGroup(Utils.buildJobGroupId(context), "Zeppelin", false);
+    String jobDesc = "Started by: " + Utils.getUserName(context.getAuthenticationInfo());
+    sc.setJobGroup(Utils.buildJobGroupId(context), jobDesc, false);
     Object rdd = null;
     try {
       // method signature of sqlc.sql() is changed

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/32192186/spark/src/main/java/org/apache/zeppelin/spark/Utils.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/Utils.java b/spark/src/main/java/org/apache/zeppelin/spark/Utils.java
index 17edb0d..6448c97 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/Utils.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/Utils.java
@@ -18,6 +18,7 @@ package org.apache.zeppelin.spark;

 import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.user.AuthenticationInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -123,4 +124,15 @@ class Utils {
     int secondIndex = jobgroupId.indexOf("-", indexOf + 1);
     return jobgroupId.substring(secondIndex + 1, jobgroupId.length());
   }
+
+  public static String getUserName(AuthenticationInfo info) {
+    String uName = "";
+    if (info != null) {
+      uName = info.getUser();
+    }
+    if (uName == null || uName.isEmpty()) {
+      uName = "anonymous";
+    }
+    return uName;
+  }
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/32192186/spark/src/main/resources/python/zeppelin_pyspark.py
----------------------------------------------------------------------
diff --git a/spark/src/main/resources/python/zeppelin_pyspark.py b/spark/src/main/resources/python/zeppelin_pyspark.py
index f927ec3..347b543 100644
--- a/spark/src/main/resources/python/zeppelin_pyspark.py
+++ b/spark/src/main/resources/python/zeppelin_pyspark.py
@@ -298,6 +298,7 @@ while True :
   try:
     stmts = req.statements().split("\n")
     jobGroup = req.jobGroup()
+    jobDesc = req.jobDescription()

     # Get post-execute hooks
     try:
@@ -318,7 +319,7 @@ while True :
     if stmts:
       # use exec mode to compile the statements except the last statement,
       # so that the last statement's evaluation will be printed to stdout
-      sc.setJobGroup(jobGroup, "Zeppelin")
+      sc.setJobGroup(jobGroup, jobDesc)
       code = compile('\n'.join(stmts), '<stdin>', 'exec', ast.PyCF_ONLY_AST, 1)
       to_run_hooks = []
      if (nhooks > 0):
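For reference, a sketch of the expected behavior of the new Utils.getUserName helper. This assumes AuthenticationInfo's single-argument (user) constructor; since Utils is package-private, such a snippet would have to live in the org.apache.zeppelin.spark package.

    package org.apache.zeppelin.spark;  // Utils is package-private

    import org.apache.zeppelin.user.AuthenticationInfo;

    class GetUserNameSketch {
      public static void main(String[] args) {
        // Assumed constructor: AuthenticationInfo(String user).
        System.out.println(Utils.getUserName(new AuthenticationInfo("alice"))); // "alice"
        System.out.println(Utils.getUserName(null));                            // "anonymous" (null info)
        System.out.println(Utils.getUserName(new AuthenticationInfo("")));      // "anonymous" (empty user)
      }
    }

The "anonymous" fallback keeps the job description meaningful even when Zeppelin runs without authentication, so the Spark UI always shows "Started by: <user>".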
