http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/spark/src/main/java/com/nflabs/zeppelin/spark/SparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/com/nflabs/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/com/nflabs/zeppelin/spark/SparkInterpreter.java
deleted file mode 100644
index 89c6e45..0000000
--- a/spark/src/main/java/com/nflabs/zeppelin/spark/SparkInterpreter.java
+++ /dev/null
@@ -1,718 +0,0 @@
-package com.nflabs.zeppelin.spark;
-
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.PrintStream;
-import java.io.PrintWriter;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-
-import com.nflabs.zeppelin.interpreter.*;
-import org.apache.spark.HttpServer;
-import org.apache.spark.SparkConf;
-import org.apache.spark.SparkContext;
-import org.apache.spark.SparkEnv;
-import org.apache.spark.repl.SparkCommandLine;
-import org.apache.spark.repl.SparkILoop;
-import org.apache.spark.repl.SparkIMain;
-import org.apache.spark.repl.SparkJLineCompletion;
-import org.apache.spark.scheduler.ActiveJob;
-import org.apache.spark.scheduler.DAGScheduler;
-import org.apache.spark.scheduler.Pool;
-import org.apache.spark.scheduler.Stage;
-import org.apache.spark.sql.SQLContext;
-import org.apache.spark.ui.jobs.JobProgressListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import scala.Console;
-import scala.Enumeration.Value;
-import scala.None;
-import scala.Some;
-import scala.Tuple2;
-import scala.collection.Iterator;
-import scala.collection.JavaConversions;
-import scala.collection.JavaConverters;
-import scala.collection.mutable.HashMap;
-import scala.collection.mutable.HashSet;
-import scala.tools.nsc.Settings;
-import scala.tools.nsc.interpreter.Completion.Candidates;
-import scala.tools.nsc.interpreter.Completion.ScalaCompleter;
-import scala.tools.nsc.settings.MutableSettings.BooleanSetting;
-import scala.tools.nsc.settings.MutableSettings.PathSetting;
-
-import com.nflabs.zeppelin.interpreter.InterpreterResult.Code;
-import com.nflabs.zeppelin.scheduler.Scheduler;
-import com.nflabs.zeppelin.scheduler.SchedulerFactory;
-import com.nflabs.zeppelin.spark.dep.DependencyContext;
-import com.nflabs.zeppelin.spark.dep.DependencyResolver;
-
-/**
- * Spark interpreter for Zeppelin.
- *
- */
-public class SparkInterpreter extends Interpreter {
-  Logger logger = LoggerFactory.getLogger(SparkInterpreter.class);
-
-  static {
-    Interpreter.register(
-        "spark",
-        "spark",
-        SparkInterpreter.class.getName(),
-        new InterpreterPropertyBuilder()
-            .add("spark.app.name", "Zeppelin", "The name of spark 
application.")
-            .add("master",
-                getSystemDefault("MASTER", "spark.master", "local[*]"),
-                "Spark master uri. ex) spark://masterhost:7077")
-            .add("spark.executor.memory",
-                getSystemDefault(null, "spark.executor.memory", "512m"),
-                "Executor memory per worker instance. ex) 512m, 32g")
-            .add("spark.cores.max",
-                getSystemDefault(null, "spark.cores.max", ""),
-                "Total number of cores to use. Empty value uses all available 
core.")
-            .add("spark.yarn.jar",
-                getSystemDefault("SPARK_YARN_JAR", "spark.yarn.jar", ""),
-                "The location of the Spark jar file. If you use yarn as a 
cluster, "
-                + "we should set this value")
-            .add("zeppelin.spark.useHiveContext", "true",
-                 "Use HiveContext instead of SQLContext if it is true.")
-            .add("args", "", "spark commandline args").build());
-
-  }
-
-  private ZeppelinContext z;
-  private SparkILoop interpreter;
-  private SparkIMain intp;
-  private SparkContext sc;
-  private ByteArrayOutputStream out;
-  private SQLContext sqlc;
-  private DependencyResolver dep;
-  private SparkJLineCompletion completor;
-
-  private JobProgressListener sparkListener;
-
-  private Map<String, Object> binder;
-  private SparkEnv env;
-
-
-  public SparkInterpreter(Properties property) {
-    super(property);
-    out = new ByteArrayOutputStream();
-  }
-
-  public SparkInterpreter(Properties property, SparkContext sc) {
-    this(property);
-
-    this.sc = sc;
-    env = SparkEnv.get();
-    sparkListener = setupListeners(this.sc);
-  }
-
-  public synchronized SparkContext getSparkContext() {
-    if (sc == null) {
-      sc = createSparkContext();
-      env = SparkEnv.get();
-      sparkListener = setupListeners(sc);
-    }
-    return sc;
-  }
-
-  public boolean isSparkContextInitialized() {
-    return sc != null;
-  }
-
-  private static JobProgressListener setupListeners(SparkContext context) {
-    JobProgressListener pl = new JobProgressListener(context.getConf());
-    context.listenerBus().addListener(pl);
-    return pl;
-  }
-
-  private boolean useHiveContext() {
-    return Boolean.parseBoolean(getProperty("zeppelin.spark.useHiveContext"));
-  }
-
-  public SQLContext getSQLContext() {
-    if (sqlc == null) {
-      if (useHiveContext()) {
-        String name = "org.apache.spark.sql.hive.HiveContext";
-        Constructor<?> hc;
-        try {
-          hc = getClass().getClassLoader().loadClass(name)
-              .getConstructor(SparkContext.class);
-          sqlc = (SQLContext) hc.newInstance(getSparkContext());
-        } catch (NoSuchMethodException | SecurityException
-            | ClassNotFoundException | InstantiationException
-            | IllegalAccessException | IllegalArgumentException
-            | InvocationTargetException e) {
-          logger.warn("Can't create HiveContext. Fallback to SQLContext", e);
-          // when hive dependency is not loaded, it'll fail.
-          // in this case SQLContext can be used.
-          sqlc = new SQLContext(getSparkContext());
-        }
-      } else {
-        sqlc = new SQLContext(getSparkContext());
-      }
-    }
-
-    return sqlc;
-  }
-
-  public DependencyResolver getDependencyResolver() {
-    if (dep == null) {
-      dep = new DependencyResolver(intp, sc, getProperty("zeppelin.dep.localrepo"));
-    }
-    return dep;
-  }
-
-  private DepInterpreter getDepInterpreter() {
-    InterpreterGroup intpGroup = getInterpreterGroup();
-    if (intpGroup == null) return null;
-    synchronized (intpGroup) {
-      for (Interpreter intp : intpGroup) {
-        if (intp.getClassName().equals(DepInterpreter.class.getName())) {
-          Interpreter p = intp;
-          while (p instanceof WrappedInterpreter) {
-            p = ((WrappedInterpreter) p).getInnerInterpreter();
-          }
-          return (DepInterpreter) p;
-        }
-      }
-    }
-    return null;
-  }
-
-  public SparkContext createSparkContext() {
-    System.err.println("------ Create new SparkContext " + getProperty("master") + " -------");
-
-    String execUri = System.getenv("SPARK_EXECUTOR_URI");
-    String[] jars = SparkILoop.getAddedJars();
-
-    String classServerUri = null;
-
-    try { // in case of spark 1.1x, spark 1.2x
-      Method classServer = interpreter.intp().getClass().getMethod("classServer");
-      HttpServer httpServer = (HttpServer) classServer.invoke(interpreter.intp());
-      classServerUri = httpServer.uri();
-    } catch (NoSuchMethodException | SecurityException | IllegalAccessException
-        | IllegalArgumentException | InvocationTargetException e) {
-      // continue
-    }
-
-    if (classServerUri == null) {
-      try { // for spark 1.3x
-        Method classServer = interpreter.intp().getClass().getMethod("classServerUri");
-        classServerUri = (String) classServer.invoke(interpreter.intp());
-      } catch (NoSuchMethodException | SecurityException | IllegalAccessException
-          | IllegalArgumentException | InvocationTargetException e) {
-        throw new InterpreterException(e);
-      }
-    }
-
-    SparkConf conf =
-        new SparkConf()
-            .setMaster(getProperty("master"))
-            .setAppName(getProperty("spark.app.name"))
-            .setJars(jars)
-            .set("spark.repl.class.uri", classServerUri);
-
-    if (execUri != null) {
-      conf.set("spark.executor.uri", execUri);
-    }
-    if (System.getenv("SPARK_HOME") != null) {
-      conf.setSparkHome(System.getenv("SPARK_HOME"));
-    }
-    conf.set("spark.scheduler.mode", "FAIR");
-
-    Properties intpProperty = getProperty();
-
-    for (Object k : intpProperty.keySet()) {
-      String key = (String) k;
-      Object value = intpProperty.get(key);
-      if (!isEmptyString(value)) {
-        logger.debug(String.format("SparkConf: key = [%s], value = [%s]", key, value));
-        conf.set(key, (String) value);
-      }
-    }
-
-    SparkContext sparkContext = new SparkContext(conf);
-    return sparkContext;
-  }
-
-  public static boolean isEmptyString(Object val) {
-    return val instanceof String && ((String) val).trim().isEmpty();
-  }
-
-  public static String getSystemDefault(
-      String envName,
-      String propertyName,
-      String defaultValue) {
-
-    if (envName != null && !envName.isEmpty()) {
-      String envValue = System.getenv().get(envName);
-      if (envValue != null) {
-        return envValue;
-      }
-    }
-
-    if (propertyName != null && !propertyName.isEmpty()) {
-      String propValue = System.getProperty(propertyName);
-      if (propValue != null) {
-        return propValue;
-      }
-    }
-    return defaultValue;
-  }
-
-  @Override
-  public void open() {
-    URL[] urls = getClassloaderUrls();
-
-    // Very nice discussion about how scala compiler handle classpath
-    // https://groups.google.com/forum/#!topic/scala-user/MlVwo2xCCI0
-
-    /*
-     * > val env = new nsc.Settings(errLogger) > env.usejavacp.value = true > val p = new
-     * Interpreter(env) > p.setContextClassLoader > Alternatively you can set the class path through
-     * nsc.Settings.classpath.
-     *
-     * >> val settings = new Settings() >> settings.usejavacp.value = true >>
-     * settings.classpath.value += File.pathSeparator + >> System.getProperty("java.class.path") >>
-     * val in = new Interpreter(settings) { >> override protected def parentClassLoader =
-     * getClass.getClassLoader >> } >> in.setContextClassLoader()
-     */
-    Settings settings = new Settings();
-    if (getProperty("args") != null) {
-      String[] argsArray = getProperty("args").split(" ");
-      LinkedList<String> argList = new LinkedList<String>();
-      for (String arg : argsArray) {
-        argList.add(arg);
-      }
-
-      SparkCommandLine command =
-          new SparkCommandLine(scala.collection.JavaConversions.asScalaBuffer(
-              argList).toList());
-      settings = command.settings();
-    }
-
-    // set classpath for scala compiler
-    PathSetting pathSettings = settings.classpath();
-    String classpath = "";
-    List<File> paths = currentClassPath();
-    for (File f : paths) {
-      if (classpath.length() > 0) {
-        classpath += File.pathSeparator;
-      }
-      classpath += f.getAbsolutePath();
-    }
-
-    if (urls != null) {
-      for (URL u : urls) {
-        if (classpath.length() > 0) {
-          classpath += File.pathSeparator;
-        }
-        classpath += u.getFile();
-      }
-    }
-
-    // add dependency from DepInterpreter
-    DepInterpreter depInterpreter = getDepInterpreter();
-    if (depInterpreter != null) {
-      DependencyContext depc = depInterpreter.getDependencyContext();
-      if (depc != null) {
-        List<File> files = depc.getFiles();
-        if (files != null) {
-          for (File f : files) {
-            if (classpath.length() > 0) {
-              classpath += File.pathSeparator;
-            }
-            classpath += f.getAbsolutePath();
-          }
-        }
-      }
-    }
-
-    pathSettings.v_$eq(classpath);
-    settings.scala$tools$nsc$settings$ScalaSettings$_setter_$classpath_$eq(pathSettings);
-
-
-    // set classloader for scala compiler
-    settings.explicitParentLoader_$eq(new Some<ClassLoader>(Thread.currentThread()
-        .getContextClassLoader()));
-    BooleanSetting b = (BooleanSetting) settings.usejavacp();
-    b.v_$eq(true);
-    settings.scala$tools$nsc$settings$StandardScalaSettings$_setter_$usejavacp_$eq(b);
-
-    PrintStream printStream = new PrintStream(out);
-
-    /* spark interpreter */
-    this.interpreter = new SparkILoop(null, new PrintWriter(out));
-    interpreter.settings_$eq(settings);
-
-    interpreter.createInterpreter();
-
-    intp = interpreter.intp();
-    intp.setContextClassLoader();
-    intp.initializeSynchronous();
-
-    completor = new SparkJLineCompletion(intp);
-
-    sc = getSparkContext();
-    if (sc.getPoolForName("fair").isEmpty()) {
-      Value schedulingMode = org.apache.spark.scheduler.SchedulingMode.FAIR();
-      int minimumShare = 0;
-      int weight = 1;
-      Pool pool = new Pool("fair", schedulingMode, minimumShare, weight);
-      sc.taskScheduler().rootPool().addSchedulable(pool);
-    }
-
-    sqlc = getSQLContext();
-
-    dep = getDependencyResolver();
-
-    z = new ZeppelinContext(sc, sqlc, null, dep, printStream);
-
-    try {
-      if (sc.version().startsWith("1.1") || sc.version().startsWith("1.2")) {
-        Method loadFiles = this.interpreter.getClass().getMethod("loadFiles", Settings.class);
-        loadFiles.invoke(this.interpreter, settings);
-      } else if (sc.version().startsWith("1.3")) {
-        Method loadFiles = this.interpreter.getClass().getMethod(
-            "org$apache$spark$repl$SparkILoop$$loadFiles", Settings.class);
-        loadFiles.invoke(this.interpreter, settings);
-      }
-    } catch (NoSuchMethodException | SecurityException | IllegalAccessException
-        | IllegalArgumentException | InvocationTargetException e) {
-      throw new InterpreterException(e);
-    }
-
-
-    intp.interpret("@transient var _binder = new java.util.HashMap[String, 
Object]()");
-    binder = (Map<String, Object>) getValue("_binder");
-    binder.put("sc", sc);
-    binder.put("sqlc", sqlc);
-    binder.put("z", z);
-    binder.put("out", printStream);
-
-    intp.interpret("@transient val z = "
-                 + 
"_binder.get(\"z\").asInstanceOf[com.nflabs.zeppelin.spark.ZeppelinContext]");
-    intp.interpret("@transient val sc = "
-                 + 
"_binder.get(\"sc\").asInstanceOf[org.apache.spark.SparkContext]");
-    intp.interpret("@transient val sqlc = "
-                 + 
"_binder.get(\"sqlc\").asInstanceOf[org.apache.spark.sql.SQLContext]");
-    intp.interpret("@transient val sqlContext = "
-                 + 
"_binder.get(\"sqlc\").asInstanceOf[org.apache.spark.sql.SQLContext]");
-    intp.interpret("import org.apache.spark.SparkContext._");
-
-    if (sc.version().startsWith("1.1")) {
-      intp.interpret("import sqlContext._");
-    } else if (sc.version().startsWith("1.2")) {
-      intp.interpret("import sqlContext._");
-    } else if (sc.version().startsWith("1.3")) {
-      intp.interpret("import sqlContext.implicits._");
-      intp.interpret("import sqlContext.sql");
-      intp.interpret("import org.apache.spark.sql.functions._");
-    }
-
-    // add jar
-    if (depInterpreter != null) {
-      DependencyContext depc = depInterpreter.getDependencyContext();
-      if (depc != null) {
-        List<File> files = depc.getFilesDist();
-        if (files != null) {
-          for (File f : files) {
-            if (f.getName().toLowerCase().endsWith(".jar")) {
-              sc.addJar(f.getAbsolutePath());
-              logger.info("sc.addJar(" + f.getAbsolutePath() + ")");
-            } else {
-              sc.addFile(f.getAbsolutePath());
-              logger.info("sc.addFile(" + f.getAbsolutePath() + ")");
-            }
-          }
-        }
-      }
-    }
-  }
-
-  private List<File> currentClassPath() {
-    List<File> paths = classPath(Thread.currentThread().getContextClassLoader());
-    String[] cps = System.getProperty("java.class.path").split(File.pathSeparator);
-    if (cps != null) {
-      for (String cp : cps) {
-        paths.add(new File(cp));
-      }
-    }
-    return paths;
-  }
-
-  private List<File> classPath(ClassLoader cl) {
-    List<File> paths = new LinkedList<File>();
-    if (cl == null) {
-      return paths;
-    }
-
-    if (cl instanceof URLClassLoader) {
-      URLClassLoader ucl = (URLClassLoader) cl;
-      URL[] urls = ucl.getURLs();
-      if (urls != null) {
-        for (URL url : urls) {
-          paths.add(new File(url.getFile()));
-        }
-      }
-    }
-    return paths;
-  }
-
-  @Override
-  public List<String> completion(String buf, int cursor) {
-    ScalaCompleter c = completor.completer();
-    Candidates ret = c.complete(buf, cursor);
-    return scala.collection.JavaConversions.asJavaList(ret.candidates());
-  }
-
-  public Object getValue(String name) {
-    Object ret = intp.valueOfTerm(name);
-    if (ret instanceof None) {
-      return null;
-    } else if (ret instanceof Some) {
-      return ((Some) ret).get();
-    } else {
-      return ret;
-    }
-  }
-
-  String getJobGroup(InterpreterContext context){
-    return "zeppelin-" + this.hashCode() + "-" + context.getParagraphId();
-  }
-
-  /**
-   * Interpret a single line.
-   */
-  @Override
-  public InterpreterResult interpret(String line, InterpreterContext context) {
-    z.setInterpreterContext(context);
-    if (line == null || line.trim().length() == 0) {
-      return new InterpreterResult(Code.SUCCESS);
-    }
-    return interpret(line.split("\n"), context);
-  }
-
-  public InterpreterResult interpret(String[] lines, InterpreterContext context) {
-    synchronized (this) {
-      z.setGui(context.getGui());
-      sc.setJobGroup(getJobGroup(context), "Zeppelin", false);
-      InterpreterResult r = interpretInput(lines);
-      sc.clearJobGroup();
-      return r;
-    }
-  }
-
-  public InterpreterResult interpretInput(String[] lines) {
-    SparkEnv.set(env);
-
-    // add print("") to make sure not finishing with comment
-    // see https://github.com/NFLabs/zeppelin/issues/151
-    String[] linesToRun = new String[lines.length + 1];
-    for (int i = 0; i < lines.length; i++) {
-      linesToRun[i] = lines[i];
-    }
-    linesToRun[lines.length] = "print(\"\")";
-
-    Console.setOut((java.io.PrintStream) binder.get("out"));
-    out.reset();
-    Code r = null;
-    String incomplete = "";
-    for (String s : linesToRun) {
-      scala.tools.nsc.interpreter.Results.Result res = null;
-      try {
-        res = intp.interpret(incomplete + s);
-      } catch (Exception e) {
-        sc.clearJobGroup();
-        logger.info("Interpreter exception", e);
-        return new InterpreterResult(Code.ERROR, InterpreterUtils.getMostRelevantMessage(e));
-      }
-
-      r = getResultCode(res);
-
-      if (r == Code.ERROR) {
-        sc.clearJobGroup();
-        return new InterpreterResult(r, out.toString());
-      } else if (r == Code.INCOMPLETE) {
-        incomplete += s + "\n";
-      } else {
-        incomplete = "";
-      }
-    }
-
-    if (r == Code.INCOMPLETE) {
-      return new InterpreterResult(r, "Incomplete expression");
-    } else {
-      return new InterpreterResult(r, out.toString());
-    }
-  }
-
-
-  @Override
-  public void cancel(InterpreterContext context) {
-    sc.cancelJobGroup(getJobGroup(context));
-  }
-
-  @Override
-  public int getProgress(InterpreterContext context) {
-    String jobGroup = getJobGroup(context);
-    int completedTasks = 0;
-    int totalTasks = 0;
-
-    DAGScheduler scheduler = sc.dagScheduler();
-    if (scheduler == null) {
-      return 0;
-    }
-    HashSet<ActiveJob> jobs = scheduler.activeJobs();
-    if (jobs == null || jobs.size() == 0) {
-      return 0;
-    }
-    Iterator<ActiveJob> it = jobs.iterator();
-    while (it.hasNext()) {
-      ActiveJob job = it.next();
-      String g = (String) job.properties().get("spark.jobGroup.id");
-
-      if (jobGroup.equals(g)) {
-        int[] progressInfo = null;
-        if (sc.version().startsWith("1.0")) {
-          progressInfo = getProgressFromStage_1_0x(sparkListener, job.finalStage());
-        } else if (sc.version().startsWith("1.1")) {
-          progressInfo = getProgressFromStage_1_1x(sparkListener, job.finalStage());
-        } else if (sc.version().startsWith("1.2")) {
-          progressInfo = getProgressFromStage_1_1x(sparkListener, job.finalStage());
-        } else if (sc.version().startsWith("1.3")) {
-          progressInfo = getProgressFromStage_1_1x(sparkListener, job.finalStage());
-        } else {
-          continue;
-        }
-        totalTasks += progressInfo[0];
-        completedTasks += progressInfo[1];
-      }
-    }
-
-    if (totalTasks == 0) {
-      return 0;
-    }
-    return completedTasks * 100 / totalTasks;
-  }
-
-  private int[] getProgressFromStage_1_0x(JobProgressListener sparkListener, Stage stage) {
-    int numTasks = stage.numTasks();
-    int completedTasks = 0;
-
-    Method method;
-    Object completedTaskInfo = null;
-    try {
-      method = sparkListener.getClass().getMethod("stageIdToTasksComplete");
-      completedTaskInfo =
-          JavaConversions.asJavaMap((HashMap<Object, Object>) method.invoke(sparkListener)).get(
-              stage.id());
-    } catch (NoSuchMethodException | SecurityException e) {
-      logger.error("Error while getting progress", e);
-    } catch (IllegalAccessException e) {
-      logger.error("Error while getting progress", e);
-    } catch (IllegalArgumentException e) {
-      logger.error("Error while getting progress", e);
-    } catch (InvocationTargetException e) {
-      logger.error("Error while getting progress", e);
-    }
-
-    if (completedTaskInfo != null) {
-      completedTasks += (int) completedTaskInfo;
-    }
-    List<Stage> parents = JavaConversions.asJavaList(stage.parents());
-    if (parents != null) {
-      for (Stage s : parents) {
-        int[] p = getProgressFromStage_1_0x(sparkListener, s);
-        numTasks += p[0];
-        completedTasks += p[1];
-      }
-    }
-
-    return new int[] {numTasks, completedTasks};
-  }
-
-  private int[] getProgressFromStage_1_1x(JobProgressListener sparkListener, Stage stage) {
-    int numTasks = stage.numTasks();
-    int completedTasks = 0;
-
-    try {
-      Method stageIdToData = sparkListener.getClass().getMethod("stageIdToData");
-      HashMap<Tuple2<Object, Object>, Object> stageIdData =
-          (HashMap<Tuple2<Object, Object>, Object>) stageIdToData.invoke(sparkListener);
-      Class<?> stageUIDataClass =
-          this.getClass().forName("org.apache.spark.ui.jobs.UIData$StageUIData");
-
-      Method numCompletedTasks = stageUIDataClass.getMethod("numCompleteTasks");
-
-      Set<Tuple2<Object, Object>> keys =
-          JavaConverters.asJavaSetConverter(stageIdData.keySet()).asJava();
-      for (Tuple2<Object, Object> k : keys) {
-        if (stage.id() == (int) k._1()) {
-          Object uiData = stageIdData.get(k).get();
-          completedTasks += (int) numCompletedTasks.invoke(uiData);
-        }
-      }
-    } catch (Exception e) {
-      logger.error("Error on getting progress information", e);
-    }
-
-    List<Stage> parents = JavaConversions.asJavaList(stage.parents());
-    if (parents != null) {
-      for (Stage s : parents) {
-        int[] p = getProgressFromStage_1_1x(sparkListener, s);
-        numTasks += p[0];
-        completedTasks += p[1];
-      }
-    }
-    return new int[] {numTasks, completedTasks};
-  }
-
-  private Code getResultCode(scala.tools.nsc.interpreter.Results.Result r) {
-    if (r instanceof scala.tools.nsc.interpreter.Results.Success$) {
-      return Code.SUCCESS;
-    } else if (r instanceof scala.tools.nsc.interpreter.Results.Incomplete$) {
-      return Code.INCOMPLETE;
-    } else {
-      return Code.ERROR;
-    }
-  }
-
-  @Override
-  public void close() {
-    sc.stop();
-    sc = null;
-
-    intp.close();
-  }
-
-  @Override
-  public FormType getFormType() {
-    return FormType.NATIVE;
-  }
-
-  public JobProgressListener getJobProgressListener() {
-    return sparkListener;
-  }
-
-  @Override
-  public Scheduler getScheduler() {
-    return SchedulerFactory.singleton().createOrGetFIFOScheduler(
-      SparkInterpreter.class.getName() + this.hashCode());
-  }
-
-  public ZeppelinContext getZeppelinContext() {
-    return z;
-  }
-}
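
The removed createSparkContext() above resolves the REPL class server URI reflectively
because the accessor changed across Spark releases: 1.1.x/1.2.x expose classServer(),
which returns an HttpServer, while 1.3.x exposes classServerUri() directly. A minimal,
self-contained sketch of that fallback, using a hypothetical Repl13 stand-in in place of
SparkIMain so it runs without Spark on the classpath:

    import java.lang.reflect.Method;

    public class ClassServerUriSketch {
      // Hypothetical stand-in for a Spark 1.3-style SparkIMain that only has classServerUri().
      static class Repl13 {
        public String classServerUri() {
          return "http://127.0.0.1:51515";
        }
      }

      // Try the Spark 1.1.x/1.2.x accessor first, then fall back to the 1.3.x one.
      static String resolveClassServerUri(Object repl) throws Exception {
        try {
          Method classServer = repl.getClass().getMethod("classServer");
          Object httpServer = classServer.invoke(repl);
          return (String) httpServer.getClass().getMethod("uri").invoke(httpServer);
        } catch (NoSuchMethodException e) {
          Method classServerUri = repl.getClass().getMethod("classServerUri");
          return (String) classServerUri.invoke(repl);
        }
      }

      public static void main(String[] args) throws Exception {
        System.out.println(resolveClassServerUri(new Repl13()));
      }
    }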

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/spark/src/main/java/com/nflabs/zeppelin/spark/SparkSqlInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/com/nflabs/zeppelin/spark/SparkSqlInterpreter.java b/spark/src/main/java/com/nflabs/zeppelin/spark/SparkSqlInterpreter.java
deleted file mode 100644
index 98947eb..0000000
--- a/spark/src/main/java/com/nflabs/zeppelin/spark/SparkSqlInterpreter.java
+++ /dev/null
@@ -1,339 +0,0 @@
-package com.nflabs.zeppelin.spark;
-
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.List;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import com.nflabs.zeppelin.interpreter.*;
-import org.apache.spark.SparkContext;
-import org.apache.spark.scheduler.ActiveJob;
-import org.apache.spark.scheduler.DAGScheduler;
-import org.apache.spark.scheduler.Stage;
-import org.apache.spark.sql.SQLContext;
-import org.apache.spark.sql.SQLContext.QueryExecution;
-import org.apache.spark.sql.catalyst.expressions.Attribute;
-import org.apache.spark.ui.jobs.JobProgressListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import scala.Tuple2;
-import scala.collection.Iterator;
-import scala.collection.JavaConversions;
-import scala.collection.JavaConverters;
-import scala.collection.mutable.HashMap;
-import scala.collection.mutable.HashSet;
-
-import com.nflabs.zeppelin.interpreter.InterpreterResult.Code;
-import com.nflabs.zeppelin.scheduler.Scheduler;
-import com.nflabs.zeppelin.scheduler.SchedulerFactory;
-
-/**
- * Spark SQL interpreter for Zeppelin.
- *
- * @author Leemoonsoo
- *
- */
-public class SparkSqlInterpreter extends Interpreter {
-  Logger logger = LoggerFactory.getLogger(SparkSqlInterpreter.class);
-  AtomicInteger num = new AtomicInteger(0);
-
-  static {
-    Interpreter.register(
-        "sql",
-        "spark",
-        SparkSqlInterpreter.class.getName(),
-        new InterpreterPropertyBuilder()
-            .add("zeppelin.spark.maxResult", "10000", "Max number of SparkSQL 
result to display.")
-            .add("zeppelin.spark.concurrentSQL", "false",
-                "Execute multiple SQL concurrently if set true.")
-            .build());
-  }
-
-  private String getJobGroup(InterpreterContext context){
-    return "zeppelin-" + this.hashCode() + "-" + context.getParagraphId();
-  }
-
-  private int maxResult;
-
-  public SparkSqlInterpreter(Properties property) {
-    super(property);
-  }
-
-  @Override
-  public void open() {
-    this.maxResult = Integer.parseInt(getProperty("zeppelin.spark.maxResult"));
-  }
-
-  private SparkInterpreter getSparkInterpreter() {
-    for (Interpreter intp : getInterpreterGroup()) {
-      if (intp.getClassName().equals(SparkInterpreter.class.getName())) {
-        Interpreter p = intp;
-        while (p instanceof WrappedInterpreter) {
-          if (p instanceof LazyOpenInterpreter) {
-            p.open();
-          }
-          p = ((WrappedInterpreter) p).getInnerInterpreter();
-        }
-        return (SparkInterpreter) p;
-      }
-    }
-    return null;
-  }
-
-  public boolean concurrentSQL() {
-    return Boolean.parseBoolean(getProperty("zeppelin.spark.concurrentSQL"));
-  }
-
-  @Override
-  public void close() {}
-
-  @Override
-  public InterpreterResult interpret(String st, InterpreterContext context) {
-    SQLContext sqlc = null;
-
-    sqlc = getSparkInterpreter().getSQLContext();
-
-    SparkContext sc = sqlc.sparkContext();
-    if (concurrentSQL()) {
-      sc.setLocalProperty("spark.scheduler.pool", "fair");
-    } else {
-      sc.setLocalProperty("spark.scheduler.pool", null);
-    }
-
-    sc.setJobGroup(getJobGroup(context), "Zeppelin", false);
-
-    // SchemaRDD - spark 1.1, 1.2, DataFrame - spark 1.3
-    Object rdd;
-    Object[] rows = null;
-    try {
-      rdd = sqlc.sql(st);
-
-      Method take = rdd.getClass().getMethod("take", int.class);
-      rows = (Object[]) take.invoke(rdd, maxResult + 1);
-    } catch (Exception e) {
-      logger.error("Error", e);
-      sc.clearJobGroup();
-      return new InterpreterResult(Code.ERROR, InterpreterUtils.getMostRelevantMessage(e));
-    }
-
-    String msg = null;
-
-    // get field names
-    Method queryExecution;
-    QueryExecution qe;
-    try {
-      queryExecution = rdd.getClass().getMethod("queryExecution");
-      qe = (QueryExecution) queryExecution.invoke(rdd);
-    } catch (NoSuchMethodException | SecurityException | IllegalAccessException
-        | IllegalArgumentException | InvocationTargetException e) {
-      throw new InterpreterException(e);
-    }
-
-    List<Attribute> columns =
-        scala.collection.JavaConverters.asJavaListConverter(
-            qe.analyzed().output()).asJava();
-
-    for (Attribute col : columns) {
-      if (msg == null) {
-        msg = col.name();
-      } else {
-        msg += "\t" + col.name();
-      }
-    }
-
-    msg += "\n";
-
-    // ArrayType, BinaryType, BooleanType, ByteType, DecimalType, DoubleType, DynamicType,
-    // FloatType, FractionalType, IntegerType, IntegralType, LongType, MapType, NativeType,
-    // NullType, NumericType, ShortType, StringType, StructType
-
-    try {
-      for (int r = 0; r < maxResult && r < rows.length; r++) {
-        Object row = rows[r];
-        Method isNullAt = row.getClass().getMethod("isNullAt", int.class);
-        Method apply = row.getClass().getMethod("apply", int.class);
-
-        for (int i = 0; i < columns.size(); i++) {
-          if (!(Boolean) isNullAt.invoke(row, i)) {
-            msg += apply.invoke(row, i).toString();
-          } else {
-            msg += "null";
-          }
-          if (i != columns.size() - 1) {
-            msg += "\t";
-          }
-        }
-        msg += "\n";
-      }
-    } catch (NoSuchMethodException | SecurityException | IllegalAccessException
-        | IllegalArgumentException | InvocationTargetException e) {
-      throw new InterpreterException(e);
-    }
-
-    if (rows.length > maxResult) {
-      msg += "\n<font color=red>Results are limited by " + maxResult + 
".</font>";
-    }
-    InterpreterResult rett = new InterpreterResult(Code.SUCCESS, "%table " + msg);
-    sc.clearJobGroup();
-    return rett;
-  }
-
-  @Override
-  public void cancel(InterpreterContext context) {
-    SQLContext sqlc = getSparkInterpreter().getSQLContext();
-    SparkContext sc = sqlc.sparkContext();
-
-    sc.cancelJobGroup(getJobGroup(context));
-  }
-
-  @Override
-  public FormType getFormType() {
-    return FormType.SIMPLE;
-  }
-
-
-  @Override
-  public int getProgress(InterpreterContext context) {
-    String jobGroup = getJobGroup(context);
-    SQLContext sqlc = getSparkInterpreter().getSQLContext();
-    SparkContext sc = sqlc.sparkContext();
-    JobProgressListener sparkListener = getSparkInterpreter().getJobProgressListener();
-    int completedTasks = 0;
-    int totalTasks = 0;
-
-    DAGScheduler scheduler = sc.dagScheduler();
-    HashSet<ActiveJob> jobs = scheduler.activeJobs();
-    Iterator<ActiveJob> it = jobs.iterator();
-    while (it.hasNext()) {
-      ActiveJob job = it.next();
-      String g = (String) job.properties().get("spark.jobGroup.id");
-      if (jobGroup.equals(g)) {
-        int[] progressInfo = null;
-        if (sc.version().startsWith("1.0")) {
-          progressInfo = getProgressFromStage_1_0x(sparkListener, job.finalStage());
-        } else if (sc.version().startsWith("1.1")) {
-          progressInfo = getProgressFromStage_1_1x(sparkListener, job.finalStage());
-        } else if (sc.version().startsWith("1.2")) {
-          progressInfo = getProgressFromStage_1_1x(sparkListener, job.finalStage());
-        } else if (sc.version().startsWith("1.3")) {
-          progressInfo = getProgressFromStage_1_1x(sparkListener, job.finalStage());
-        } else {
-          logger.warn("Spark {} getting progress information not supported" + 
sc.version());
-          continue;
-        }
-        totalTasks += progressInfo[0];
-        completedTasks += progressInfo[1];
-      }
-    }
-
-    if (totalTasks == 0) {
-      return 0;
-    }
-    return completedTasks * 100 / totalTasks;
-  }
-
-  private int[] getProgressFromStage_1_0x(JobProgressListener sparkListener, Stage stage) {
-    int numTasks = stage.numTasks();
-    int completedTasks = 0;
-
-    Method method;
-    Object completedTaskInfo = null;
-    try {
-      method = sparkListener.getClass().getMethod("stageIdToTasksComplete");
-      completedTaskInfo =
-          JavaConversions.asJavaMap((HashMap<Object, Object>) method.invoke(sparkListener)).get(
-              stage.id());
-    } catch (NoSuchMethodException | SecurityException e) {
-      logger.error("Error while getting progress", e);
-    } catch (IllegalAccessException e) {
-      logger.error("Error while getting progress", e);
-    } catch (IllegalArgumentException e) {
-      logger.error("Error while getting progress", e);
-    } catch (InvocationTargetException e) {
-      logger.error("Error while getting progress", e);
-    }
-
-    if (completedTaskInfo != null) {
-      completedTasks += (int) completedTaskInfo;
-    }
-    List<Stage> parents = JavaConversions.asJavaList(stage.parents());
-    if (parents != null) {
-      for (Stage s : parents) {
-        int[] p = getProgressFromStage_1_0x(sparkListener, s);
-        numTasks += p[0];
-        completedTasks += p[1];
-      }
-    }
-
-    return new int[] {numTasks, completedTasks};
-  }
-
-  private int[] getProgressFromStage_1_1x(JobProgressListener sparkListener, Stage stage) {
-    int numTasks = stage.numTasks();
-    int completedTasks = 0;
-
-    try {
-      Method stageIdToData = sparkListener.getClass().getMethod("stageIdToData");
-      HashMap<Tuple2<Object, Object>, Object> stageIdData =
-          (HashMap<Tuple2<Object, Object>, Object>) stageIdToData.invoke(sparkListener);
-      Class<?> stageUIDataClass =
-          this.getClass().forName("org.apache.spark.ui.jobs.UIData$StageUIData");
-
-      Method numCompletedTasks = stageUIDataClass.getMethod("numCompleteTasks");
-
-      Set<Tuple2<Object, Object>> keys =
-          JavaConverters.asJavaSetConverter(stageIdData.keySet()).asJava();
-      for (Tuple2<Object, Object> k : keys) {
-        if (stage.id() == (int) k._1()) {
-          Object uiData = stageIdData.get(k).get();
-          completedTasks += (int) numCompletedTasks.invoke(uiData);
-        }
-      }
-    } catch (Exception e) {
-      logger.error("Error on getting progress information", e);
-    }
-
-    List<Stage> parents = JavaConversions.asJavaList(stage.parents());
-    if (parents != null) {
-      for (Stage s : parents) {
-        int[] p = getProgressFromStage_1_1x(sparkListener, s);
-        numTasks += p[0];
-        completedTasks += p[1];
-      }
-    }
-    return new int[] {numTasks, completedTasks};
-  }
-
-  @Override
-  public Scheduler getScheduler() {
-    if (concurrentSQL()) {
-      int maxConcurrency = 10;
-      return SchedulerFactory.singleton().createOrGetParallelScheduler(
-          SparkSqlInterpreter.class.getName() + this.hashCode(), maxConcurrency);
-    } else {
-      // getSparkInterpreter() calls open() inside.
-      // That means if SparkInterpreter is not opened, it'll wait until SparkInterpreter opens.
-      // At that moment the UI displays 'READY' or 'FINISHED' instead of 'PENDING' or 'RUNNING'.
-      // That's because the scheduler is not created yet; it is created by this function.
-      // Therefore, we can still use getSparkInterpreter() here, but it's better and safer
-      // to get SparkInterpreter without opening it.
-      for (Interpreter intp : getInterpreterGroup()) {
-        if (intp.getClassName().equals(SparkInterpreter.class.getName())) {
-          Interpreter p = intp;
-          return p.getScheduler();
-        } else {
-          continue;
-        }
-      }
-      throw new InterpreterException("Can't find SparkInterpreter");
-    }
-  }
-
-  @Override
-  public List<String> completion(String buf, int cursor) {
-    return null;
-  }
-}
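
The removed SparkSqlInterpreter.interpret() above renders query results through
Zeppelin's %table display: a tab-separated header of column names followed by
tab-separated data rows, with nulls printed as "null" and output truncated at
zeppelin.spark.maxResult. A minimal sketch of that formatting, with made-up sample data
and a hypothetical class name:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class TableFormatSketch {
      // Build the "%table" string: tab-separated header row, then tab-separated data rows.
      static String toTable(List<String> columns, List<List<Object>> rows) {
        StringBuilder msg = new StringBuilder();
        msg.append(String.join("\t", columns)).append("\n");
        for (List<Object> row : rows) {
          for (int i = 0; i < row.size(); i++) {
            Object cell = row.get(i);
            msg.append(cell == null ? "null" : cell.toString());
            if (i != row.size() - 1) {
              msg.append("\t");
            }
          }
          msg.append("\n");
        }
        return "%table " + msg;
      }

      public static void main(String[] args) {
        List<List<Object>> rows = new ArrayList<>();
        rows.add(Arrays.<Object>asList("alice", 30));   // made-up sample data
        rows.add(Arrays.<Object>asList("bob", null));
        System.out.println(toTable(Arrays.asList("name", "age"), rows));
      }
    }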

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/spark/src/main/java/com/nflabs/zeppelin/spark/ZeppelinContext.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/com/nflabs/zeppelin/spark/ZeppelinContext.java b/spark/src/main/java/com/nflabs/zeppelin/spark/ZeppelinContext.java
deleted file mode 100644
index 30f6015..0000000
--- a/spark/src/main/java/com/nflabs/zeppelin/spark/ZeppelinContext.java
+++ /dev/null
@@ -1,238 +0,0 @@
-package com.nflabs.zeppelin.spark;
-
-import static scala.collection.JavaConversions.asJavaCollection;
-import static scala.collection.JavaConversions.asJavaIterable;
-import static scala.collection.JavaConversions.collectionAsScalaIterable;
-
-import java.io.PrintStream;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-
-import org.apache.spark.SparkContext;
-import org.apache.spark.sql.SQLContext;
-import org.apache.spark.sql.hive.HiveContext;
-
-import scala.Tuple2;
-import scala.collection.Iterable;
-
-import com.nflabs.zeppelin.display.GUI;
-import com.nflabs.zeppelin.display.Input.ParamOption;
-import com.nflabs.zeppelin.interpreter.InterpreterContext;
-import com.nflabs.zeppelin.spark.dep.DependencyResolver;
-
-/**
- * Spark context for zeppelin.
- *
- * @author Leemoonsoo
- *
- */
-public class ZeppelinContext extends HashMap<String, Object> {
-  private DependencyResolver dep;
-  private PrintStream out;
-  private InterpreterContext interpreterContext;
-
-  public ZeppelinContext(SparkContext sc, SQLContext sql,
-      InterpreterContext interpreterContext,
-      DependencyResolver dep, PrintStream printStream) {
-    this.sc = sc;
-    this.sqlContext = sql;
-    this.interpreterContext = interpreterContext;
-    this.dep = dep;
-    this.out = printStream;
-  }
-
-  public SparkContext sc;
-  public SQLContext sqlContext;
-  public HiveContext hiveContext;
-  private GUI gui;
-
-  /* spark-1.3
-  public SchemaRDD sql(String sql) {
-    return sqlContext.sql(sql);
-  }
-  */
-
-  /**
-   * Load dependency for interpreter and runtime (driver).
-   * And distribute them to spark cluster (sc.add())
-   *
-   * @param artifact "group:artifact:version" or file path like 
"/somepath/your.jar"
-   * @return
-   * @throws Exception
-   */
-  public Iterable<String> load(String artifact) throws Exception {
-    return collectionAsScalaIterable(dep.load(artifact, true));
-  }
-
-  /**
-   * Load dependency and its transitive dependencies for interpreter and runtime (driver).
-   * And distribute them to spark cluster (sc.add())
-   *
-   * @param artifact "groupId:artifactId:version" or file path like "/somepath/your.jar"
-   * @param excludes exclusion list of transitive dependencies, as "groupId:artifactId" strings.
-   * @return
-   * @throws Exception
-   */
-  public Iterable<String> load(String artifact, scala.collection.Iterable<String> excludes)
-      throws Exception {
-    return collectionAsScalaIterable(
-        dep.load(artifact,
-        asJavaCollection(excludes),
-        true));
-  }
-
-  /**
-   * Load dependency and its transitive dependencies for interpreter and runtime (driver).
-   * And distribute them to spark cluster (sc.add())
-   *
-   * @param artifact "groupId:artifactId:version" or file path like "/somepath/your.jar"
-   * @param excludes exclusion list of transitive dependencies, as "groupId:artifactId" strings.
-   * @return
-   * @throws Exception
-   */
-  public Iterable<String> load(String artifact, Collection<String> excludes) throws Exception {
-    return collectionAsScalaIterable(dep.load(artifact, excludes, true));
-  }
-
-  /**
-   * Load dependency for interpreter and runtime, and then add to sparkContext.
-   * But not adding them to spark cluster
-   *
-   * @param artifact "groupId:artifactId:version" or file path like 
"/somepath/your.jar"
-   * @return
-   * @throws Exception
-   */
-  public Iterable<String> loadLocal(String artifact) throws Exception {
-    return collectionAsScalaIterable(dep.load(artifact, false));
-  }
-
-
-  /**
-   * Load dependency and its transitive dependencies and then add to sparkContext.
-   * But not adding them to spark cluster
-   *
-   * @param artifact "groupId:artifactId:version" or file path like "/somepath/your.jar"
-   * @param excludes exclusion list of transitive dependencies, as "groupId:artifactId" strings.
-   * @return
-   * @throws Exception
-   */
-  public Iterable<String> loadLocal(String artifact,
-      scala.collection.Iterable<String> excludes) throws Exception {
-    return collectionAsScalaIterable(dep.load(artifact,
-        asJavaCollection(excludes), false));
-  }
-
-  /**
-   * Load dependency and its transitive dependencies and then add to sparkContext.
-   * But not adding them to spark cluster
-   *
-   * @param artifact "groupId:artifactId:version" or file path like "/somepath/your.jar"
-   * @param excludes exclusion list of transitive dependencies, as "groupId:artifactId" strings.
-   * @return
-   * @throws Exception
-   */
-  public Iterable<String> loadLocal(String artifact, Collection<String> excludes)
-      throws Exception {
-    return collectionAsScalaIterable(dep.load(artifact, excludes, false));
-  }
-
-
-  /**
-   * Add maven repository
-   *
-   * @param id id of repository ex) oss, local, snapshot
-   * @param url url of repository. supported protocol : file, http, https
-   */
-  public void addRepo(String id, String url) {
-    addRepo(id, url, false);
-  }
-
-  /**
-   * Add maven repository
-   *
-   * @param id id of repository
-   * @param url url of repository. supported protocol : file, http, https
-   * @param snapshot true if it is snapshot repository
-   */
-  public void addRepo(String id, String url, boolean snapshot) {
-    dep.addRepo(id, url, snapshot);
-  }
-
-  /**
-   * Remove maven repository by id
-   * @param id id of repository
-   */
-  public void removeRepo(String id){
-    dep.delRepo(id);
-  }
-
-  /**
-   * Load dependency only for the interpreter.
-   *
-   * @param name
-   * @return
-   */
-
-  public Object input(String name) {
-    return input(name, "");
-  }
-
-  public Object input(String name, Object defaultValue) {
-    return gui.input(name, defaultValue);
-  }
-
-  public Object select(String name, scala.collection.Iterable<Tuple2<Object, String>> options) {
-    return select(name, "", options);
-  }
-
-  public Object select(String name, Object defaultValue,
-      scala.collection.Iterable<Tuple2<Object, String>> options) {
-    int n = options.size();
-    ParamOption[] paramOptions = new ParamOption[n];
-    Iterator<Tuple2<Object, String>> it = asJavaIterable(options).iterator();
-
-    int i = 0;
-    while (it.hasNext()) {
-      Tuple2<Object, String> valueAndDisplayValue = it.next();
-      paramOptions[i++] = new ParamOption(valueAndDisplayValue._1(), valueAndDisplayValue._2());
-    }
-
-    return gui.select(name, "", paramOptions);
-  }
-
-  public void setGui(GUI o) {
-    this.gui = o;
-  }
-
-  public void run(String lines) {
-    /*
-    String intpName = Paragraph.getRequiredReplName(lines);
-    String scriptBody = Paragraph.getScriptBody(lines);
-    Interpreter intp = interpreterContext.getParagraph().getRepl(intpName);
-    InterpreterResult ret = intp.interpret(scriptBody, interpreterContext);
-    if (ret.code() == InterpreterResult.Code.SUCCESS) {
-      out.println("%" + ret.type().toString().toLowerCase() + " " + 
ret.message());
-    } else if (ret.code() == InterpreterResult.Code.ERROR) {
-      out.println("Error: " + ret.message());
-    } else if (ret.code() == InterpreterResult.Code.INCOMPLETE) {
-      out.println("Incomplete");
-    } else {
-      out.println("Unknown error");
-    }
-    */
-    throw new RuntimeException("Missing implementation");
-  }
-
-  private void restartInterpreter() {
-  }
-
-  public InterpreterContext getInterpreterContext() {
-    return interpreterContext;
-  }
-
-  public void setInterpreterContext(InterpreterContext interpreterContext) {
-    this.interpreterContext = interpreterContext;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/spark/src/main/java/com/nflabs/zeppelin/spark/dep/Booter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/com/nflabs/zeppelin/spark/dep/Booter.java b/spark/src/main/java/com/nflabs/zeppelin/spark/dep/Booter.java
deleted file mode 100644
index 10c5bc2..0000000
--- a/spark/src/main/java/com/nflabs/zeppelin/spark/dep/Booter.java
+++ /dev/null
@@ -1,53 +0,0 @@
-package com.nflabs.zeppelin.spark.dep;
-
-import java.io.File;
-
-import org.apache.maven.repository.internal.MavenRepositorySystemSession;
-import org.sonatype.aether.RepositorySystem;
-import org.sonatype.aether.RepositorySystemSession;
-import org.sonatype.aether.repository.LocalRepository;
-import org.sonatype.aether.repository.RemoteRepository;
-
-/**
- * Manage mvn repository.
- *
- * @author anthonycorbacho
- *
- */
-public class Booter {
-  public static RepositorySystem newRepositorySystem() {
-    return RepositorySystemFactory.newRepositorySystem();
-  }
-
-  public static RepositorySystemSession newRepositorySystemSession(
-      RepositorySystem system, String localRepoPath) {
-    MavenRepositorySystemSession session = new MavenRepositorySystemSession();
-
-    // find homedir
-    String home = System.getenv("ZEPPELIN_HOME");
-    if (home == null) {
-      home = System.getProperty("zeppelin.home");
-    }
-    if (home == null) {
-      home = "..";
-    }
-
-    String path = home + "/" + localRepoPath;
-
-    LocalRepository localRepo =
-        new LocalRepository(new File(path).getAbsolutePath());
-    session.setLocalRepositoryManager(system.newLocalRepositoryManager(localRepo));
-
-    // session.setTransferListener(new ConsoleTransferListener());
-    // session.setRepositoryListener(new ConsoleRepositoryListener());
-
-    // uncomment to generate dirty trees
-    // session.setDependencyGraphTransformer( null );
-
-    return session;
-  }
-
-  public static RemoteRepository newCentralRepository() {
-    return new RemoteRepository("central", "default", 
"http://repo1.maven.org/maven2/";);
-  }
-}
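
The removed Booter above resolves the local artifact repository relative to the Zeppelin
home directory, preferring the ZEPPELIN_HOME environment variable, then the zeppelin.home
system property, then "..". A minimal sketch of just that lookup order, with a
hypothetical class name and a sample repository sub-path:

    import java.io.File;

    public class LocalRepoPathSketch {
      static String localRepoDir(String localRepoPath) {
        // Preference order: ZEPPELIN_HOME env var, zeppelin.home system property, "..".
        String home = System.getenv("ZEPPELIN_HOME");
        if (home == null) {
          home = System.getProperty("zeppelin.home");
        }
        if (home == null) {
          home = "..";
        }
        return new File(home + "/" + localRepoPath).getAbsolutePath();
      }

      public static void main(String[] args) {
        System.out.println(localRepoDir("local-repo"));  // sample sub-path, not from the patch
      }
    }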

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/spark/src/main/java/com/nflabs/zeppelin/spark/dep/Dependency.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/com/nflabs/zeppelin/spark/dep/Dependency.java b/spark/src/main/java/com/nflabs/zeppelin/spark/dep/Dependency.java
deleted file mode 100644
index f8f6494..0000000
--- a/spark/src/main/java/com/nflabs/zeppelin/spark/dep/Dependency.java
+++ /dev/null
@@ -1,73 +0,0 @@
-package com.nflabs.zeppelin.spark.dep;
-
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- *
- */
-public class Dependency {
-  private String groupArtifactVersion;
-  private boolean local = false;
-  private List<String> exclusions;
-
-
-  public Dependency(String groupArtifactVersion) {
-    this.groupArtifactVersion = groupArtifactVersion;
-    exclusions = new LinkedList<String>();
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (!(o instanceof Dependency)) {
-      return false;
-    } else {
-      return ((Dependency) o).groupArtifactVersion.equals(groupArtifactVersion);
-    }
-  }
-
-  /**
-   * Don't add artifact into SparkContext (sc.addJar())
-   * @return
-   */
-  public Dependency local() {
-    local = true;
-    return this;
-  }
-
-  public Dependency excludeAll() {
-    exclude("*");
-    return this;
-  }
-
-  /**
-   *
-   * @param exclusions comma or newline separated list of "groupId:ArtifactId"
-   * @return
-   */
-  public Dependency exclude(String exclusions) {
-    for (String item : exclusions.split(",|\n")) {
-      this.exclusions.add(item);
-    }
-
-    return this;
-  }
-
-
-  public String getGroupArtifactVersion() {
-    return groupArtifactVersion;
-  }
-
-  public boolean isDist() {
-    return !local;
-  }
-
-  public List<String> getExclusions() {
-    return exclusions;
-  }
-
-  public boolean isLocalFsArtifact() {
-    int numSplits = groupArtifactVersion.split(":").length;
-    return !(numSplits >= 3 && numSplits <= 6);
-  }
-}
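
The removed Dependency.isLocalFsArtifact() above distinguishes Maven coordinates from
local file paths purely by shape: a "group:artifact:version" style coordinate splits into
3 to 6 colon-separated parts, anything else is treated as a path on the local filesystem.
A minimal sketch of that check, with a hypothetical class name:

    public class ArtifactKindSketch {
      static boolean isLocalFsArtifact(String groupArtifactVersion) {
        // 3-6 colon-separated parts means a Maven coordinate; anything else is a local path.
        int numSplits = groupArtifactVersion.split(":").length;
        return !(numSplits >= 3 && numSplits <= 6);
      }

      public static void main(String[] args) {
        System.out.println(isLocalFsArtifact("org.apache.commons:commons-lang3:3.3"));  // false
        System.out.println(isLocalFsArtifact("/somepath/your.jar"));                     // true
      }
    }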

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/spark/src/main/java/com/nflabs/zeppelin/spark/dep/DependencyContext.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/com/nflabs/zeppelin/spark/dep/DependencyContext.java b/spark/src/main/java/com/nflabs/zeppelin/spark/dep/DependencyContext.java
deleted file mode 100644
index 58268eb..0000000
--- a/spark/src/main/java/com/nflabs/zeppelin/spark/dep/DependencyContext.java
+++ /dev/null
@@ -1,134 +0,0 @@
-package com.nflabs.zeppelin.spark.dep;
-
-import java.io.File;
-import java.net.MalformedURLException;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.sonatype.aether.RepositorySystem;
-import org.sonatype.aether.RepositorySystemSession;
-import org.sonatype.aether.artifact.Artifact;
-import org.sonatype.aether.collection.CollectRequest;
-import org.sonatype.aether.graph.DependencyFilter;
-import org.sonatype.aether.repository.RemoteRepository;
-import org.sonatype.aether.resolution.ArtifactResolutionException;
-import org.sonatype.aether.resolution.ArtifactResult;
-import org.sonatype.aether.resolution.DependencyRequest;
-import org.sonatype.aether.resolution.DependencyResolutionException;
-import org.sonatype.aether.util.artifact.DefaultArtifact;
-import org.sonatype.aether.util.artifact.JavaScopes;
-import org.sonatype.aether.util.filter.DependencyFilterUtils;
-import org.sonatype.aether.util.filter.PatternExclusionsDependencyFilter;
-
-
-/**
- *
- */
-public class DependencyContext {
-  List<Dependency> dependencies = new LinkedList<Dependency>();
-  List<Repository> repositories = new LinkedList<Repository>();
-
-  List<File> files = new LinkedList<File>();
-  List<File> filesDist = new LinkedList<File>();
-  private RepositorySystem system = Booter.newRepositorySystem();
-  private RepositorySystemSession session;
-  private RemoteRepository mavenCentral = new RemoteRepository("central",
-      "default", "http://repo1.maven.org/maven2/";);
-  private RemoteRepository mavenLocal = new RemoteRepository("local",
-      "default", "file://" + System.getProperty("user.home") + 
"/.m2/repository");
-
-  public DependencyContext(String localRepoPath) {
-    session =  Booter.newRepositorySystemSession(system, localRepoPath);
-  }
-
-  public Dependency load(String lib) {
-    Dependency dep = new Dependency(lib);
-
-    if (dependencies.contains(dep)) {
-      dependencies.remove(dep);
-    }
-    dependencies.add(dep);
-    return dep;
-  }
-
-  public Repository addRepo(String name) {
-    Repository rep = new Repository(name);
-    repositories.add(rep);
-    return rep;
-  }
-
-  public void reset() {
-    dependencies = new LinkedList<Dependency>();
-    repositories = new LinkedList<Repository>();
-
-    files = new LinkedList<File>();
-    filesDist = new LinkedList<File>();
-  }
-
-
-  /**
-   * fetch all artifacts
-   * @return
-   * @throws MalformedURLException
-   * @throws ArtifactResolutionException
-   * @throws DependencyResolutionException
-   */
-  public List<File> fetch() throws MalformedURLException,
-      DependencyResolutionException, ArtifactResolutionException {
-
-    for (Dependency dep : dependencies) {
-      if (!dep.isLocalFsArtifact()) {
-        List<ArtifactResult> artifacts = fetchArtifactWithDep(dep);
-        for (ArtifactResult artifact : artifacts) {
-          if (dep.isDist()) {
-            filesDist.add(artifact.getArtifact().getFile());
-          }
-          files.add(artifact.getArtifact().getFile());
-        }
-      } else {
-        if (dep.isDist()) {
-          filesDist.add(new File(dep.getGroupArtifactVersion()));
-        }
-        files.add(new File(dep.getGroupArtifactVersion()));
-      }
-    }
-
-    return files;
-  }
-
-  private List<ArtifactResult> fetchArtifactWithDep(Dependency dep)
-      throws DependencyResolutionException, ArtifactResolutionException {
-    Artifact artifact = new DefaultArtifact(
-        DependencyResolver.inferScalaVersion(dep.getGroupArtifactVersion()));
-
-    DependencyFilter classpathFlter = DependencyFilterUtils
-        .classpathFilter(JavaScopes.COMPILE);
-    PatternExclusionsDependencyFilter exclusionFilter = new PatternExclusionsDependencyFilter(
-        DependencyResolver.inferScalaVersion(dep.getExclusions()));
-
-    CollectRequest collectRequest = new CollectRequest();
-    collectRequest.setRoot(new org.sonatype.aether.graph.Dependency(artifact,
-        JavaScopes.COMPILE));
-
-    collectRequest.addRepository(mavenCentral);
-    collectRequest.addRepository(mavenLocal);
-    for (Repository repo : repositories) {
-      RemoteRepository rr = new RemoteRepository(repo.getName(), "default", repo.getUrl());
-      rr.setPolicy(repo.isSnapshot(), null);
-      collectRequest.addRepository(rr);
-    }
-
-    DependencyRequest dependencyRequest = new DependencyRequest(collectRequest,
-        DependencyFilterUtils.andFilter(exclusionFilter, classpathFlter));
-
-    return system.resolveDependencies(session, dependencyRequest).getArtifactResults();
-  }
-
-  public List<File> getFiles() {
-    return files;
-  }
-
-  public List<File> getFilesDist() {
-    return filesDist;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/spark/src/main/java/com/nflabs/zeppelin/spark/dep/DependencyResolver.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/com/nflabs/zeppelin/spark/dep/DependencyResolver.java b/spark/src/main/java/com/nflabs/zeppelin/spark/dep/DependencyResolver.java
deleted file mode 100644
index 4800e1a..0000000
--- a/spark/src/main/java/com/nflabs/zeppelin/spark/dep/DependencyResolver.java
+++ /dev/null
@@ -1,333 +0,0 @@
-package com.nflabs.zeppelin.spark.dep;
-
-import java.io.File;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.net.URL;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.spark.SparkContext;
-import org.apache.spark.repl.SparkIMain;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.sonatype.aether.RepositorySystem;
-import org.sonatype.aether.RepositorySystemSession;
-import org.sonatype.aether.artifact.Artifact;
-import org.sonatype.aether.collection.CollectRequest;
-import org.sonatype.aether.graph.Dependency;
-import org.sonatype.aether.graph.DependencyFilter;
-import org.sonatype.aether.repository.RemoteRepository;
-import org.sonatype.aether.resolution.ArtifactResult;
-import org.sonatype.aether.resolution.DependencyRequest;
-import org.sonatype.aether.util.artifact.DefaultArtifact;
-import org.sonatype.aether.util.artifact.JavaScopes;
-import org.sonatype.aether.util.filter.DependencyFilterUtils;
-import org.sonatype.aether.util.filter.PatternExclusionsDependencyFilter;
-
-import scala.Some;
-import scala.collection.IndexedSeq;
-import scala.reflect.io.AbstractFile;
-import scala.tools.nsc.Global;
-import scala.tools.nsc.backend.JavaPlatform;
-import scala.tools.nsc.util.ClassPath;
-import scala.tools.nsc.util.MergedClassPath;
-
-/**
- * Deps resolver.
- * Add new dependencies from mvn repo (at runtime) to Zeppelin.
- *
- * @author anthonycorbacho
- *
- */
-public class DependencyResolver {
-  Logger logger = LoggerFactory.getLogger(DependencyResolver.class);
-  private Global global;
-  private SparkIMain intp;
-  private SparkContext sc;
-  private RepositorySystem system = Booter.newRepositorySystem();
-  private List<RemoteRepository> repos = new LinkedList<RemoteRepository>();
-  private RepositorySystemSession session;
-  private DependencyFilter classpathFlter = DependencyFilterUtils.classpathFilter(
-      JavaScopes.COMPILE,
-      JavaScopes.PROVIDED,
-      JavaScopes.RUNTIME,
-      JavaScopes.SYSTEM);
-
-  private final String[] exclusions = new String[] {"org.scala-lang:scala-library",
-                                                    "org.scala-lang:scala-compiler",
-                                                    "org.scala-lang:scala-reflect",
-                                                    "org.scala-lang:scalap",
-                                                    "com.nflabs.zeppelin:zeppelin-zengine",
-                                                    "com.nflabs.zeppelin:zeppelin-spark",
-                                                    "com.nflabs.zeppelin:zeppelin-server"};
-
-  public DependencyResolver(SparkIMain intp, SparkContext sc, String localRepoPath) {
-    this.intp = intp;
-    this.global = intp.global();
-    this.sc = sc;
-    session = Booter.newRepositorySystemSession(system, localRepoPath);
-    repos.add(Booter.newCentralRepository()); // add maven central
-    repos.add(new RemoteRepository("local", "default", "file://"
-        + System.getProperty("user.home") + "/.m2/repository"));
-  }
-
-  public void addRepo(String id, String url, boolean snapshot) {
-    synchronized (repos) {
-      delRepo(id);
-      RemoteRepository rr = new RemoteRepository(id, "default", url);
-      rr.setPolicy(snapshot, null);
-      repos.add(rr);
-    }
-  }
-
-  public RemoteRepository delRepo(String id) {
-    synchronized (repos) {
-      Iterator<RemoteRepository> it = repos.iterator();
-      if (it.hasNext()) {
-        RemoteRepository repo = it.next();
-        if (repo.getId().equals(id)) {
-          it.remove();
-          return repo;
-        }
-      }
-    }
-    return null;
-  }
-
-  private void updateCompilerClassPath(URL[] urls) throws IllegalAccessException,
-      IllegalArgumentException, InvocationTargetException {
-
-    JavaPlatform platform = (JavaPlatform) global.platform();
-    MergedClassPath<AbstractFile> newClassPath = mergeUrlsIntoClassPath(platform, urls);
-
-    Method[] methods = platform.getClass().getMethods();
-    for (Method m : methods) {
-      if (m.getName().endsWith("currentClassPath_$eq")) {
-        m.invoke(platform, new Some(newClassPath));
-        break;
-      }
-    }
-
-    // NOTE: Must use reflection until this is exposed/fixed upstream in Scala
-    List<String> classPaths = new LinkedList<String>();
-    for (URL url : urls) {
-      classPaths.add(url.getPath());
-    }
-
-    // Reload all jars specified into our compiler
-    global.invalidateClassPathEntries(scala.collection.JavaConversions.asScalaBuffer(classPaths)
-        .toList());
-  }
-
-  // Until spark 1.1.x
-  // check https://github.com/apache/spark/commit/191d7cf2a655d032f160b9fa181730364681d0e7
-  private void updateRuntimeClassPath(URL[] urls) throws SecurityException, IllegalAccessException,
-      IllegalArgumentException, InvocationTargetException, NoSuchMethodException {
-    ClassLoader cl = intp.classLoader().getParent();
-    Method addURL;
-    addURL = cl.getClass().getDeclaredMethod("addURL", new Class[] {URL.class});
-    addURL.setAccessible(true);
-    for (URL url : urls) {
-      addURL.invoke(cl, url);
-    }
-  }
-
-  private MergedClassPath<AbstractFile> mergeUrlsIntoClassPath(JavaPlatform platform, URL[] urls) {
-    IndexedSeq<ClassPath<AbstractFile>> entries =
-        ((MergedClassPath<AbstractFile>) platform.classPath()).entries();
-    List<ClassPath<AbstractFile>> cp = new LinkedList<ClassPath<AbstractFile>>();
-
-    for (int i = 0; i < entries.size(); i++) {
-      cp.add(entries.apply(i));
-    }
-
-    for (URL url : urls) {
-      AbstractFile file;
-      if ("file".equals(url.getProtocol())) {
-        File f = new File(url.getPath());
-        if (f.isDirectory()) {
-          file = AbstractFile.getDirectory(scala.reflect.io.File.jfile2path(f));
-        } else {
-          file = AbstractFile.getFile(scala.reflect.io.File.jfile2path(f));
-        }
-      } else {
-        file = AbstractFile.getURL(url);
-      }
-
-      ClassPath<AbstractFile> newcp = platform.classPath().context().newClassPath(file);
-
-      // distinct
-      if (cp.contains(newcp) == false) {
-        cp.add(newcp);
-      }
-    }
-
-    return new MergedClassPath(scala.collection.JavaConversions.asScalaBuffer(cp).toIndexedSeq(),
-        platform.classPath().context());
-  }
-
-  public List<String> load(String artifact,
-      boolean addSparkContext) throws Exception {
-    return load(artifact, new LinkedList<String>(), addSparkContext);
-  }
-
-  public List<String> load(String artifact, Collection<String> excludes,
-      boolean addSparkContext) throws Exception {
-    if (StringUtils.isBlank(artifact)) {
-      // Should throw here
-      throw new RuntimeException("Invalid artifact to load");
-    }
-
-    // <groupId>:<artifactId>[:<extension>[:<classifier>]]:<version>
-    int numSplits = artifact.split(":").length;
-    if (numSplits >= 3 && numSplits <= 6) {
-      return loadFromMvn(artifact, excludes, addSparkContext);
-    } else {
-      loadFromFs(artifact, addSparkContext);
-      LinkedList<String> libs = new LinkedList<String>();
-      libs.add(artifact);
-      return libs;
-    }
-  }
-
-  private void loadFromFs(String artifact, boolean addSparkContext) throws Exception {
-    File jarFile = new File(artifact);
-
-    intp.global().new Run();
-
-    updateRuntimeClassPath(new URL[] {jarFile.toURI().toURL()});
-    updateCompilerClassPath(new URL[] {jarFile.toURI().toURL()});
-
-    if (addSparkContext) {
-      sc.addJar(jarFile.getAbsolutePath());
-    }
-  }
-
-  private List<String> loadFromMvn(String artifact, Collection<String> excludes,
-      boolean addSparkContext) throws Exception {
-    List<String> loadedLibs = new LinkedList<String>();
-    Collection<String> allExclusions = new LinkedList<String>();
-    allExclusions.addAll(excludes);
-    allExclusions.addAll(Arrays.asList(exclusions));
-
-    List<ArtifactResult> listOfArtifact;
-    listOfArtifact = getArtifactsWithDep(artifact, allExclusions);
-
-    Iterator<ArtifactResult> it = listOfArtifact.iterator();
-    while (it.hasNext()) {
-      Artifact a = it.next().getArtifact();
-      String gav = a.getGroupId() + ":" + a.getArtifactId() + ":" + a.getVersion();
-      for (String exclude : allExclusions) {
-        if (gav.startsWith(exclude)) {
-          it.remove();
-          break;
-        }
-      }
-    }
-
-    List<URL> newClassPathList = new LinkedList<URL>();
-    List<File> files = new LinkedList<File>();
-    for (ArtifactResult artifactResult : listOfArtifact) {
-      logger.info("Load " + artifactResult.getArtifact().getGroupId() + ":"
-          + artifactResult.getArtifact().getArtifactId() + ":"
-          + artifactResult.getArtifact().getVersion());
-      newClassPathList.add(artifactResult.getArtifact().getFile().toURI().toURL());
-      files.add(artifactResult.getArtifact().getFile());
-      loadedLibs.add(artifactResult.getArtifact().getGroupId() + ":"
-          + artifactResult.getArtifact().getArtifactId() + ":"
-          + artifactResult.getArtifact().getVersion());
-    }
-
-    intp.global().new Run();
-    updateRuntimeClassPath(newClassPathList.toArray(new URL[0]));
-    updateCompilerClassPath(newClassPathList.toArray(new URL[0]));
-
-    if (addSparkContext) {
-      for (File f : files) {
-        sc.addJar(f.getAbsolutePath());
-      }
-    }
-
-    return loadedLibs;
-  }
-
-  /**
-   *
-   * @param dependency
-   * @param excludes list of exclusion patterns, each of the form groupId:artifactId
-   * @return
-   * @throws Exception
-   */
-  public List<ArtifactResult> getArtifactsWithDep(String dependency,
-      Collection<String> excludes) throws Exception {
-    Artifact artifact = new DefaultArtifact(inferScalaVersion(dependency));
-    DependencyFilter classpathFlter = DependencyFilterUtils.classpathFilter(JavaScopes.COMPILE);
-    PatternExclusionsDependencyFilter exclusionFilter =
-        new PatternExclusionsDependencyFilter(inferScalaVersion(excludes));
-
-    CollectRequest collectRequest = new CollectRequest();
-    collectRequest.setRoot(new Dependency(artifact, JavaScopes.COMPILE));
-
-    synchronized (repos) {
-      for (RemoteRepository repo : repos) {
-        collectRequest.addRepository(repo);
-      }
-    }
-    DependencyRequest dependencyRequest = new DependencyRequest(collectRequest,
-        DependencyFilterUtils.andFilter(exclusionFilter, classpathFlter));
-    return system.resolveDependencies(session, dependencyRequest).getArtifactResults();
-  }
-
-  public static Collection<String> inferScalaVersion(Collection<String> artifact) {
-    List<String> list = new LinkedList<String>();
-    for (String a : artifact) {
-      list.add(inferScalaVersion(a));
-    }
-    return list;
-  }
-
-  public static String inferScalaVersion(String artifact) {
-    int pos = artifact.indexOf(":");
-    if (pos < 0 || pos + 2 >= artifact.length()) {
-      // failed to infer
-      return artifact;
-    }
-
-    if (':' == artifact.charAt(pos + 1)) {
-      String restOfthem = "";
-      String versionSep = ":";
-
-      String groupId = artifact.substring(0, pos);
-      int nextPos = artifact.indexOf(":", pos + 2);
-      if (nextPos < 0) {
-        if (artifact.charAt(artifact.length() - 1) == '*') {
-          nextPos = artifact.length() - 1;
-          versionSep = "";
-          restOfthem = "*";
-        } else {
-          versionSep = "";
-          nextPos = artifact.length();
-        }
-      }
-
-      String artifactId = artifact.substring(pos + 2, nextPos);
-      if (nextPos < artifact.length()) {
-        if (!restOfthem.equals("*")) {
-          restOfthem = artifact.substring(nextPos + 1);
-        }
-      }
-
-      String [] version = scala.util.Properties.versionNumberString().split("[.]");
-      String scalaVersion = version[0] + "." + version[1];
-
-      return groupId + ":" + artifactId + "_" + scalaVersion + versionSep + restOfthem;
-    } else {
-      return artifact;
-    }
-  }
-}
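
The part of DependencyResolver worth calling out is inferScalaVersion: a double colon in a coordinate ("group::artifact:version") makes the resolver append the running Scala binary version to the artifactId, mirroring sbt's %% convention, while a single colon leaves the coordinate untouched. A worked example with a hypothetical coordinate, assuming the REPL reports a Scala 2.10.x version string:

    // Hypothetical coordinates; assumes scala.util.Properties.versionNumberString() is "2.10.x".
    DependencyResolver.inferScalaVersion("com.example::example-lib:1.0.0");
    // -> "com.example:example-lib_2.10:1.0.0"   (Scala suffix inserted into the artifactId)
    DependencyResolver.inferScalaVersion("com.example:example-lib:1.0.0");
    // -> "com.example:example-lib:1.0.0"        (single colon, returned as-is)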

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/spark/src/main/java/com/nflabs/zeppelin/spark/dep/Repository.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/com/nflabs/zeppelin/spark/dep/Repository.java b/spark/src/main/java/com/nflabs/zeppelin/spark/dep/Repository.java
deleted file mode 100644
index 8ca5fe7..0000000
--- a/spark/src/main/java/com/nflabs/zeppelin/spark/dep/Repository.java
+++ /dev/null
@@ -1,37 +0,0 @@
-package com.nflabs.zeppelin.spark.dep;
-
-/**
- * Describes a remote Maven repository (name, URL, snapshot flag).
- *
- */
-public class Repository {
-  private boolean snapshot = false;
-  private String name;
-  private String url;
-
-  public Repository(String name){
-    this.name = name;
-  }
-
-  public Repository url(String url) {
-    this.url = url;
-    return this;
-  }
-
-  public Repository snapshot() {
-    snapshot = true;
-    return this;
-  }
-
-  public boolean isSnapshot() {
-    return snapshot;
-  }
-
-  public String getName() {
-    return name;
-  }
-
-  public String getUrl() {
-    return url;
-  }
-}
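
Repository is a small fluent holder for user-added repositories; DependencyContext converts each one into an Aether RemoteRepository, as the loop at the top of this section shows. A usage sketch with placeholder values:

    // Placeholder name/URL for illustration only.
    Repository snapshots = new Repository("company-snapshots")
        .url("http://repo.example.com/snapshots")
        .snapshot();  // marks the repository as serving SNAPSHOT artifacts

    // Consumed as in DependencyContext above:
    // RemoteRepository rr = new RemoteRepository(repo.getName(), "default", repo.getUrl());
    // rr.setPolicy(repo.isSnapshot(), null);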

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/spark/src/main/java/com/nflabs/zeppelin/spark/dep/RepositoryListener.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/com/nflabs/zeppelin/spark/dep/RepositoryListener.java b/spark/src/main/java/com/nflabs/zeppelin/spark/dep/RepositoryListener.java
deleted file mode 100644
index 0fed51d..0000000
--- a/spark/src/main/java/com/nflabs/zeppelin/spark/dep/RepositoryListener.java
+++ /dev/null
@@ -1,87 +0,0 @@
-package com.nflabs.zeppelin.spark.dep;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.sonatype.aether.AbstractRepositoryListener;
-import org.sonatype.aether.RepositoryEvent;
-
-/**
- * Simple listener that prints repository events to the log.
- * 
- * @author anthonycorbacho
- *
- */
-public class RepositoryListener extends AbstractRepositoryListener {
-  Logger logger = LoggerFactory.getLogger(RepositoryListener.class);
-
-  public RepositoryListener() {}
-
-  public void artifactDeployed(RepositoryEvent event) {
-    logger.info("Deployed " + event.getArtifact() + " to " + 
event.getRepository());
-  }
-
-  public void artifactDeploying(RepositoryEvent event) {
-    logger.info("Deploying " + event.getArtifact() + " to " + 
event.getRepository());
-  }
-
-  public void artifactDescriptorInvalid(RepositoryEvent event) {
-    logger.info("Invalid artifact descriptor for " + event.getArtifact() + ": "
-                                                   + event.getException().getMessage());
-  }
-
-  public void artifactDescriptorMissing(RepositoryEvent event) {
-    logger.info("Missing artifact descriptor for " + event.getArtifact());
-  }
-
-  public void artifactInstalled(RepositoryEvent event) {
-    logger.info("Installed " + event.getArtifact() + " to " + event.getFile());
-  }
-
-  public void artifactInstalling(RepositoryEvent event) {
-    logger.info("Installing " + event.getArtifact() + " to " + 
event.getFile());
-  }
-
-  public void artifactResolved(RepositoryEvent event) {
-    logger.info("Resolved artifact " + event.getArtifact() + " from " + 
event.getRepository());
-  }
-
-  public void artifactDownloading(RepositoryEvent event) {
-    logger.info("Downloading artifact " + event.getArtifact() + " from " + 
event.getRepository());
-  }
-
-  public void artifactDownloaded(RepositoryEvent event) {
-    logger.info("Downloaded artifact " + event.getArtifact() + " from " + 
event.getRepository());
-  }
-
-  public void artifactResolving(RepositoryEvent event) {
-    logger.info("Resolving artifact " + event.getArtifact());
-  }
-
-  public void metadataDeployed(RepositoryEvent event) {
-    logger.info("Deployed " + event.getMetadata() + " to " + 
event.getRepository());
-  }
-
-  public void metadataDeploying(RepositoryEvent event) {
-    logger.info("Deploying " + event.getMetadata() + " to " + 
event.getRepository());
-  }
-
-  public void metadataInstalled(RepositoryEvent event) {
-    logger.info("Installed " + event.getMetadata() + " to " + event.getFile());
-  }
-
-  public void metadataInstalling(RepositoryEvent event) {
-    logger.info("Installing " + event.getMetadata() + " to " + 
event.getFile());
-  }
-
-  public void metadataInvalid(RepositoryEvent event) {
-    logger.info("Invalid metadata " + event.getMetadata());
-  }
-
-  public void metadataResolved(RepositoryEvent event) {
-    logger.info("Resolved metadata " + event.getMetadata() + " from " + 
event.getRepository());
-  }
-
-  public void metadataResolving(RepositoryEvent event) {
-    logger.info("Resolving metadata " + event.getMetadata() + " from " + 
event.getRepository());
-  }
-}
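
The listener only logs; it becomes useful once registered on the session that performs the resolution. A sketch of that wiring, under the assumption that the session is (or extends) org.sonatype.aether.util.DefaultRepositorySystemSession, which is what Aether demo-style Booter helpers typically return; the exact session type built by the Booter class in this commit is not shown here:

    // Assumption: the session type exposes setRepositoryListener/setTransferListener,
    // as DefaultRepositorySystemSession from the Sonatype Aether 1.x line does.
    MavenRepositorySystemSession session = new MavenRepositorySystemSession();
    session.setRepositoryListener(new RepositoryListener());  // deploy/install/resolve events
    session.setTransferListener(new TransferListener());      // download progress (next file)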

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/spark/src/main/java/com/nflabs/zeppelin/spark/dep/RepositorySystemFactory.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/com/nflabs/zeppelin/spark/dep/RepositorySystemFactory.java b/spark/src/main/java/com/nflabs/zeppelin/spark/dep/RepositorySystemFactory.java
deleted file mode 100644
index cf48a33..0000000
--- a/spark/src/main/java/com/nflabs/zeppelin/spark/dep/RepositorySystemFactory.java
+++ /dev/null
@@ -1,52 +0,0 @@
-package com.nflabs.zeppelin.spark.dep;
-
-import org.apache.maven.repository.internal.DefaultServiceLocator;
-import org.apache.maven.wagon.Wagon;
-import org.apache.maven.wagon.providers.http.HttpWagon;
-import org.apache.maven.wagon.providers.http.LightweightHttpWagon;
-import org.sonatype.aether.RepositorySystem;
-import org.sonatype.aether.connector.file.FileRepositoryConnectorFactory;
-import org.sonatype.aether.connector.wagon.WagonProvider;
-import org.sonatype.aether.connector.wagon.WagonRepositoryConnectorFactory;
-import org.sonatype.aether.spi.connector.RepositoryConnectorFactory;
-
-/**
- * Creates the Aether repository system used for dependency resolution.
- *
- * @author anthonycorbacho
- *
- */
-public class RepositorySystemFactory {
-  public static RepositorySystem newRepositorySystem() {
-    DefaultServiceLocator locator = new DefaultServiceLocator();
-    locator.addService(RepositoryConnectorFactory.class, FileRepositoryConnectorFactory.class);
-    locator.addService(RepositoryConnectorFactory.class, WagonRepositoryConnectorFactory.class);
-    locator.setServices(WagonProvider.class, new ManualWagonProvider());
-
-    return locator.getService(RepositorySystem.class);
-  }
-
-  /**
-   * ManualWagonProvider
-   */
-  public static class ManualWagonProvider implements WagonProvider {
-
-    @Override
-    public Wagon lookup(String roleHint) throws Exception {
-      if ("http".equals(roleHint)) {
-        return new LightweightHttpWagon();
-      }
-
-      if ("https".equals(roleHint)) {
-        return new HttpWagon();
-      }
-
-      return null;
-    }
-
-    @Override
-    public void release(Wagon arg0) {
-
-    }
-  }
-}
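
RepositorySystemFactory wires file and Wagon (HTTP/HTTPS) connectors into an Aether RepositorySystem, paralleling what the Booter helper does for DependencyResolver above. A sketch of how such a system is typically driven, reusing the same calls that appear in getArtifactsWithDep; the coordinate and local-repo path are placeholders:

    // Placeholder coordinate and repository path; mirrors DependencyResolver.getArtifactsWithDep.
    RepositorySystem system = RepositorySystemFactory.newRepositorySystem();
    RepositorySystemSession session = Booter.newRepositorySystemSession(system, "local-repo");

    CollectRequest collect = new CollectRequest();
    collect.setRoot(new Dependency(new DefaultArtifact("com.example:example-lib:1.0.0"),
        JavaScopes.COMPILE));
    collect.addRepository(Booter.newCentralRepository());

    DependencyRequest request = new DependencyRequest(collect,
        DependencyFilterUtils.classpathFilter(JavaScopes.COMPILE));
    List<ArtifactResult> results =
        system.resolveDependencies(session, request).getArtifactResults();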

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/spark/src/main/java/com/nflabs/zeppelin/spark/dep/TransferListener.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/com/nflabs/zeppelin/spark/dep/TransferListener.java b/spark/src/main/java/com/nflabs/zeppelin/spark/dep/TransferListener.java
deleted file mode 100644
index faecf54..0000000
--- a/spark/src/main/java/com/nflabs/zeppelin/spark/dep/TransferListener.java
+++ /dev/null
@@ -1,130 +0,0 @@
-package com.nflabs.zeppelin.spark.dep;
-
-import java.io.PrintStream;
-import java.text.DecimalFormat;
-import java.text.DecimalFormatSymbols;
-import java.util.Locale;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.sonatype.aether.transfer.AbstractTransferListener;
-import org.sonatype.aether.transfer.TransferEvent;
-import org.sonatype.aether.transfer.TransferResource;
-
-/**
- * Simple listener that shows dependency download progress.
- * 
- * @author anthonycorbacho
- *
- */
-public class TransferListener extends AbstractTransferListener {
-  Logger logger = LoggerFactory.getLogger(TransferListener.class);
-  private PrintStream out;
-
-  private Map<TransferResource, Long> downloads = new ConcurrentHashMap<TransferResource, Long>();
-
-  private int lastLength;
-
-  public TransferListener() {}
-
-  @Override
-  public void transferInitiated(TransferEvent event) {
-    String message =
-        event.getRequestType() == TransferEvent.RequestType.PUT ? "Uploading" : "Downloading";
-
-    logger.info(message + ": " + event.getResource().getRepositoryUrl()
-                + event.getResource().getResourceName());
-  }
-
-  @Override
-  public void transferProgressed(TransferEvent event) {
-    TransferResource resource = event.getResource();
-    downloads.put(resource, Long.valueOf(event.getTransferredBytes()));
-
-    StringBuilder buffer = new StringBuilder(64);
-
-    for (Map.Entry<TransferResource, Long> entry : downloads.entrySet()) {
-      long total = entry.getKey().getContentLength();
-      long complete = entry.getValue().longValue();
-
-      buffer.append(getStatus(complete, total)).append("  ");
-    }
-
-    int pad = lastLength - buffer.length();
-    lastLength = buffer.length();
-    pad(buffer, pad);
-    buffer.append('\r');
-
-    logger.info(buffer.toString());
-  }
-
-  private String getStatus(long complete, long total) {
-    if (total >= 1024) {
-      return toKB(complete) + "/" + toKB(total) + " KB ";
-    } else if (total >= 0) {
-      return complete + "/" + total + " B ";
-    } else if (complete >= 1024) {
-      return toKB(complete) + " KB ";
-    } else {
-      return complete + " B ";
-    }
-  }
-
-  private void pad(StringBuilder buffer, int spaces) {
-    String block = "                                        ";
-    while (spaces > 0) {
-      int n = Math.min(spaces, block.length());
-      buffer.append(block, 0, n);
-      spaces -= n;
-    }
-  }
-
-  @Override
-  public void transferSucceeded(TransferEvent event) {
-    transferCompleted(event);
-
-    TransferResource resource = event.getResource();
-    long contentLength = event.getTransferredBytes();
-    if (contentLength >= 0) {
-      String type =
-          (event.getRequestType() == TransferEvent.RequestType.PUT ? "Uploaded" : "Downloaded");
-      String len = contentLength >= 1024 ? toKB(contentLength) + " KB" : contentLength + " B";
-
-      String throughput = "";
-      long duration = System.currentTimeMillis() - resource.getTransferStartTime();
-      if (duration > 0) {
-        DecimalFormat format = new DecimalFormat("0.0", new DecimalFormatSymbols(Locale.ENGLISH));
-        double kbPerSec = (contentLength / 1024.0) / (duration / 1000.0);
-        throughput = " at " + format.format(kbPerSec) + " KB/sec";
-      }
-
-      logger.info(type + ": " + resource.getRepositoryUrl() + resource.getResourceName() + " ("
-          + len + throughput + ")");
-    }
-  }
-
-  @Override
-  public void transferFailed(TransferEvent event) {
-    transferCompleted(event);
-    event.getException().printStackTrace(out);
-  }
-
-  private void transferCompleted(TransferEvent event) {
-    downloads.remove(event.getResource());
-    StringBuilder buffer = new StringBuilder(64);
-    pad(buffer, lastLength);
-    buffer.append('\r');
-    logger.info(buffer.toString());
-  }
-
-  public void transferCorrupted(TransferEvent event) {
-    event.getException().printStackTrace(out);
-  }
-
-  protected long toKB(long bytes) {
-    return (bytes + 1023) / 1024;
-  }
-
-}
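
For reference, the size formatting above rounds up to whole kilobytes and falls back to bytes for small transfers; a few worked values:

    // toKB(2600)           == (2600 + 1023) / 1024 == 3     (rounds up to whole KB)
    // getStatus(512, 2600)  -> "1/3 KB "                     (total >= 1024: both sides in KB)
    // getStatus(100, 512)   -> "100/512 B "                  (small totals stay in bytes)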

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/spark/src/main/java/org/apache/zeppelin/spark/DepInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/DepInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/DepInterpreter.java
new file mode 100644
index 0000000..16bd0f0
--- /dev/null
+++ b/spark/src/main/java/org/apache/zeppelin/spark/DepInterpreter.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.spark;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.PrintStream;
+import java.io.PrintWriter;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.spark.repl.SparkILoop;
+import org.apache.spark.repl.SparkIMain;
+import org.apache.spark.repl.SparkJLineCompletion;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.apache.zeppelin.interpreter.WrappedInterpreter;
+import org.apache.zeppelin.scheduler.Scheduler;
+import org.apache.zeppelin.spark.dep.DependencyContext;
+import org.sonatype.aether.resolution.ArtifactResolutionException;
+import org.sonatype.aether.resolution.DependencyResolutionException;
+
+import scala.Console;
+import scala.None;
+import scala.Some;
+import scala.tools.nsc.Settings;
+import scala.tools.nsc.interpreter.Completion.Candidates;
+import scala.tools.nsc.interpreter.Completion.ScalaCompleter;
+import scala.tools.nsc.settings.MutableSettings.BooleanSetting;
+import scala.tools.nsc.settings.MutableSettings.PathSetting;
+
+
+/**
+ * DepInterpreter downloads dependencies and passes them to SparkInterpreter when it is
+ * initialized. It does not create a SparkContext itself and must run before the
+ * SparkInterpreter (%spark) is initialized.
+ *
+ */
+public class DepInterpreter extends Interpreter {
+
+  static {
+    Interpreter.register(
+        "dep",
+        "spark",
+        DepInterpreter.class.getName(),
+        new InterpreterPropertyBuilder()
+            .add("zeppelin.dep.localrepo", "local-repo", "local repository for 
dependency loader")
+            .build());
+
+  }
+
+  private SparkIMain intp;
+  private ByteArrayOutputStream out;
+  private DependencyContext depc;
+  private SparkJLineCompletion completor;
+  private SparkILoop interpreter;
+
+  public DepInterpreter(Properties property) {
+    super(property);
+  }
+
+  public DependencyContext getDependencyContext() {
+    return depc;
+  }
+
+
+  @Override
+  public void close() {
+    if (intp != null) {
+      intp.close();
+    }
+  }
+
+  @Override
+  public void open() {
+    out = new ByteArrayOutputStream();
+    createIMain();
+  }
+
+
+  private void createIMain() {
+    Settings settings = new Settings();
+    URL[] urls = getClassloaderUrls();
+
+    // set classpath for scala compiler
+    PathSetting pathSettings = settings.classpath();
+    String classpath = "";
+    List<File> paths = currentClassPath();
+    for (File f : paths) {
+      if (classpath.length() > 0) {
+        classpath += File.pathSeparator;
+      }
+      classpath += f.getAbsolutePath();
+    }
+
+    if (urls != null) {
+      for (URL u : urls) {
+        if (classpath.length() > 0) {
+          classpath += File.pathSeparator;
+        }
+        classpath += u.getFile();
+      }
+    }
+
+    pathSettings.v_$eq(classpath);
+    settings.scala$tools$nsc$settings$ScalaSettings$_setter_$classpath_$eq(pathSettings);
+
+    // set classloader for scala compiler
+    settings.explicitParentLoader_$eq(new Some<ClassLoader>(Thread.currentThread()
+        .getContextClassLoader()));
+
+    BooleanSetting b = (BooleanSetting) settings.usejavacp();
+    b.v_$eq(true);
+    settings.scala$tools$nsc$settings$StandardScalaSettings$_setter_$usejavacp_$eq(b);
+
+    interpreter = new SparkILoop(null, new PrintWriter(out));
+    interpreter.settings_$eq(settings);
+
+    interpreter.createInterpreter();
+
+
+    intp = interpreter.intp();
+    intp.setContextClassLoader();
+    intp.initializeSynchronous();
+
+    depc = new DependencyContext(getProperty("zeppelin.dep.localrepo"));
+    completor = new SparkJLineCompletion(intp);
+
+    intp.interpret("@transient var _binder = new java.util.HashMap[String, 
Object]()");
+    Map<String, Object> binder = (Map<String, Object>) getValue("_binder");
+    binder.put("depc", depc);
+
+    intp.interpret("@transient val z = "
+        + "_binder.get(\"depc\").asInstanceOf[org.apache.zeppelin.spark.dep.DependencyContext]");
+
+  }
+
+  public Object getValue(String name) {
+    Object ret = intp.valueOfTerm(name);
+    if (ret instanceof None) {
+      return null;
+    } else if (ret instanceof Some) {
+      return ((Some) ret).get();
+    } else {
+      return ret;
+    }
+  }
+
+  @Override
+  public InterpreterResult interpret(String st, InterpreterContext context) {
+    PrintStream printStream = new PrintStream(out);
+    Console.setOut(printStream);
+    out.reset();
+
+    SparkInterpreter sparkInterpreter = getSparkInterpreter();
+
+    if (sparkInterpreter != null && sparkInterpreter.isSparkContextInitialized()) {
+      return new InterpreterResult(Code.ERROR,
+          "Must be used before SparkInterpreter (%spark) initialized");
+    }
+
+    scala.tools.nsc.interpreter.Results.Result ret = intp.interpret(st);
+    Code code = getResultCode(ret);
+
+    try {
+      depc.fetch();
+    } catch (MalformedURLException | DependencyResolutionException
+        | ArtifactResolutionException e) {
+      return new InterpreterResult(Code.ERROR, e.toString());
+    }
+
+    if (code == Code.INCOMPLETE) {
+      return new InterpreterResult(code, "Incomplete expression");
+    } else if (code == Code.ERROR) {
+      return new InterpreterResult(code, out.toString());
+    } else {
+      return new InterpreterResult(code, out.toString());
+    }
+  }
+
+  private Code getResultCode(scala.tools.nsc.interpreter.Results.Result r) {
+    if (r instanceof scala.tools.nsc.interpreter.Results.Success$) {
+      return Code.SUCCESS;
+    } else if (r instanceof scala.tools.nsc.interpreter.Results.Incomplete$) {
+      return Code.INCOMPLETE;
+    } else {
+      return Code.ERROR;
+    }
+  }
+
+  @Override
+  public void cancel(InterpreterContext context) {
+  }
+
+
+  @Override
+  public FormType getFormType() {
+    return FormType.NATIVE;
+  }
+
+  @Override
+  public int getProgress(InterpreterContext context) {
+    return 0;
+  }
+
+  @Override
+  public List<String> completion(String buf, int cursor) {
+    ScalaCompleter c = completor.completer();
+    Candidates ret = c.complete(buf, cursor);
+    return scala.collection.JavaConversions.asJavaList(ret.candidates());
+  }
+
+  private List<File> currentClassPath() {
+    List<File> paths = classPath(Thread.currentThread().getContextClassLoader());
+    String[] cps = System.getProperty("java.class.path").split(File.pathSeparator);
+    if (cps != null) {
+      for (String cp : cps) {
+        paths.add(new File(cp));
+      }
+    }
+    return paths;
+  }
+
+  private List<File> classPath(ClassLoader cl) {
+    List<File> paths = new LinkedList<File>();
+    if (cl == null) {
+      return paths;
+    }
+
+    if (cl instanceof URLClassLoader) {
+      URLClassLoader ucl = (URLClassLoader) cl;
+      URL[] urls = ucl.getURLs();
+      if (urls != null) {
+        for (URL url : urls) {
+          paths.add(new File(url.getFile()));
+        }
+      }
+    }
+    return paths;
+  }
+
+  private SparkInterpreter getSparkInterpreter() {
+    InterpreterGroup intpGroup = getInterpreterGroup();
+    if (intpGroup == null) {
+      return null;
+    }
+    synchronized (intpGroup) {
+      for (Interpreter intp : intpGroup){
+        if (intp.getClassName().equals(SparkInterpreter.class.getName())) {
+          Interpreter p = intp;
+          while (p instanceof WrappedInterpreter) {
+            p = ((WrappedInterpreter) p).getInnerInterpreter();
+          }
+          return (SparkInterpreter) p;
+        }
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public Scheduler getScheduler() {
+    return getSparkInterpreter().getScheduler();
+  }
+
+}
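
For context, a test-style sketch of driving the new interpreter directly; the z.load(...) string is the Scala a notebook user would type in a %dep paragraph, and the load method on DependencyContext is an assumption (it is bound as "z" in createIMain above, but its loader methods are not part of this hunk):

    // Sketch only; the coordinate is a placeholder and the null InterpreterContext is for brevity.
    Properties props = new Properties();
    props.put("zeppelin.dep.localrepo", "local-repo");

    DepInterpreter dep = new DepInterpreter(props);
    dep.open();  // builds the embedded SparkIMain and binds the DependencyContext as "z"

    InterpreterResult result =
        dep.interpret("z.load(\"com.example:example-lib:1.0.0\")", null);
    // result carries the REPL output; depc.fetch() has resolved the jars by this point.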
