Repository: zeppelin
Updated Branches:
  refs/heads/master 29b9b10f3 -> d9faef108


ZEPPELIN-3108. Support Spark 2.3

### What is this PR for?
Spark 2.3 removes `JobProgressListener`, which made Zeppelin unable to run with Spark 2.3.
This PR makes Zeppelin support Spark 2.3 by using `sc.statusTracker` instead; see `JobProgressUtil.scala`.
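
At a glance, progress for a job group is now derived from Spark's public status tracker API. Here is a condensed sketch of that logic (the authoritative version is the new `JobProgressUtil.scala` in the diff below):

```scala
import org.apache.spark.SparkContext

// Sketch of the statusTracker-based progress computation introduced by this
// PR (see JobProgressUtil.scala in the diff below for the committed version).
def progress(sc: SparkContext, jobGroup: String): Int = {
  val tracker = sc.statusTracker
  // Resolve all jobs submitted under the given job group...
  val jobs = tracker.getJobIdsForGroup(jobGroup).flatMap(id => tracker.getJobInfo(id))
  // ...and every stage belonging to those jobs.
  val stages = jobs.flatMap(job => job.stageIds().flatMap(tracker.getStageInfo))
  val taskCount = stages.map(_.numTasks).sum
  val completedTaskCount = stages.map(_.numCompletedTasks).sum
  if (taskCount == 0) 0 else (100 * completedTaskCount.toDouble / taskCount).toInt
}
```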

### What type of PR is it?
[Improvement]

### Todos
* [ ] - Task

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-3108

### How should this be tested?
* Verified manually.
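
To reproduce the manual verification against Spark 2.3: the new `spark-2.3` Maven profile added in `spark/pom.xml` below pins `spark.version` to 2.3.0, so a local build could be run with something like the following (an illustrative command, not one taken from the PR):

```
mvn clean package -pl spark -am -Pspark-2.3 -DskipTests
```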

### Screenshots (if appropriate)
![screen shot 2018-01-30 at 9 45 01 pm](https://user-images.githubusercontent.com/164491/35569317-dce6f348-0606-11e8-9b18-74a847d64ac9.png)

### Questions:
* Do the license files need updating? No
* Are there breaking changes for older versions? No
* Does this need documentation? No

Author: Jeff Zhang <zjf...@apache.org>

Closes #2750 from zjffdu/ZEPPELIN-3108 and squashes the following commits:

43ae78a [Jeff Zhang] ZEPPELIN-3108. Support Spark 2.3


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/d9faef10
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/d9faef10
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/d9faef10

Branch: refs/heads/master
Commit: d9faef1085e4ade496ff7f3d7f8472a28678f8e7
Parents: 29b9b10
Author: Jeff Zhang <zjf...@apache.org>
Authored: Tue Nov 14 15:29:58 2017 +0800
Committer: Jeff Zhang <zjf...@apache.org>
Committed: Sun Feb 11 09:48:32 2018 +0800

----------------------------------------------------------------------
 spark/interpreter/pom.xml                       |  10 +-
 .../zeppelin/spark/OldSparkInterpreter.java     | 153 +++++++++++++------
 spark/pom.xml                                   |  19 ++-
 spark/spark-dependencies/pom.xml                |  10 +-
 .../spark/BaseSparkScalaInterpreter.scala       |  16 +-
 .../apache/zeppelin/spark/JobProgressUtil.scala |  37 +++++
 6 files changed, 164 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9faef10/spark/interpreter/pom.xml
----------------------------------------------------------------------
diff --git a/spark/interpreter/pom.xml b/spark/interpreter/pom.xml
index 4496462..758f697 100644
--- a/spark/interpreter/pom.xml
+++ b/spark/interpreter/pom.xml
@@ -53,15 +53,7 @@
     <pyspark.test.exclude>**/PySparkInterpreterMatplotlibTest.java</pyspark.test.exclude>
     <pyspark.test.include>**/*Test.*</pyspark.test.include>
 
-
-    <spark.archive>spark-${spark.version}</spark.archive>
-    <spark.src.download.url>
-      http://d3kbcqa49mib13.cloudfront.net/${spark.archive}.tgz
-    </spark.src.download.url>
-    <spark.bin.download.url>
-      http://d3kbcqa49mib13.cloudfront.net/spark-${spark.version}-bin-without-hadoop.tgz
-    </spark.bin.download.url>
-
+    
   </properties>
 
   <dependencies>

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9faef10/spark/interpreter/src/main/java/org/apache/zeppelin/spark/OldSparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/OldSparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/OldSparkInterpreter.java
index 6a54c3b..da332fe 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/OldSparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/OldSparkInterpreter.java
@@ -35,6 +35,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.spark.JobProgressUtil;
 import org.apache.spark.SecurityManager;
 import org.apache.spark.SparkConf;
 import org.apache.spark.SparkContext;
@@ -44,10 +45,26 @@ import org.apache.spark.repl.SparkILoop;
 import org.apache.spark.scheduler.ActiveJob;
 import org.apache.spark.scheduler.DAGScheduler;
 import org.apache.spark.scheduler.Pool;
+import org.apache.spark.scheduler.SparkListenerApplicationEnd;
+import org.apache.spark.scheduler.SparkListenerApplicationStart;
+import org.apache.spark.scheduler.SparkListenerBlockManagerAdded;
+import org.apache.spark.scheduler.SparkListenerBlockManagerRemoved;
+import org.apache.spark.scheduler.SparkListenerBlockUpdated;
+import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate;
+import org.apache.spark.scheduler.SparkListenerExecutorAdded;
+import org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate;
+import org.apache.spark.scheduler.SparkListenerExecutorRemoved;
+import org.apache.spark.scheduler.SparkListenerJobEnd;
 import org.apache.spark.scheduler.SparkListenerJobStart;
+import org.apache.spark.scheduler.SparkListenerStageCompleted;
+import org.apache.spark.scheduler.SparkListenerStageSubmitted;
+import org.apache.spark.scheduler.SparkListenerTaskEnd;
+import org.apache.spark.scheduler.SparkListenerTaskGettingResult;
+import org.apache.spark.scheduler.SparkListenerTaskStart;
+import org.apache.spark.scheduler.SparkListenerUnpersistRDD;
 import org.apache.spark.sql.SQLContext;
 import org.apache.spark.ui.SparkUI;
-import org.apache.spark.ui.jobs.JobProgressListener;
+import org.apache.spark.scheduler.SparkListener;
 import org.apache.zeppelin.interpreter.BaseZeppelinContext;
 import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
@@ -113,7 +130,7 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter {
   private static InterpreterHookRegistry hooks;
   private static SparkEnv env;
   private static Object sparkSession;    // spark 2.x
-  private static JobProgressListener sparkListener;
+  private static SparkListener sparkListener;
   private static AbstractFile classOutputDir;
   private static Integer sharedInterpreterLock = new Integer(0);
   private static AtomicInteger numReferenceOfSparkContext = new AtomicInteger(0);
@@ -173,11 +190,10 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter {
     }
   }
 
-  static JobProgressListener setupListeners(SparkContext context) {
-    JobProgressListener pl = new JobProgressListener(context.getConf()) {
+  static SparkListener setupListeners(SparkContext context) {
+    SparkListener pl = new SparkListener() {
       @Override
       public synchronized void onJobStart(SparkListenerJobStart jobStart) {
-        super.onJobStart(jobStart);
         int jobId = jobStart.jobId();
        String jobGroupId = jobStart.properties().getProperty("spark.jobGroup.id");
        String uiEnabled = jobStart.properties().getProperty("spark.ui.enabled");
@@ -207,6 +223,85 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter {
         return jobUrl;
       }
 
+      @Override
+      public void onBlockUpdated(SparkListenerBlockUpdated blockUpdated) {
+
+      }
+
+      @Override
+      public void onExecutorRemoved(SparkListenerExecutorRemoved executorRemoved) {
+
+      }
+
+      @Override
+      public void onExecutorAdded(SparkListenerExecutorAdded executorAdded) {
+
+      }
+
+      @Override
+      public void onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate executorMetricsUpdate) {
+
+      }
+
+      @Override
+      public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
+
+      }
+
+      @Override
+      public void onApplicationStart(SparkListenerApplicationStart applicationStart) {
+
+      }
+
+      @Override
+      public void onUnpersistRDD(SparkListenerUnpersistRDD unpersistRDD) {
+
+      }
+
+      @Override
+      public void onBlockManagerAdded(SparkListenerBlockManagerAdded blockManagerAdded) {
+
+      }
+
+      @Override
+      public void onBlockManagerRemoved(SparkListenerBlockManagerRemoved blockManagerRemoved) {
+
+      }
+
+      @Override
+      public void onEnvironmentUpdate(SparkListenerEnvironmentUpdate environmentUpdate) {
+
+      }
+
+      @Override
+      public void onJobEnd(SparkListenerJobEnd jobEnd) {
+
+      }
+      
+      @Override
+      public void onStageCompleted(SparkListenerStageCompleted stageCompleted) {
+
+      }
+
+      @Override
+      public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) {
+
+      }
+
+      @Override
+      public void onTaskEnd(SparkListenerTaskEnd taskEnd) {
+
+      }
+
+      @Override
+      public void onTaskGettingResult(SparkListenerTaskGettingResult taskGettingResult) {
+
+      }
+
+      @Override
+      public void onTaskStart(SparkListenerTaskStart taskStart) {
+
+      }
     };
     try {
      Object listenerBus = context.getClass().getMethod("listenerBus").invoke(context);
@@ -224,7 +319,7 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter {
           continue;
         }
 
-        if (!parameterTypes[0].isAssignableFrom(JobProgressListener.class)) {
+        if (!parameterTypes[0].isAssignableFrom(SparkListener.class)) {
           continue;
         }
 
@@ -1274,48 +1369,10 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter {
   @Override
   public int getProgress(InterpreterContext context) {
     String jobGroup = Utils.buildJobGroupId(context);
-    int completedTasks = 0;
-    int totalTasks = 0;
-
-    DAGScheduler scheduler = sc.dagScheduler();
-    if (scheduler == null) {
-      return 0;
-    }
-    HashSet<ActiveJob> jobs = scheduler.activeJobs();
-    if (jobs == null || jobs.size() == 0) {
-      return 0;
-    }
-    Iterator<ActiveJob> it = jobs.iterator();
-    while (it.hasNext()) {
-      ActiveJob job = it.next();
-      String g = (String) job.properties().get("spark.jobGroup.id");
-      if (jobGroup.equals(g)) {
-        int[] progressInfo = null;
-        try {
-          Object finalStage = job.getClass().getMethod("finalStage").invoke(job);
-          if (sparkVersion.getProgress1_0()) {
-            progressInfo = getProgressFromStage_1_0x(sparkListener, finalStage);
-          } else {
-            progressInfo = getProgressFromStage_1_1x(sparkListener, finalStage);
-          }
-        } catch (IllegalAccessException | IllegalArgumentException
-            | InvocationTargetException | NoSuchMethodException
-            | SecurityException e) {
-          logger.error("Can't get progress info", e);
-          return 0;
-        }
-        totalTasks += progressInfo[0];
-        completedTasks += progressInfo[1];
-      }
-    }
-
-    if (totalTasks == 0) {
-      return 0;
-    }
-    return completedTasks * 100 / totalTasks;
+    return JobProgressUtil.progress(sc, jobGroup);
   }
 
-  private int[] getProgressFromStage_1_0x(JobProgressListener sparkListener, Object stage)
+  private int[] getProgressFromStage_1_0x(SparkListener sparkListener, Object stage)
       throws IllegalAccessException, IllegalArgumentException,
       InvocationTargetException, NoSuchMethodException, SecurityException {
     int numTasks = (int) stage.getClass().getMethod("numTasks").invoke(stage);
@@ -1345,7 +1402,7 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter {
     return new int[] {numTasks, completedTasks};
   }
 
-  private int[] getProgressFromStage_1_1x(JobProgressListener sparkListener, Object stage)
+  private int[] getProgressFromStage_1_1x(SparkListener sparkListener, Object stage)
       throws IllegalAccessException, IllegalArgumentException,
       InvocationTargetException, NoSuchMethodException, SecurityException {
     int numTasks = (int) stage.getClass().getMethod("numTasks").invoke(stage);
@@ -1421,7 +1478,7 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter {
     return FormType.NATIVE;
   }
 
-  public JobProgressListener getJobProgressListener() {
+  public SparkListener getJobProgressListener() {
     return sparkListener;
   }
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9faef10/spark/pom.xml
----------------------------------------------------------------------
diff --git a/spark/pom.xml b/spark/pom.xml
index 06b7d9f..c55e453 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -47,6 +47,14 @@
         <!-- spark versions -->
         <spark.version>2.2.0</spark.version>
         <py4j.version>0.10.4</py4j.version>
+
+        <spark.archive>spark-${spark.version}</spark.archive>
+        <spark.src.download.url>
+            http://d3kbcqa49mib13.cloudfront.net/${spark.archive}.tgz
+        </spark.src.download.url>
+        <spark.bin.download.url>
+            http://d3kbcqa49mib13.cloudfront.net/${spark.archive}-bin-without-hadoop.tgz
+        </spark.bin.download.url>
     </properties>
 
     <dependencies>
@@ -57,7 +65,6 @@
             <version>${project.version}</version>
         </dependency>
 
-        <!--test libraries-->
         <dependency>
             <groupId>org.apache.zeppelin</groupId>
             <artifactId>zeppelin-display</artifactId>
@@ -187,6 +194,16 @@
 
 
     <profiles>
+
+        <profile>
+            <id>spark-2.3</id>
+            <properties>
+                <spark.version>2.3.0</spark.version>
+                <protobuf.version>2.5.0</protobuf.version>
+                <spark.py4j.version>0.10.6</spark.py4j.version>
+            </properties>
+        </profile>
+
         <profile>
             <id>spark-2.2</id>
             <activation>

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9faef10/spark/spark-dependencies/pom.xml
----------------------------------------------------------------------
diff --git a/spark/spark-dependencies/pom.xml b/spark/spark-dependencies/pom.xml
index 7643dc9..58977b4 100644
--- a/spark/spark-dependencies/pom.xml
+++ b/spark/spark-dependencies/pom.xml
@@ -28,7 +28,7 @@
   </parent>
 
   <groupId>org.apache.zeppelin</groupId>
-  <artifactId>zeppelin-spark-dependencies_2.10</artifactId>
+  <artifactId>zeppelin-spark-dependencies</artifactId>
   <packaging>jar</packaging>
   <version>0.9.0-SNAPSHOT</version>
   <name>Zeppelin: Spark dependencies</name>
@@ -54,14 +54,6 @@
     <akka.group>org.spark-project.akka</akka.group>
     <akka.version>2.3.4-spark</akka.version>
 
-    <spark.archive>spark-${spark.version}</spark.archive>
-    <spark.src.download.url>
-      http://d3kbcqa49mib13.cloudfront.net/${spark.archive}.tgz
-    </spark.src.download.url>
-    <spark.bin.download.url>
-      http://d3kbcqa49mib13.cloudfront.net/${spark.archive}-bin-without-hadoop.tgz
-    </spark.bin.download.url>
-
     <!--plugin versions-->
     <plugin.shade.version>2.3</plugin.shade.version>
   </properties>

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9faef10/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
----------------------------------------------------------------------
diff --git a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
index 3ef4fe7..883beb0 100644
--- a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
+++ b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
@@ -21,7 +21,7 @@ package org.apache.zeppelin.spark
 import java.io.File
 
 import org.apache.spark.sql.SQLContext
-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.{JobProgressUtil, SparkConf, SparkContext}
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion
 import org.apache.zeppelin.interpreter.util.InterpreterOutputStream
 import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterResult}
@@ -93,19 +93,7 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf,
   }
 
   protected def getProgress(jobGroup: String, context: InterpreterContext): Int = {
-    val jobIds = sc.statusTracker.getJobIdsForGroup(jobGroup)
-    val jobs = jobIds.flatMap { id => sc.statusTracker.getJobInfo(id) }
-    val stages = jobs.flatMap { job =>
-      job.stageIds().flatMap(sc.statusTracker.getStageInfo)
-    }
-
-    val taskCount = stages.map(_.numTasks).sum
-    val completedTaskCount = stages.map(_.numCompletedTasks).sum
-    if (taskCount == 0) {
-      0
-    } else {
-      (100 * completedTaskCount.toDouble / taskCount).toInt
-    }
+    JobProgressUtil.progress(sc, jobGroup)
   }
 
   protected def bind(name: String, tpe: String, value: Object, modifier: List[String]): Unit

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9faef10/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/JobProgressUtil.scala
----------------------------------------------------------------------
diff --git a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/JobProgressUtil.scala b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/JobProgressUtil.scala
new file mode 100644
index 0000000..517bed0
--- /dev/null
+++ b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/JobProgressUtil.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+object JobProgressUtil {
+
+  def progress(sc: SparkContext, jobGroup : String):Int = {
+    val jobIds = sc.statusTracker.getJobIdsForGroup(jobGroup)
+    val jobs = jobIds.flatMap { id => sc.statusTracker.getJobInfo(id) }
+    val stages = jobs.flatMap { job =>
+      job.stageIds().flatMap(sc.statusTracker.getStageInfo)
+    }
+
+    val taskCount = stages.map(_.numTasks).sum
+    val completedTaskCount = stages.map(_.numCompletedTasks).sum
+    if (taskCount == 0) {
+      0
+    } else {
+      (100 * completedTaskCount.toDouble / taskCount).toInt
+    }
+  }
+}
