Repository: zeppelin
Updated Branches:
  refs/heads/master 70a3629d0 -> 321921862


[ZEPPELIN-2591] Show user info in spark job description

### What is this PR for?
Show user info in the Spark job description in the Spark UI.
With this info in the Spark UI, we will be able to find which user's job took the most 
time, which user's job is currently consuming resources, etc.

### What type of PR is it?
improvement

### What is the Jira issue?
https://issues.apache.org/jira/browse/ZEPPELIN-2591

### How should this be tested?
Run a Spark job (Scala, Python, SQL, R), then check the Jobs page of the Spark UI.
The job should display the username in its description.

### Screenshots (if appropriate)
<img width="1032" alt="screen shot 2017-05-27 at 15 41 30" 
src="https://cloud.githubusercontent.com/assets/5082742/26520572/00924702-42f3-11e7-8938-5a4b875d6c5d.png">
Check the description column

### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No

Author: Karup <[email protected]>

Closes #2369 from karuppayya/ZEPPELIN-2591 and squashes the following commits:

569f660 [Karup] code cleanup
47cf5ed [Karup] Code clean up
2cc9c08 [Karup] Add description to jobs
22d850d [Karup] Populate user info


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/32192186
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/32192186
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/32192186

Branch: refs/heads/master
Commit: 3219218620e795769e6f65287f134b6a43e9c010
Parents: 70a3629
Author: Karup <[email protected]>
Authored: Sat May 27 15:44:53 2017 +0530
Committer: Lee moon soo <[email protected]>
Committed: Wed Jun 7 10:18:40 2017 -0700

----------------------------------------------------------------------
 .../org/apache/zeppelin/spark/PySparkInterpreter.java | 14 +++++++++++---
 .../org/apache/zeppelin/spark/SparkInterpreter.java   |  6 ++++--
 .../org/apache/zeppelin/spark/SparkRInterpreter.java  |  9 ++++++---
 .../apache/zeppelin/spark/SparkSqlInterpreter.java    |  3 ++-
 .../main/java/org/apache/zeppelin/spark/Utils.java    | 12 ++++++++++++
 spark/src/main/resources/python/zeppelin_pyspark.py   |  3 ++-
 6 files changed, 37 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/32192186/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
----------------------------------------------------------------------
diff --git 
a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java 
b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
index b4e434f..28910b2 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
@@ -276,10 +276,13 @@ public class PySparkInterpreter extends Interpreter 
implements ExecuteResultHand
   public class PythonInterpretRequest {
     public String statements;
     public String jobGroup;
+    public String jobDescription;
 
-    public PythonInterpretRequest(String statements, String jobGroup) {
+    public PythonInterpretRequest(String statements, String jobGroup,
+        String jobDescription) {
       this.statements = statements;
       this.jobGroup = jobGroup;
+      this.jobDescription = jobDescription;
     }
 
     public String statements() {
@@ -289,6 +292,10 @@ public class PySparkInterpreter extends Interpreter 
implements ExecuteResultHand
     public String jobGroup() {
       return jobGroup;
     }
+
+    public String jobDescription() {
+      return jobDescription;
+    }
   }
 
   Integer statementSetNotifier = new Integer(0);
@@ -395,10 +402,11 @@ public class PySparkInterpreter extends Interpreter 
implements ExecuteResultHand
       return new InterpreterResult(Code.ERROR, errorMessage);
     }
     String jobGroup = Utils.buildJobGroupId(context);
+    String jobDesc = "Started by: " + 
Utils.getUserName(context.getAuthenticationInfo());
     SparkZeppelinContext __zeppelin__ = sparkInterpreter.getZeppelinContext();
     __zeppelin__.setInterpreterContext(context);
     __zeppelin__.setGui(context.getGui());
-    pythonInterpretRequest = new PythonInterpretRequest(st, jobGroup);
+    pythonInterpretRequest = new PythonInterpretRequest(st, jobGroup, jobDesc);
     statementOutput = null;
 
     synchronized (statementSetNotifier) {
@@ -476,7 +484,7 @@ public class PySparkInterpreter extends Interpreter 
implements ExecuteResultHand
       return new LinkedList<>();
     }
 
-    pythonInterpretRequest = new PythonInterpretRequest(completionCommand, "");
+    pythonInterpretRequest = new PythonInterpretRequest(completionCommand, "", 
"");
     statementOutput = null;
 
     synchronized (statementSetNotifier) {

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/32192186/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
----------------------------------------------------------------------
diff --git 
a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java 
b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
index f757c21..490e33f 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
@@ -36,7 +36,6 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.spark.SparkConf;
 import org.apache.spark.SparkContext;
 import org.apache.spark.SparkEnv;
-
 import org.apache.spark.SecurityManager;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.repl.SparkILoop;
@@ -58,6 +57,7 @@ import org.apache.zeppelin.scheduler.Scheduler;
 import org.apache.zeppelin.scheduler.SchedulerFactory;
 import org.apache.zeppelin.spark.dep.SparkDependencyContext;
 import org.apache.zeppelin.spark.dep.SparkDependencyResolver;
+import org.apache.zeppelin.user.AuthenticationInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -171,6 +171,7 @@ public class SparkInterpreter extends Interpreter {
         String jobUrl = getJobUrl(jobId);
         String noteId = Utils.getNoteId(jobGroupId);
         String paragraphId = Utils.getParagraphId(jobGroupId);
+          
         if (jobUrl != null && noteId != null && paragraphId != null) {
           RemoteEventClientWrapper eventClient = 
BaseZeppelinContext.getEventClient();
           Map<String, String> infos = new java.util.HashMap<>();
@@ -1182,7 +1183,8 @@ public class SparkInterpreter extends Interpreter {
   public InterpreterResult interpret(String[] lines, InterpreterContext 
context) {
     synchronized (this) {
       z.setGui(context.getGui());
-      sc.setJobGroup(Utils.buildJobGroupId(context), "Zeppelin", false);
+      String jobDesc = "Started by: " + 
Utils.getUserName(context.getAuthenticationInfo());
+      sc.setJobGroup(Utils.buildJobGroupId(context), jobDesc, false);
       InterpreterResult r = interpretInput(lines, context);
       sc.clearJobGroup();
       return r;

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/32192186/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
----------------------------------------------------------------------
diff --git 
a/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java 
b/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
index 606c8a0..ca52f79 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
@@ -21,6 +21,7 @@ import static 
org.apache.zeppelin.spark.ZeppelinRDisplay.render;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+
 import org.apache.spark.SparkContext;
 import org.apache.spark.SparkRBackend;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -114,7 +115,9 @@ public class SparkRInterpreter extends Interpreter {
     }
 
     String jobGroup = Utils.buildJobGroupId(interpreterContext);
-    sparkInterpreter.getSparkContext().setJobGroup(jobGroup, "Zeppelin", 
false);
+    String jobDesc = "Started by: " +
+       Utils.getUserName(interpreterContext.getAuthenticationInfo());
+    sparkInterpreter.getSparkContext().setJobGroup(jobGroup, jobDesc, false);
 
     String imageWidth = getProperty("zeppelin.R.image.width");
 
@@ -139,10 +142,10 @@ public class SparkRInterpreter extends Interpreter {
     // assign setJobGroup to dummy__, otherwise it would print NULL for this 
statement
     if (Utils.isSpark2()) {
       setJobGroup = "dummy__ <- setJobGroup(\"" + jobGroup +
-          "\", \"zeppelin sparkR job group description\", TRUE)";
+          "\", \" +" + jobDesc + "\", TRUE)";
     } else if 
(getSparkInterpreter().getSparkVersion().newerThanEquals(SparkVersion.SPARK_1_5_0))
 {
       setJobGroup = "dummy__ <- setJobGroup(sc, \"" + jobGroup +
-          "\", \"zeppelin sparkR job group description\", TRUE)";
+          "\", \"" + jobDesc + "\", TRUE)";
     }
     logger.debug("set JobGroup:" + setJobGroup);
     lines = setJobGroup + "\n" + lines;

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/32192186/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
----------------------------------------------------------------------
diff --git 
a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java 
b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
index d9e7563..134a65f 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
@@ -105,7 +105,8 @@ public class SparkSqlInterpreter extends Interpreter {
       sc.setLocalProperty("spark.scheduler.pool", null);
     }
 
-    sc.setJobGroup(Utils.buildJobGroupId(context), "Zeppelin", false);
+    String jobDesc = "Started by: " + 
Utils.getUserName(context.getAuthenticationInfo());
+    sc.setJobGroup(Utils.buildJobGroupId(context), jobDesc, false);
     Object rdd = null;
     try {
       // method signature of sqlc.sql() is changed

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/32192186/spark/src/main/java/org/apache/zeppelin/spark/Utils.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/Utils.java 
b/spark/src/main/java/org/apache/zeppelin/spark/Utils.java
index 17edb0d..6448c97 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/Utils.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/Utils.java
@@ -18,6 +18,7 @@
 package org.apache.zeppelin.spark;
 
 import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.user.AuthenticationInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -123,4 +124,15 @@ class Utils {
     int secondIndex = jobgroupId.indexOf("-", indexOf + 1);
     return jobgroupId.substring(secondIndex + 1, jobgroupId.length());
   }
+
+  public static String getUserName(AuthenticationInfo info) {
+    String uName = "";
+    if (info != null) {
+      uName = info.getUser();
+    }
+    if (uName == null || uName.isEmpty()) {
+      uName = "anonymous";
+    }
+    return uName;
+  }
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/32192186/spark/src/main/resources/python/zeppelin_pyspark.py
----------------------------------------------------------------------
diff --git a/spark/src/main/resources/python/zeppelin_pyspark.py 
b/spark/src/main/resources/python/zeppelin_pyspark.py
index f927ec3..347b543 100644
--- a/spark/src/main/resources/python/zeppelin_pyspark.py
+++ b/spark/src/main/resources/python/zeppelin_pyspark.py
@@ -298,6 +298,7 @@ while True :
   try:
     stmts = req.statements().split("\n")
     jobGroup = req.jobGroup()
+    jobDesc = req.jobDescription()
     
     # Get post-execute hooks
     try:
@@ -318,7 +319,7 @@ while True :
     if stmts:
       # use exec mode to compile the statements except the last statement,
       # so that the last statement's evaluation will be printed to stdout
-      sc.setJobGroup(jobGroup, "Zeppelin")
+      sc.setJobGroup(jobGroup, jobDesc)
       code = compile('\n'.join(stmts), '<stdin>', 'exec', ast.PyCF_ONLY_AST, 1)
       to_run_hooks = []
       if (nhooks > 0):

Reply via email to