Repository: incubator-zeppelin
Updated Branches:
  refs/heads/master 77c59e4f0 -> ff85f793b


Run sequence is not preserved

This PR came from https://github.com/NFLabs/zeppelin/pull/397

After https://github.com/NFLabs/zeppelin/pull/364, order of submitted job to 
the remote interpreter is not preserved.
Result is, when user run a lot of paragraphs in a short time, they run in 
random order(like click run 'Run all paragraphs). 
https://github.com/NFLabs/zeppelin/issues/395 is one of the possible problem.

this PR adds some unittests and fix the problem.

Ready to merge.

Author: Lee moon soo <[email protected]>
Author: Lee moon soo <[email protected]>

Closes #8 from Leemoonsoo/fix/run_order and squashes the following commits:

4a6a230 [Lee moon soo] Prevent status remaining RUNNING after job executed
1fb3715 [Lee moon soo] Fix daedlock
6405abe [Lee moon soo] Update RemoteSchedulerTest
c31a807 [Lee moon soo] Make RemoteScheduler keep order of job submitted


Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/ff85f793
Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/ff85f793
Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/ff85f793

Branch: refs/heads/master
Commit: ff85f793bb35c43352432b47ee5e44e2ef650a87
Parents: 77c59e4
Author: Lee moon soo <[email protected]>
Authored: Sat Mar 28 18:05:56 2015 +0900
Committer: Lee moon soo <[email protected]>
Committed: Tue Mar 31 08:39:27 2015 +0900

----------------------------------------------------------------------
 .../interpreter/LazyOpenInterpreter.java        |   1 -
 .../interpreter/remote/RemoteInterpreter.java   |  35 +--
 .../zeppelin/scheduler/JobProgressPoller.java   |   7 +-
 .../zeppelin/scheduler/RemoteScheduler.java     | 179 ++++++++----
 .../remote/RemoteInterpreterTest.java           | 288 ++++++++++++++++++-
 .../remote/mock/MockInterpreterA.java           |  16 +-
 .../remote/mock/MockInterpreterB.java           |  27 +-
 .../zeppelin/scheduler/RemoteSchedulerTest.java |  25 +-
 8 files changed, 482 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ff85f793/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/LazyOpenInterpreter.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/LazyOpenInterpreter.java
 
b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/LazyOpenInterpreter.java
index 0703320..753adc9 100644
--- 
a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/LazyOpenInterpreter.java
+++ 
b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/LazyOpenInterpreter.java
@@ -84,7 +84,6 @@ public class LazyOpenInterpreter
 
   @Override
   public FormType getFormType() {
-    open();
     return intp.getFormType();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ff85f793/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreter.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreter.java
 
b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreter.java
index 240e861..ccae0f7 100644
--- 
a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreter.java
+++ 
b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreter.java
@@ -34,6 +34,7 @@ public class RemoteInterpreter extends Interpreter {
   private String interpreterPath;
   private String className;
   FormType formType;
+  boolean initialized;
   private Map<String, String> env;
   static Map<String, RemoteInterpreterProcess> interpreterGroupReference
     = new HashMap<String, RemoteInterpreterProcess>();
@@ -45,6 +46,7 @@ public class RemoteInterpreter extends Interpreter {
     super(property);
 
     this.className = className;
+    initialized = false;
     this.interpreterRunner = interpreterRunner;
     this.interpreterPath = interpreterPath;
     env = new HashMap<String, String>();
@@ -84,7 +86,11 @@ public class RemoteInterpreter extends Interpreter {
     }
   }
 
-  private void init() {
+  private synchronized void init() {
+    if (initialized == true) {
+      return;
+    }
+
     RemoteInterpreterProcess interpreterProcess = null;
 
     synchronized (interpreterGroupReference) {
@@ -122,6 +128,7 @@ public class RemoteInterpreter extends Interpreter {
         }
       }
     }
+    initialized = true;
   }
 
 
@@ -129,24 +136,6 @@ public class RemoteInterpreter extends Interpreter {
   @Override
   public void open() {
     init();
-
-    RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
-
-    Client client = null;
-    try {
-      client = interpreterProcess.getClient();
-    } catch (Exception e1) {
-      throw new InterpreterException(e1);
-    }
-
-    try {
-      logger.info("open remote interpreter {}", className);
-      client.open(className);
-    } catch (TException e) {
-      throw new InterpreterException(e);
-    } finally {
-      interpreterProcess.releaseClient(client);
-    }
   }
 
   @Override
@@ -228,6 +217,8 @@ public class RemoteInterpreter extends Interpreter {
 
   @Override
   public FormType getFormType() {
+    init();
+
     if (formType != null) {
       return formType;
     }
@@ -292,9 +283,11 @@ public class RemoteInterpreter extends Interpreter {
   @Override
   public Scheduler getScheduler() {
     int maxConcurrency = 10;
-
+    RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
     return SchedulerFactory.singleton().createOrGetRemoteScheduler(
-        "remoteinterpreter_" + this.hashCode(), getInterpreterProcess(), 
maxConcurrency);
+        "remoteinterpreter_" + interpreterProcess.hashCode(),
+        getInterpreterProcess(),
+        maxConcurrency);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ff85f793/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/JobProgressPoller.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/JobProgressPoller.java
 
b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/JobProgressPoller.java
index bcda22d..142842a 100644
--- 
a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/JobProgressPoller.java
+++ 
b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/JobProgressPoller.java
@@ -5,7 +5,7 @@ import org.slf4j.LoggerFactory;
 
 /**
  * TODO(moon) : add description.
- * 
+ *
  * @author Leemoonsoo
  *
  */
@@ -21,6 +21,7 @@ public class JobProgressPoller extends Thread {
     this.intervalMs = intervalMs;
   }
 
+  @Override
   public void run() {
     if (intervalMs < 0) {
       return;
@@ -32,7 +33,9 @@ public class JobProgressPoller extends Thread {
       JobListener listener = job.getListener();
       if (listener != null) {
         try {
-          listener.onProgressUpdate(job, job.progress());
+          if (job.isRunning()) {
+            listener.onProgressUpdate(job, job.progress());
+          }
         } catch (Exception e) {
           logger.error("Can not get or update progress", e);
         }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ff85f793/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/RemoteScheduler.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/RemoteScheduler.java
 
b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/RemoteScheduler.java
index bb02f13..14baa9b 100644
--- 
a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/RemoteScheduler.java
+++ 
b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/scheduler/RemoteScheduler.java
@@ -28,10 +28,8 @@ public class RemoteScheduler implements Scheduler {
   private int maxConcurrency;
   private RemoteInterpreterProcess interpreterProcess;
 
-  public RemoteScheduler(String name,
-      ExecutorService executor,
-      RemoteInterpreterProcess interpreterProcess,
-      SchedulerListener listener,
+  public RemoteScheduler(String name, ExecutorService executor,
+      RemoteInterpreterProcess interpreterProcess, SchedulerListener listener,
       int maxConcurrency) {
     this.name = name;
     this.executor = executor;
@@ -42,8 +40,10 @@ public class RemoteScheduler implements Scheduler {
 
   @Override
   public void run() {
-    synchronized (queue) {
-      while (terminate == false) {
+    while (terminate == false) {
+      Job job = null;
+
+      synchronized (queue) {
         if (running.size() >= maxConcurrency || queue.isEmpty() == true) {
           try {
             queue.wait(500);
@@ -52,13 +52,23 @@ public class RemoteScheduler implements Scheduler {
           continue;
         }
 
-
-        Job job = queue.remove(0);
+        job = queue.remove(0);
         running.add(job);
+      }
 
-        // run and
-        Scheduler scheduler = this;
-        executor.execute(new JobRunner(scheduler, job));
+      // run
+      Scheduler scheduler = this;
+      JobRunner jobRunner = new JobRunner(scheduler, job);
+      executor.execute(jobRunner);
+
+      // wait until it is submitted to the remote
+      while (!jobRunner.isJobSubmittedInRemote()) {
+        synchronized (queue) {
+          try {
+            queue.wait(500);
+          } catch (InterruptedException e) {
+          }
+        }
       }
     }
   }
@@ -70,12 +80,24 @@ public class RemoteScheduler implements Scheduler {
 
   @Override
   public Collection<Job> getJobsWaiting() {
-    return null;
+    List<Job> ret = new LinkedList<Job>();
+    synchronized (queue) {
+      for (Job job : queue) {
+        ret.add(job);
+      }
+    }
+    return ret;
   }
 
   @Override
   public Collection<Job> getJobsRunning() {
-    return null;
+    List<Job> ret = new LinkedList<Job>();
+    synchronized (queue) {
+      for (Job job : running) {
+        ret.add(job);
+      }
+    }
+    return ret;
   }
 
   @Override
@@ -96,8 +118,8 @@ public class RemoteScheduler implements Scheduler {
   }
 
   /**
-   * Role of the class is get status info from remote process
-   * from PENDING to RUNNING status.
+   * Role of the class is get status info from remote process from PENDING to
+   * RUNNING status.
    */
   private class JobStatusPoller extends Thread {
     private long initialPeriodMsec;
@@ -106,14 +128,11 @@ public class RemoteScheduler implements Scheduler {
     private boolean terminate;
     private JobListener listener;
     private Job job;
+    Status lastStatus;
 
-    public JobStatusPoller(
-        long initialPeriodMsec,
-        long initialPeriodCheckIntervalMsec,
-        long checkIntervalMsec,
-        Job job,
-        JobListener listener
-    ) {
+    public JobStatusPoller(long initialPeriodMsec,
+        long initialPeriodCheckIntervalMsec, long checkIntervalMsec, Job job,
+        JobListener listener) {
       this.initialPeriodMsec = initialPeriodMsec;
       this.initialPeriodCheckIntervalMsec = initialPeriodCheckIntervalMsec;
       this.checkIntervalMsec = checkIntervalMsec;
@@ -141,19 +160,13 @@ public class RemoteScheduler implements Scheduler {
           }
         }
 
+
         Status newStatus = getStatus();
-        if (newStatus == null) {
+        if (newStatus == null) { // unknown
           continue;
         }
 
-        // update only RUNNING
-        if (newStatus == Status.RUNNING) {
-          listener.afterStatusChange(job, null, newStatus);
-          break;
-        }
-
-        if (newStatus != Status.READY &&
-            newStatus != Status.PENDING) {
+        if (newStatus != Status.READY && newStatus != Status.PENDING) {
           // we don't need more
           continue;
         }
@@ -167,9 +180,24 @@ public class RemoteScheduler implements Scheduler {
       }
     }
 
+
+    private Status getLastStatus() {
+      if (terminate == true) {
+        if (lastStatus != Status.FINISHED &&
+            lastStatus != Status.ERROR &&
+            lastStatus != Status.ABORT) {
+          return Status.FINISHED;
+        } else {
+          return (lastStatus == null) ? Status.FINISHED : lastStatus;
+        }
+      } else {
+        return (lastStatus == null) ? Status.FINISHED : lastStatus;
+      }
+    }
+
     public synchronized Job.Status getStatus() {
       if (interpreterProcess.referenceCount() <= 0) {
-        return null;
+        return getLastStatus();
       }
 
       Client client;
@@ -177,33 +205,52 @@ public class RemoteScheduler implements Scheduler {
         client = interpreterProcess.getClient();
       } catch (Exception e) {
         logger.error("Can't get status information", e);
-        return Status.FINISHED;
+        lastStatus = Status.ERROR;
+        return Status.ERROR;
       }
 
       try {
-        Status status = Status.valueOf(client.getStatus(job.getId()));
-        logger.info("getStatus from remote {}", status);
+        String statusStr = client.getStatus(job.getId());
+        if ("Unknown".equals(statusStr)) {
+          // not found this job in the remote schedulers.
+          // maybe not submitted, maybe already finished
+          Status status = getLastStatus();
+          listener.afterStatusChange(job, null, status);
+          return status;
+        }
+        Status status = Status.valueOf(statusStr);
+        lastStatus = status;
+        listener.afterStatusChange(job, null, status);
         return status;
       } catch (TException e) {
         logger.error("Can't get status information", e);
-        return Status.FINISHED;
+        lastStatus = Status.ERROR;
+        return Status.ERROR;
       } catch (Exception e) {
-        // unknown status
-        return Status.FINISHED;
+        logger.error("Unknown status", e);
+        lastStatus = Status.ERROR;
+        return Status.ERROR;
       } finally {
         interpreterProcess.releaseClient(client);
       }
     }
   }
 
-
   private class JobRunner implements Runnable, JobListener {
     private Scheduler scheduler;
     private Job job;
+    private boolean jobExecuted;
+    boolean jobSubmittedRemotely;
 
     public JobRunner(Scheduler scheduler, Job job) {
       this.scheduler = scheduler;
       this.job = job;
+      jobExecuted = false;
+      jobSubmittedRemotely = false;
+    }
+
+    public boolean isJobSubmittedInRemote() {
+      return jobSubmittedRemotely;
     }
 
     @Override
@@ -220,26 +267,25 @@ public class RemoteScheduler implements Scheduler {
         return;
       }
 
-
-      JobStatusPoller jobStatusPoller = new JobStatusPoller(
-          1500,
-          100,
-          500,
-          job,
-          this
-      );
-      logger.info("*********** Start job status poller");
+      JobStatusPoller jobStatusPoller = new JobStatusPoller(1500, 100, 500,
+          job, this);
       jobStatusPoller.start();
 
       if (listener != null) {
         listener.jobStarted(scheduler, job);
       }
       job.run();
+      jobExecuted = true;
+      jobSubmittedRemotely = true;
 
       jobStatusPoller.shutdown();
+      try {
+        jobStatusPoller.join();
+      } catch (InterruptedException e) {
+        logger.error("JobStatusPoller interrupted", e);
+      }
 
       job.setStatus(jobStatusPoller.getStatus());
-
       if (listener != null) {
         listener.jobFinished(scheduler, job);
       }
@@ -263,16 +309,42 @@ public class RemoteScheduler implements Scheduler {
 
     @Override
     public void afterStatusChange(Job job, Status before, Status after) {
-      // status polled by status poller
-      if (job.getStatus() == after) {
+      if (after == null) { // unknown. maybe before sumitted remotely, maybe 
already finished.
+        if (jobExecuted) {
+          jobSubmittedRemotely = true;
+          if (job.isAborted()) {
+            job.setStatus(Status.ABORT);
+          } else if (job.getException() != null) {
+            job.setStatus(Status.ERROR);
+          } else {
+            job.setStatus(Status.FINISHED);
+          }
+        }
         return;
       }
 
-      job.setStatus(after);
+
+      // Update remoteStatus
+      if (jobExecuted == false) {
+        if (after == Status.FINISHED || after == Status.ABORT
+            || after == Status.ERROR) {
+          // it can be status of last run.
+          // so not updating the remoteStatus
+          return;
+        } else if (after == Status.RUNNING) {
+          jobSubmittedRemotely = true;
+        }
+      } else {
+        jobSubmittedRemotely = true;
+      }
+
+      // status polled by status poller
+      if (job.getStatus() != after) {
+        job.setStatus(after);
+      }
     }
   }
 
-
   @Override
   public void stop() {
     terminate = true;
@@ -280,7 +352,6 @@ public class RemoteScheduler implements Scheduler {
       queue.notify();
     }
 
-
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ff85f793/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterTest.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterTest.java
 
b/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterTest.java
index 5ec6a32..dcee6aa 100644
--- 
a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterTest.java
+++ 
b/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterTest.java
@@ -7,6 +7,8 @@ import static org.junit.Assert.assertTrue;
 import java.io.File;
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
@@ -18,26 +20,35 @@ import org.junit.Test;
 import com.nflabs.zeppelin.display.GUI;
 import com.nflabs.zeppelin.interpreter.InterpreterContext;
 import com.nflabs.zeppelin.interpreter.InterpreterGroup;
+import com.nflabs.zeppelin.interpreter.InterpreterResult;
 import com.nflabs.zeppelin.interpreter.remote.mock.MockInterpreterA;
 import com.nflabs.zeppelin.interpreter.remote.mock.MockInterpreterB;
+import com.nflabs.zeppelin.scheduler.Job;
+import com.nflabs.zeppelin.scheduler.Job.Status;
+import com.nflabs.zeppelin.scheduler.Scheduler;
 
 public class RemoteInterpreterTest {
 
 
+  private InterpreterGroup intpGroup;
+  private HashMap<String, String> env;
+
   @Before
   public void setUp() throws Exception {
+    intpGroup = new InterpreterGroup();
+    env = new HashMap<String, String>();
+    env.put("ZEPPELIN_CLASSPATH", new 
File("./target/test-classes").getAbsolutePath());
   }
 
   @After
   public void tearDown() throws Exception {
+    intpGroup.clone();
+    intpGroup.destroy();
   }
 
   @Test
   public void testRemoteInterperterCall() throws TTransportException, 
IOException {
     Properties p = new Properties();
-    InterpreterGroup intpGroup = new InterpreterGroup();
-    Map<String, String> env = new HashMap<String, String>();
-    env.put("ZEPPELIN_CLASSPATH", new 
File("./target/test-classes").getAbsolutePath());
 
     RemoteInterpreter intpA = new RemoteInterpreter(
         p,
@@ -97,9 +108,6 @@ public class RemoteInterpreterTest {
   @Test
   public void testRemoteSchedulerSharing() throws TTransportException, 
IOException {
     Properties p = new Properties();
-    InterpreterGroup intpGroup = new InterpreterGroup();
-    Map<String, String> env = new HashMap<String, String>();
-    env.put("ZEPPELIN_CLASSPATH", new 
File("./target/test-classes").getAbsolutePath());
 
     RemoteInterpreter intpA = new RemoteInterpreter(
         p,
@@ -127,30 +135,294 @@ public class RemoteInterpreterTest {
     intpB.open();
 
     long start = System.currentTimeMillis();
-    intpA.interpret("500",
+    InterpreterResult ret = intpA.interpret("500",
         new InterpreterContext(
             "id",
             "title",
             "text",
             new HashMap<String, Object>(),
             new GUI()));
+    assertEquals("500", ret.message());
 
-    intpB.interpret("500",
+    ret = intpB.interpret("500",
         new InterpreterContext(
             "id",
             "title",
             "text",
             new HashMap<String, Object>(),
             new GUI()));
+    assertEquals("1000", ret.message());
+    long end = System.currentTimeMillis();
+    assertTrue(end - start >= 1000);
+
+
+    intpA.close();
+    intpB.close();
+
+    RemoteInterpreterProcess process = intpA.getInterpreterProcess();
+    assertFalse(process.isRunning());
+  }
+
+  @Test
+  public void testRemoteSchedulerSharingSubmit() throws TTransportException, 
IOException, InterruptedException {
+    Properties p = new Properties();
+
+    final RemoteInterpreter intpA = new RemoteInterpreter(
+        p,
+        MockInterpreterA.class.getName(),
+        new File("../bin/interpreter.sh").getAbsolutePath(),
+        "fake",
+        env
+        );
+
+    intpGroup.add(intpA);
+    intpA.setInterpreterGroup(intpGroup);
+
+    final RemoteInterpreter intpB = new RemoteInterpreter(
+        p,
+        MockInterpreterB.class.getName(),
+        new File("../bin/interpreter.sh").getAbsolutePath(),
+        "fake",
+        env
+        );
+
+    intpGroup.add(intpB);
+    intpB.setInterpreterGroup(intpGroup);
+
+    intpA.open();
+    intpB.open();
+
+    long start = System.currentTimeMillis();
+    Job jobA = new Job("jobA", null) {
+
+      @Override
+      public int progress() {
+        return 0;
+      }
+
+      @Override
+      public Map<String, Object> info() {
+        return null;
+      }
+
+      @Override
+      protected Object jobRun() throws Throwable {
+        return intpA.interpret("500",
+            new InterpreterContext(
+                "jobA",
+                "title",
+                "text",
+                new HashMap<String, Object>(),
+                new GUI()));
+      }
+
+      @Override
+      protected boolean jobAbort() {
+        return false;
+      }
+
+    };
+    intpA.getScheduler().submit(jobA);
+
+    Job jobB = new Job("jobB", null) {
+
+      @Override
+      public int progress() {
+        return 0;
+      }
+
+      @Override
+      public Map<String, Object> info() {
+        return null;
+      }
+
+      @Override
+      protected Object jobRun() throws Throwable {
+        return intpB.interpret("500",
+            new InterpreterContext(
+                "jobB",
+                "title",
+                "text",
+                new HashMap<String, Object>(),
+                new GUI()));
+      }
+
+      @Override
+      protected boolean jobAbort() {
+        return false;
+      }
+
+    };
+    intpB.getScheduler().submit(jobB);
+
+    // wait until both job finished
+    while (jobA.getStatus() != Status.FINISHED ||
+           jobB.getStatus() != Status.FINISHED) {
+      Thread.sleep(100);
+    }
+
     long end = System.currentTimeMillis();
     assertTrue(end - start >= 1000);
 
+    assertEquals("1000", ((InterpreterResult) jobB.getReturn()).message());
 
     intpA.close();
     intpB.close();
 
     RemoteInterpreterProcess process = intpA.getInterpreterProcess();
     assertFalse(process.isRunning());
+  }
+
+  @Test
+  public void testRunOrderPreserved() throws InterruptedException {
+    Properties p = new Properties();
+
+    final RemoteInterpreter intpA = new RemoteInterpreter(
+        p,
+        MockInterpreterA.class.getName(),
+        new File("../bin/interpreter.sh").getAbsolutePath(),
+        "fake",
+        env
+        );
 
+    intpGroup.add(intpA);
+    intpA.setInterpreterGroup(intpGroup);
+
+    intpA.open();
+
+    int concurrency = 3;
+    final List<String> results = new LinkedList<String>();
+
+    Scheduler scheduler = intpA.getScheduler();
+    for (int i = 0; i < concurrency; i++) {
+      final String jobId = Integer.toString(i);
+      scheduler.submit(new Job(jobId, Integer.toString(i), null, 200) {
+
+        @Override
+        public int progress() {
+          return 0;
+        }
+
+        @Override
+        public Map<String, Object> info() {
+          return null;
+        }
+
+        @Override
+        protected Object jobRun() throws Throwable {
+          InterpreterResult ret = intpA.interpret(getJobName(), new 
InterpreterContext(
+              jobId,
+              "title",
+              "text",
+              new HashMap<String, Object>(),
+              new GUI()));
+
+          synchronized (results) {
+            results.add(ret.message());
+            results.notify();
+          }
+          return null;
+        }
+
+        @Override
+        protected boolean jobAbort() {
+          return false;
+        }
+
+      });
+    }
+
+    // wait for job finished
+    synchronized (results) {
+      while (results.size() != concurrency) {
+        results.wait(300);
+      }
+    }
+
+    int i = 0;
+    for (String result : results) {
+      assertEquals(Integer.toString(i++), result);
+    }
+    assertEquals(concurrency, i);
+
+    intpA.close();
+  }
+
+
+  @Test
+  public void testRunParallel() throws InterruptedException {
+    Properties p = new Properties();
+    p.put("parallel", "true");
+
+    final RemoteInterpreter intpA = new RemoteInterpreter(
+        p,
+        MockInterpreterA.class.getName(),
+        new File("../bin/interpreter.sh").getAbsolutePath(),
+        "fake",
+        env
+        );
+
+    intpGroup.add(intpA);
+    intpA.setInterpreterGroup(intpGroup);
+
+    intpA.open();
+
+    int concurrency = 4;
+    final int timeToSleep = 1000;
+    final List<String> results = new LinkedList<String>();
+    long start = System.currentTimeMillis();
+
+    Scheduler scheduler = intpA.getScheduler();
+    for (int i = 0; i < concurrency; i++) {
+      final String jobId = Integer.toString(i);
+      scheduler.submit(new Job(jobId, Integer.toString(i), null, 300) {
+
+        @Override
+        public int progress() {
+          return 0;
+        }
+
+        @Override
+        public Map<String, Object> info() {
+          return null;
+        }
+
+        @Override
+        protected Object jobRun() throws Throwable {
+          String stmt = Integer.toString(timeToSleep);
+          InterpreterResult ret = intpA.interpret(stmt, new InterpreterContext(
+              jobId,
+              "title",
+              "text",
+              new HashMap<String, Object>(),
+              new GUI()));
+
+          synchronized (results) {
+            results.add(ret.message());
+            results.notify();
+          }
+          return stmt;
+        }
+
+        @Override
+        protected boolean jobAbort() {
+          return false;
+        }
+
+      });
+    }
+
+    // wait for job finished
+    synchronized (results) {
+      while (results.size() != concurrency) {
+        results.wait(300);
+      }
+    }
+
+    long end = System.currentTimeMillis();
+
+    assertTrue(end - start < timeToSleep * concurrency);
+
+    intpA.close();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ff85f793/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/mock/MockInterpreterA.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/mock/MockInterpreterA.java
 
b/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/mock/MockInterpreterA.java
index 10b6ec9..1df3979 100644
--- 
a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/mock/MockInterpreterA.java
+++ 
b/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/mock/MockInterpreterA.java
@@ -22,23 +22,31 @@ public class MockInterpreterA extends Interpreter {
             .add("p1", "v1", "property1").build());
 
   }
+
+  private String lastSt;
+
   public MockInterpreterA(Properties property) {
     super(property);
   }
 
   @Override
   public void open() {
-
+    //new RuntimeException().printStackTrace();
   }
 
   @Override
   public void close() {
   }
 
+  public String getLastStatement() {
+    return lastSt;
+  }
+
   @Override
   public InterpreterResult interpret(String st, InterpreterContext context) {
     try {
       Thread.sleep(Long.parseLong(st));
+      this.lastSt = st;
     } catch (NumberFormatException | InterruptedException e) {
       throw new InterpreterException(e);
     }
@@ -67,6 +75,10 @@ public class MockInterpreterA extends Interpreter {
 
   @Override
   public Scheduler getScheduler() {
-    return 
SchedulerFactory.singleton().createOrGetFIFOScheduler("interpreter_" + 
this.hashCode());
+    if (getProperty("parallel") != null && 
getProperty("parallel").equals("true")) {
+      return 
SchedulerFactory.singleton().createOrGetParallelScheduler("interpreter_" + 
this.hashCode(), 10);
+    } else {
+      return 
SchedulerFactory.singleton().createOrGetFIFOScheduler("interpreter_" + 
this.hashCode());
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ff85f793/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/mock/MockInterpreterB.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/mock/MockInterpreterB.java
 
b/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/mock/MockInterpreterB.java
index d8b2ce9..39f2ab8 100644
--- 
a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/mock/MockInterpreterB.java
+++ 
b/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/interpreter/remote/mock/MockInterpreterB.java
@@ -10,6 +10,7 @@ import com.nflabs.zeppelin.interpreter.InterpreterGroup;
 import com.nflabs.zeppelin.interpreter.InterpreterPropertyBuilder;
 import com.nflabs.zeppelin.interpreter.InterpreterResult;
 import com.nflabs.zeppelin.interpreter.InterpreterResult.Code;
+import com.nflabs.zeppelin.interpreter.WrappedInterpreter;
 import com.nflabs.zeppelin.scheduler.Scheduler;
 
 public class MockInterpreterB extends Interpreter {
@@ -28,7 +29,7 @@ public class MockInterpreterB extends Interpreter {
 
   @Override
   public void open() {
-
+    //new RuntimeException().printStackTrace();
   }
 
   @Override
@@ -37,12 +38,18 @@ public class MockInterpreterB extends Interpreter {
 
   @Override
   public InterpreterResult interpret(String st, InterpreterContext context) {
+    MockInterpreterA intpA = getInterpreterA();
+    String intpASt = intpA.getLastStatement();
+    long timeToSleep = Long.parseLong(st);
+    if (intpASt != null) {
+      timeToSleep += Long.parseLong(intpASt);
+    }
     try {
-      Thread.sleep(Long.parseLong(st));
+      Thread.sleep(timeToSleep);
     } catch (NumberFormatException | InterruptedException e) {
       throw new InterpreterException(e);
     }
-    return new InterpreterResult(Code.SUCCESS, st);
+    return new InterpreterResult(Code.SUCCESS, Long.toString(timeToSleep));
   }
 
   @Override
@@ -65,6 +72,20 @@ public class MockInterpreterB extends Interpreter {
     return null;
   }
 
+  public MockInterpreterA getInterpreterA() {
+    InterpreterGroup interpreterGroup = getInterpreterGroup();
+    for (Interpreter intp : interpreterGroup) {
+      if (intp.getClassName().equals(MockInterpreterA.class.getName())) {
+        Interpreter p = intp;
+        while (p instanceof WrappedInterpreter) {
+          p = ((WrappedInterpreter) p).getInnerInterpreter();
+        }
+        return (MockInterpreterA) p;
+      }
+    }
+    return null;
+  }
+
   @Override
   public Scheduler getScheduler() {
     InterpreterGroup interpreterGroup = getInterpreterGroup();

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ff85f793/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/RemoteSchedulerTest.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/RemoteSchedulerTest.java
 
b/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/RemoteSchedulerTest.java
index 8695038..35aa1d3 100644
--- 
a/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/RemoteSchedulerTest.java
+++ 
b/zeppelin-interpreter/src/test/java/com/nflabs/zeppelin/scheduler/RemoteSchedulerTest.java
@@ -1,5 +1,7 @@
 package com.nflabs.zeppelin.scheduler;
 
+import static org.junit.Assert.assertEquals;
+
 import java.io.File;
 import java.util.HashMap;
 import java.util.Map;
@@ -53,7 +55,7 @@ public class RemoteSchedulerTest {
         intpA.getInterpreterProcess(),
         10);
 
-    scheduler.submit(new Job("jobName", null) {
+    Job job = new Job("jobId", "jobName", null, 200) {
 
       @Override
       public int progress() {
@@ -67,21 +69,34 @@ public class RemoteSchedulerTest {
 
       @Override
       protected Object jobRun() throws Throwable {
-        intpA.interpret("500", new InterpreterContext(
-            "id",
+        intpA.interpret("1000", new InterpreterContext(
+            "jobId",
             "title",
             "text",
             new HashMap<String, Object>(),
             new GUI()));
-        return "500";
+        return "1000";
       }
 
       @Override
       protected boolean jobAbort() {
         return false;
       }
+    };
+    scheduler.submit(job);
+
+    while (job.isRunning() == false) {
+      Thread.sleep(100);
+    }
+
+    Thread.sleep(500);
+    assertEquals(0, scheduler.getJobsWaiting().size());
+    assertEquals(1, scheduler.getJobsRunning().size());
+
+    Thread.sleep(500);
 
-    });
+    assertEquals(0, scheduler.getJobsWaiting().size());
+    assertEquals(0, scheduler.getJobsRunning().size());
 
     intpA.close();
     schedulerSvc.removeScheduler("test");

Reply via email to