Repository: zeppelin Updated Branches: refs/heads/master 29b9b10f3 -> d9faef108
ZEPPELIN-3108. Support Spark 2.3 ### What is this PR for? Spark 2.3 removes `JobProgressListener`, which prevents Zeppelin from running on Spark 2.3. This PR makes Zeppelin support Spark 2.3 by using `sc.statusTracker` instead; see `JobProgressUtil.scala`. ### What type of PR is it? [Improvement] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-3108 ### How should this be tested? * Verified manually. ### Screenshots (if appropriate) ![screen shot 2018-01-30 at 9 45 01 pm](https://user-images.githubusercontent.com/164491/35569317-dce6f348-0606-11e8-9b18-74a847d64ac9.png) ### Questions: * Do the license files need to be updated? No * Are there breaking changes for older versions? No * Does this need documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #2750 from zjffdu/ZEPPELIN-3108 and squashes the following commits: 43ae78a [Jeff Zhang] ZEPPELIN-3108. Support Spark 2.3 Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/d9faef10 Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/d9faef10 Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/d9faef10 Branch: refs/heads/master Commit: d9faef1085e4ade496ff7f3d7f8472a28678f8e7 Parents: 29b9b10 Author: Jeff Zhang <zjf...@apache.org> Authored: Tue Nov 14 15:29:58 2017 +0800 Committer: Jeff Zhang <zjf...@apache.org> Committed: Sun Feb 11 09:48:32 2018 +0800 ---------------------------------------------------------------------- spark/interpreter/pom.xml | 10 +- .../zeppelin/spark/OldSparkInterpreter.java | 153 +++++++++++++------ spark/pom.xml | 19 ++- spark/spark-dependencies/pom.xml | 10 +- .../spark/BaseSparkScalaInterpreter.scala | 16 +- .../apache/zeppelin/spark/JobProgressUtil.scala | 37 +++++ 6 files changed, 164 insertions(+), 81 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9faef10/spark/interpreter/pom.xml ---------------------------------------------------------------------- diff --git a/spark/interpreter/pom.xml b/spark/interpreter/pom.xml index 4496462..758f697 100644 --- a/spark/interpreter/pom.xml +++ b/spark/interpreter/pom.xml @@ -53,15 +53,7 @@ <pyspark.test.exclude>**/PySparkInterpreterMatplotlibTest.java</pyspark.test.exclude> <pyspark.test.include>**/*Test.*</pyspark.test.include> - - <spark.archive>spark-${spark.version}</spark.archive> - <spark.src.download.url> - http://d3kbcqa49mib13.cloudfront.net/${spark.archive}.tgz - </spark.src.download.url> - <spark.bin.download.url> - http://d3kbcqa49mib13.cloudfront.net/spark-${spark.version}-bin-without-hadoop.tgz - </spark.bin.download.url> - + </properties> <dependencies> http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9faef10/spark/interpreter/src/main/java/org/apache/zeppelin/spark/OldSparkInterpreter.java ---------------------------------------------------------------------- diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/OldSparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/OldSparkInterpreter.java index 6a54c3b..da332fe 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/OldSparkInterpreter.java +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/OldSparkInterpreter.java @@ -35,6 +35,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.spark.JobProgressUtil; import org.apache.spark.SecurityManager; import org.apache.spark.SparkConf; import org.apache.spark.SparkContext; @@ -44,10 +45,26 @@ import org.apache.spark.repl.SparkILoop; import org.apache.spark.scheduler.ActiveJob; import org.apache.spark.scheduler.DAGScheduler; import org.apache.spark.scheduler.Pool; +import 
org.apache.spark.scheduler.SparkListenerApplicationEnd; +import org.apache.spark.scheduler.SparkListenerApplicationStart; +import org.apache.spark.scheduler.SparkListenerBlockManagerAdded; +import org.apache.spark.scheduler.SparkListenerBlockManagerRemoved; +import org.apache.spark.scheduler.SparkListenerBlockUpdated; +import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate; +import org.apache.spark.scheduler.SparkListenerExecutorAdded; +import org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate; +import org.apache.spark.scheduler.SparkListenerExecutorRemoved; +import org.apache.spark.scheduler.SparkListenerJobEnd; import org.apache.spark.scheduler.SparkListenerJobStart; +import org.apache.spark.scheduler.SparkListenerStageCompleted; +import org.apache.spark.scheduler.SparkListenerStageSubmitted; +import org.apache.spark.scheduler.SparkListenerTaskEnd; +import org.apache.spark.scheduler.SparkListenerTaskGettingResult; +import org.apache.spark.scheduler.SparkListenerTaskStart; +import org.apache.spark.scheduler.SparkListenerUnpersistRDD; import org.apache.spark.sql.SQLContext; import org.apache.spark.ui.SparkUI; -import org.apache.spark.ui.jobs.JobProgressListener; +import org.apache.spark.scheduler.SparkListener; import org.apache.zeppelin.interpreter.BaseZeppelinContext; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; @@ -113,7 +130,7 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter { private static InterpreterHookRegistry hooks; private static SparkEnv env; private static Object sparkSession; // spark 2.x - private static JobProgressListener sparkListener; + private static SparkListener sparkListener; private static AbstractFile classOutputDir; private static Integer sharedInterpreterLock = new Integer(0); private static AtomicInteger numReferenceOfSparkContext = new AtomicInteger(0); @@ -173,11 +190,10 @@ public class OldSparkInterpreter extends 
AbstractSparkInterpreter { } } - static JobProgressListener setupListeners(SparkContext context) { - JobProgressListener pl = new JobProgressListener(context.getConf()) { + static SparkListener setupListeners(SparkContext context) { + SparkListener pl = new SparkListener() { @Override public synchronized void onJobStart(SparkListenerJobStart jobStart) { - super.onJobStart(jobStart); int jobId = jobStart.jobId(); String jobGroupId = jobStart.properties().getProperty("spark.jobGroup.id"); String uiEnabled = jobStart.properties().getProperty("spark.ui.enabled"); @@ -207,6 +223,85 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter { return jobUrl; } + @Override + public void onBlockUpdated(SparkListenerBlockUpdated blockUpdated) { + + } + + @Override + public void onExecutorRemoved(SparkListenerExecutorRemoved executorRemoved) { + + } + + @Override + public void onExecutorAdded(SparkListenerExecutorAdded executorAdded) { + + } + + @Override + public void onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate executorMetricsUpdate) { + + } + + @Override + public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) { + + } + + @Override + public void onApplicationStart(SparkListenerApplicationStart applicationStart) { + + } + + @Override + public void onUnpersistRDD(SparkListenerUnpersistRDD unpersistRDD) { + + } + + @Override + public void onBlockManagerAdded(SparkListenerBlockManagerAdded blockManagerAdded) { + + } + + @Override + public void onBlockManagerRemoved(SparkListenerBlockManagerRemoved blockManagerRemoved) { + + } + + @Override + public void onEnvironmentUpdate(SparkListenerEnvironmentUpdate environmentUpdate) { + + } + + @Override + public void onJobEnd(SparkListenerJobEnd jobEnd) { + + } + + @Override + public void onStageCompleted(SparkListenerStageCompleted stageCompleted) { + + } + + @Override + public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) { + + } + + @Override + public void 
onTaskEnd(SparkListenerTaskEnd taskEnd) { + + } + + @Override + public void onTaskGettingResult(SparkListenerTaskGettingResult taskGettingResult) { + + } + + @Override + public void onTaskStart(SparkListenerTaskStart taskStart) { + + } }; try { Object listenerBus = context.getClass().getMethod("listenerBus").invoke(context); @@ -224,7 +319,7 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter { continue; } - if (!parameterTypes[0].isAssignableFrom(JobProgressListener.class)) { + if (!parameterTypes[0].isAssignableFrom(SparkListener.class)) { continue; } @@ -1274,48 +1369,10 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter { @Override public int getProgress(InterpreterContext context) { String jobGroup = Utils.buildJobGroupId(context); - int completedTasks = 0; - int totalTasks = 0; - - DAGScheduler scheduler = sc.dagScheduler(); - if (scheduler == null) { - return 0; - } - HashSet<ActiveJob> jobs = scheduler.activeJobs(); - if (jobs == null || jobs.size() == 0) { - return 0; - } - Iterator<ActiveJob> it = jobs.iterator(); - while (it.hasNext()) { - ActiveJob job = it.next(); - String g = (String) job.properties().get("spark.jobGroup.id"); - if (jobGroup.equals(g)) { - int[] progressInfo = null; - try { - Object finalStage = job.getClass().getMethod("finalStage").invoke(job); - if (sparkVersion.getProgress1_0()) { - progressInfo = getProgressFromStage_1_0x(sparkListener, finalStage); - } else { - progressInfo = getProgressFromStage_1_1x(sparkListener, finalStage); - } - } catch (IllegalAccessException | IllegalArgumentException - | InvocationTargetException | NoSuchMethodException - | SecurityException e) { - logger.error("Can't get progress info", e); - return 0; - } - totalTasks += progressInfo[0]; - completedTasks += progressInfo[1]; - } - } - - if (totalTasks == 0) { - return 0; - } - return completedTasks * 100 / totalTasks; + return JobProgressUtil.progress(sc, jobGroup); } - private int[] 
getProgressFromStage_1_0x(JobProgressListener sparkListener, Object stage) + private int[] getProgressFromStage_1_0x(SparkListener sparkListener, Object stage) throws IllegalAccessException, IllegalArgumentException, InvocationTargetException, NoSuchMethodException, SecurityException { int numTasks = (int) stage.getClass().getMethod("numTasks").invoke(stage); @@ -1345,7 +1402,7 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter { return new int[] {numTasks, completedTasks}; } - private int[] getProgressFromStage_1_1x(JobProgressListener sparkListener, Object stage) + private int[] getProgressFromStage_1_1x(SparkListener sparkListener, Object stage) throws IllegalAccessException, IllegalArgumentException, InvocationTargetException, NoSuchMethodException, SecurityException { int numTasks = (int) stage.getClass().getMethod("numTasks").invoke(stage); @@ -1421,7 +1478,7 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter { return FormType.NATIVE; } - public JobProgressListener getJobProgressListener() { + public SparkListener getJobProgressListener() { return sparkListener; } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9faef10/spark/pom.xml ---------------------------------------------------------------------- diff --git a/spark/pom.xml b/spark/pom.xml index 06b7d9f..c55e453 100644 --- a/spark/pom.xml +++ b/spark/pom.xml @@ -47,6 +47,14 @@ <!-- spark versions --> <spark.version>2.2.0</spark.version> <py4j.version>0.10.4</py4j.version> + + <spark.archive>spark-${spark.version}</spark.archive> + <spark.src.download.url> + http://d3kbcqa49mib13.cloudfront.net/${spark.archive}.tgz + </spark.src.download.url> + <spark.bin.download.url> + http://d3kbcqa49mib13.cloudfront.net/${spark.archive}-bin-without-hadoop.tgz + </spark.bin.download.url> </properties> <dependencies> @@ -57,7 +65,6 @@ <version>${project.version}</version> </dependency> - <!--test libraries--> <dependency> <groupId>org.apache.zeppelin</groupId> 
<artifactId>zeppelin-display</artifactId> @@ -187,6 +194,16 @@ <profiles> + + <profile> + <id>spark-2.3</id> + <properties> + <spark.version>2.3.0</spark.version> + <protobuf.version>2.5.0</protobuf.version> + <spark.py4j.version>0.10.6</spark.py4j.version> + </properties> + </profile> + <profile> <id>spark-2.2</id> <activation> http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9faef10/spark/spark-dependencies/pom.xml ---------------------------------------------------------------------- diff --git a/spark/spark-dependencies/pom.xml b/spark/spark-dependencies/pom.xml index 7643dc9..58977b4 100644 --- a/spark/spark-dependencies/pom.xml +++ b/spark/spark-dependencies/pom.xml @@ -28,7 +28,7 @@ </parent> <groupId>org.apache.zeppelin</groupId> - <artifactId>zeppelin-spark-dependencies_2.10</artifactId> + <artifactId>zeppelin-spark-dependencies</artifactId> <packaging>jar</packaging> <version>0.9.0-SNAPSHOT</version> <name>Zeppelin: Spark dependencies</name> @@ -54,14 +54,6 @@ <akka.group>org.spark-project.akka</akka.group> <akka.version>2.3.4-spark</akka.version> - <spark.archive>spark-${spark.version}</spark.archive> - <spark.src.download.url> - http://d3kbcqa49mib13.cloudfront.net/${spark.archive}.tgz - </spark.src.download.url> - <spark.bin.download.url> - http://d3kbcqa49mib13.cloudfront.net/${spark.archive}-bin-without-hadoop.tgz - </spark.bin.download.url> - <!--plugin versions--> <plugin.shade.version>2.3</plugin.shade.version> </properties> http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9faef10/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala ---------------------------------------------------------------------- diff --git a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala index 3ef4fe7..883beb0 100644 --- 
a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala +++ b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala @@ -21,7 +21,7 @@ package org.apache.zeppelin.spark import java.io.File import org.apache.spark.sql.SQLContext -import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.{JobProgressUtil, SparkConf, SparkContext} import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion import org.apache.zeppelin.interpreter.util.InterpreterOutputStream import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterResult} @@ -93,19 +93,7 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf, } protected def getProgress(jobGroup: String, context: InterpreterContext): Int = { - val jobIds = sc.statusTracker.getJobIdsForGroup(jobGroup) - val jobs = jobIds.flatMap { id => sc.statusTracker.getJobInfo(id) } - val stages = jobs.flatMap { job => - job.stageIds().flatMap(sc.statusTracker.getStageInfo) - } - - val taskCount = stages.map(_.numTasks).sum - val completedTaskCount = stages.map(_.numCompletedTasks).sum - if (taskCount == 0) { - 0 - } else { - (100 * completedTaskCount.toDouble / taskCount).toInt - } + JobProgressUtil.progress(sc, jobGroup) } protected def bind(name: String, tpe: String, value: Object, modifier: List[String]): Unit http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9faef10/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/JobProgressUtil.scala ---------------------------------------------------------------------- diff --git a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/JobProgressUtil.scala b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/JobProgressUtil.scala new file mode 100644 index 0000000..517bed0 --- /dev/null +++ b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/JobProgressUtil.scala @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+/** Job-group progress computed from `SparkContext.statusTracker`. NOTE(review): declared in package org.apache.spark although the file sits under org/apache/zeppelin/spark; callers import org.apache.spark.JobProgressUtil. */
+object JobProgressUtil {
+  /** Percentage of completed tasks (truncated to Int) summed over all stages of all jobs in `jobGroup`; returns 0 when those stages report no tasks at all. */
+  def progress(sc: SparkContext, jobGroup : String):Int = {
+    val jobIds = sc.statusTracker.getJobIdsForGroup(jobGroup)
+    val jobs = jobIds.flatMap { id => sc.statusTracker.getJobInfo(id) }
+    val stages = jobs.flatMap { job =>
+      job.stageIds().flatMap(sc.statusTracker.getStageInfo)
+    }
+    // Ids for which the status tracker returns no info are skipped by the flatMaps above.
+    val taskCount = stages.map(_.numTasks).sum
+    val completedTaskCount = stages.map(_.numCompletedTasks).sum
+    if (taskCount == 0) {
+      0
+    } else {
+      (100 * completedTaskCount.toDouble / taskCount).toInt
+    }
+  }
+}