Repository: incubator-zeppelin Updated Branches: refs/heads/master eb29c6daa -> ae1c06bb1
ZEPPELIN-298 Paragraph progress bar and % complete not working This PR fixes ZEPPELIN-298 by searching for the correct "addListener" method, i.e. the one that takes a JobProgressListener as its parameter. Author: Lee moon soo <[email protected]> Closes #294 from Leemoonsoo/ZEPPELIN-298 and squashes the following commits: 6fb2397 [Lee moon soo] Fix job progress listener registration Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/ae1c06bb Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/ae1c06bb Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/ae1c06bb Branch: refs/heads/master Commit: ae1c06bb10a49784aff0f807d803b874df504a79 Parents: eb29c6d Author: Lee moon soo <[email protected]> Authored: Thu Sep 10 14:35:27 2015 -0700 Committer: Lee moon soo <[email protected]> Committed: Thu Sep 10 21:43:54 2015 -0700 ---------------------------------------------------------------------- .../apache/zeppelin/spark/SparkInterpreter.java | 36 ++++++++++++++++---- .../zeppelin/spark/SparkSqlInterpreter.java | 2 ++ .../zeppelin/spark/SparkInterpreterTest.java | 12 +++++-- 3 files changed, 41 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ae1c06bb/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java index dfb846c..a2e31fb 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java @@ -157,15 +157,41 @@ public class SparkInterpreter extends Interpreter { return sc != null; } - private static JobProgressListener 
setupListeners(SparkContext context) { + static JobProgressListener setupListeners(SparkContext context) { JobProgressListener pl = new JobProgressListener(context.getConf()); try { Object listenerBus = context.getClass().getMethod("listenerBus").invoke(context); - Method m = listenerBus.getClass().getMethod("addListener", SparkListener.class); - m.invoke(listenerBus, pl); + + Method[] methods = listenerBus.getClass().getMethods(); + Method addListenerMethod = null; + for (Method m : methods) { + if (!m.getName().equals("addListener")) { + continue; + } + + Class<?>[] parameterTypes = m.getParameterTypes(); + + if (parameterTypes.length != 1) { + continue; + } + + if (!parameterTypes[0].isAssignableFrom(JobProgressListener.class)) { + continue; + } + + addListenerMethod = m; + break; + } + + if (addListenerMethod != null) { + addListenerMethod.invoke(listenerBus, pl); + } else { + return null; + } } catch (NoSuchMethodException | SecurityException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) { e.printStackTrace(); + return null; } return pl; } @@ -676,12 +702,11 @@ public class SparkInterpreter extends Interpreter { while (it.hasNext()) { ActiveJob job = it.next(); String g = (String) job.properties().get("spark.jobGroup.id"); - if (jobGroup.equals(g)) { int[] progressInfo = null; try { Object finalStage = job.getClass().getMethod("finalStage").invoke(job); - if (sparkVersion.getProgress1_0()) { + if (sparkVersion.getProgress1_0()) { progressInfo = getProgressFromStage_1_0x(sparkListener, finalStage); } else { progressInfo = getProgressFromStage_1_1x(sparkListener, finalStage); @@ -748,7 +773,6 @@ public class SparkInterpreter extends Interpreter { this.getClass().forName("org.apache.spark.ui.jobs.UIData$StageUIData"); Method numCompletedTasks = stageUIDataClass.getMethod("numCompleteTasks"); - Set<Tuple2<Object, Object>> keys = JavaConverters.asJavaSetConverter(stageIdData.keySet()).asJava(); for (Tuple2<Object, Object> k : 
keys) { http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ae1c06bb/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java index 7b26e34..1ee5f9c 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java @@ -125,6 +125,7 @@ public class SparkSqlInterpreter extends Interpreter { sc.setLocalProperty("spark.scheduler.pool", null); } + sc.setJobGroup(getJobGroup(context), "Zeppelin", false); Object rdd = null; try { // method signature of sqlc.sql() is changed @@ -139,6 +140,7 @@ public class SparkSqlInterpreter extends Interpreter { } String msg = ZeppelinContext.showDF(sc, context, rdd, maxResult); + sc.clearJobGroup(); return new InterpreterResult(Code.SUCCESS, msg); } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ae1c06bb/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java ---------------------------------------------------------------------- diff --git a/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java index 87177f1..be65c09 100644 --- a/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java +++ b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java @@ -17,8 +17,7 @@ package org.apache.zeppelin.spark; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; import java.io.File; import java.util.HashMap; @@ -26,6 +25,7 @@ import java.util.LinkedList; import java.util.Properties; import org.apache.spark.SparkConf; +import org.apache.spark.SparkContext; import 
org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.display.GUI; import org.apache.zeppelin.interpreter.InterpreterContext; @@ -119,7 +119,7 @@ public class SparkInterpreterTest { @Test public void testNextLineInvocation() { - assertEquals(InterpreterResult.Code.SUCCESS, repl.interpret("\"123\"\n.toInt", context).code()); + assertEquals(InterpreterResult.Code.SUCCESS, repl.interpret("\"123\"\n.toInt", context).code()); } @Test @@ -128,6 +128,12 @@ public class SparkInterpreterTest { } @Test + public void testListener() { + SparkContext sc = repl.getSparkContext(); + assertNotNull(SparkInterpreter.setupListeners(sc)); + } + + @Test public void testSparkSql(){ repl.interpret("case class Person(name:String, age:Int)\n", context); repl.interpret("val people = sc.parallelize(Seq(Person(\"moon\", 33), Person(\"jobs\", 51), Person(\"gates\", 51), Person(\"park\", 34)))\n", context);
