Repository: incubator-zeppelin Updated Branches: refs/heads/master fa5057cfd -> 0665cef05
ZEPPELIN-424 Cancel paragraph in pending status https://issues.apache.org/jira/browse/ZEPPELIN-424 Cancel paragraph in pending status. By removing job from waiting queue and set status ABORT. Author: Lee moon soo <[email protected]> Closes #454 from Leemoonsoo/ZEPPELIN-424 and squashes the following commits: b005129 [Lee moon soo] Keep previous result on ABORT in PENDING status 9d5056a [Lee moon soo] Allow job abort in PENDING status ad26ac4 [Lee moon soo] Add unittest for abort in PENDING status Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/0665cef0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/0665cef0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/0665cef0 Branch: refs/heads/master Commit: 0665cef059b6444381eefc6a5870ca510607ce11 Parents: fa5057c Author: Lee moon soo <[email protected]> Authored: Sun Nov 22 09:20:47 2015 +0900 Committer: Lee moon soo <[email protected]> Committed: Wed Nov 25 02:54:29 2015 +0900 ---------------------------------------------------------------------- .../zeppelin/interpreter/Interpreter.java | 5 +- .../zeppelin/interpreter/InterpreterResult.java | 18 +-- .../interpreter/remote/RemoteInterpreter.java | 10 +- .../remote/RemoteInterpreterServer.java | 19 ++- .../zeppelin/scheduler/FIFOScheduler.java | 38 ++++-- .../zeppelin/scheduler/ParallelScheduler.java | 35 +++-- .../zeppelin/scheduler/RemoteScheduler.java | 18 ++- .../apache/zeppelin/scheduler/Scheduler.java | 7 +- .../zeppelin/scheduler/FIFOSchedulerTest.java | 25 +++- .../zeppelin/scheduler/RemoteSchedulerTest.java | 134 ++++++++++++++++++- .../org/apache/zeppelin/notebook/Paragraph.java | 13 +- 11 files changed, 271 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/0665cef0/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java index 3f3503c..d9bb0bf 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java @@ -121,7 +121,10 @@ public abstract class Interpreter { * Called when interpreter is no longer used. */ public void destroy() { - getScheduler().stop(); + Scheduler scheduler = getScheduler(); + if (scheduler != null) { + scheduler.stop(); + } } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/0665cef0/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java index 20317eb..593cfc7 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java @@ -23,29 +23,21 @@ import java.util.*; /** * Interpreter result template. - * - * @author Leemoonsoo - * */ public class InterpreterResult implements Serializable { /** * Type of result after code execution. - * - * @author Leemoonsoo - * */ public static enum Code { SUCCESS, INCOMPLETE, - ERROR + ERROR, + KEEP_PREVIOUS_RESULT } /** * Type of Data. - * - * @author Leemoonsoo - * */ public static enum Type { TEXT, @@ -99,7 +91,7 @@ public class InterpreterResult implements Serializable { int magicLength = lastType.getValue().name().length() + 1; // 1 for the last \n or space after magic int subStringPos = magicLength + lastType.getKey() + 1; - return msg.substring(subStringPos); + return msg.substring(subStringPos); } } @@ -116,7 +108,7 @@ public class InterpreterResult implements Serializable { return lastType.getValue(); } } - + private int getIndexOfType(String msg, Type t) { if (msg == null) { return 0; @@ -124,7 +116,7 @@ public class InterpreterResult implements Serializable { String typeString = "%" + t.name().toLowerCase(); return StringUtils.indexOf(msg, typeString ); } - + private TreeMap<Integer, Type> buildIndexMap(String msg) { int lastIndexOftypes = 0; TreeMap<Integer, Type> typesLastIndexInMsg = new TreeMap<Integer, Type>(); http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/0665cef0/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java index 9d01561..ef1f115 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java @@ -329,9 +329,13 @@ public class RemoteInterpreter extends Interpreter { public Scheduler getScheduler() { int maxConcurrency = 10; RemoteInterpreterProcess interpreterProcess = getInterpreterProcess(); - return SchedulerFactory.singleton().createOrGetRemoteScheduler( - "remoteinterpreter_" + interpreterProcess.hashCode(), getInterpreterProcess(), - maxConcurrency); + if (interpreterProcess == null) { + return null; + } else { + return SchedulerFactory.singleton().createOrGetRemoteScheduler( + "remoteinterpreter_" + interpreterProcess.hashCode(), getInterpreterProcess(), + maxConcurrency); + } } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/0665cef0/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java index 7405a66..d6768c9 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -37,7 +37,6 @@ import org.apache.zeppelin.display.AngularObjectRegistryListener; import org.apache.zeppelin.display.GUI; import org.apache.zeppelin.interpreter.ClassloaderInterpreter; import org.apache.zeppelin.interpreter.Interpreter; -import org.apache.zeppelin.interpreter.Interpreter.FormType; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterContextRunner; import org.apache.zeppelin.interpreter.InterpreterException; @@ -62,7 +61,8 @@ import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; /** - * + * Entry point for Interpreter process. + * Accepting thrift connections from ZeppelinServer. */ public class RemoteInterpreterServer extends Thread @@ -233,6 +233,11 @@ public class RemoteInterpreterServer result = new InterpreterResult(Code.ERROR, Job.getStack(job.getException())); } else { result = (InterpreterResult) job.getReturn(); + + // in case of job abort in PENDING status, result can be null + if (result == null) { + result = new InterpreterResult(Code.KEEP_PREVIOUS_RESULT); + } } return convert(result, context.getConfig(), @@ -303,8 +308,16 @@ public class RemoteInterpreterServer @Override public void cancel(String className, RemoteInterpreterContext interpreterContext) throws TException { + logger.info("cancel {} {}", className, interpreterContext.getParagraphId()); Interpreter intp = getInterpreter(className); - intp.cancel(convert(interpreterContext)); + String jobId = interpreterContext.getParagraphId(); + Job job = intp.getScheduler().removeFromWaitingQueue(jobId); + + if (job != null) { + job.setStatus(Status.ABORT); + } else { + intp.cancel(convert(interpreterContext)); + } } @Override http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/0665cef0/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java index e7f950a..11b5618 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java @@ -18,6 +18,7 @@ package org.apache.zeppelin.scheduler; import java.util.Collection; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.concurrent.ExecutorService; @@ -25,10 +26,7 @@ import java.util.concurrent.ExecutorService; import org.apache.zeppelin.scheduler.Job.Status; /** - * TODO(moon) : add description. - * - * @author Leemoonsoo - * + * FIFOScheduler runs submitted job sequentially */ public class FIFOScheduler implements Scheduler { List<Job> queue = new LinkedList<Job>(); @@ -83,20 +81,38 @@ public class FIFOScheduler implements Scheduler { } } + + @Override + public Job removeFromWaitingQueue(String jobId) { + synchronized (queue) { + Iterator<Job> it = queue.iterator(); + while (it.hasNext()) { + Job job = it.next(); + if (job.getId().equals(jobId)) { + it.remove(); + return job; + } + } + } + return null; + } + @Override public void run() { synchronized (queue) { while (terminate == false) { - if (runningJob != null || queue.isEmpty() == true) { - try { - queue.wait(500); - } catch (InterruptedException e) { + synchronized (queue) { + if (runningJob != null || queue.isEmpty() == true) { + try { + queue.wait(500); + } catch (InterruptedException e) { + } + continue; } - continue; - } - runningJob = queue.remove(0); + runningJob = queue.remove(0); + } final Scheduler scheduler = this; this.executor.execute(new Runnable() { http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/0665cef0/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ParallelScheduler.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ParallelScheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ParallelScheduler.java index c8e8e04..8507861 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ParallelScheduler.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ParallelScheduler.java @@ -18,6 +18,7 @@ package org.apache.zeppelin.scheduler; import java.util.Collection; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.concurrent.ExecutorService; @@ -25,10 +26,7 @@ import java.util.concurrent.ExecutorService; import org.apache.zeppelin.scheduler.Job.Status; /** - * TODO(moon) : add description. - * - * @author Leemoonsoo - * + * Parallel scheduler runs submitted job concurrently. */ public class ParallelScheduler implements Scheduler { List<Job> queue = new LinkedList<Job>(); @@ -64,6 +62,21 @@ public class ParallelScheduler implements Scheduler { } @Override + public Job removeFromWaitingQueue(String jobId) { + synchronized (queue) { + Iterator<Job> it = queue.iterator(); + while (it.hasNext()) { + Job job = it.next(); + if (job.getId().equals(jobId)) { + it.remove(); + return job; + } + } + } + return null; + } + + @Override public Collection<Job> getJobsRunning() { List<Job> ret = new LinkedList<Job>(); synchronized (queue) { @@ -87,9 +100,9 @@ public class ParallelScheduler implements Scheduler { @Override public void run() { - - synchronized (queue) { - while (terminate == false) { + while (terminate == false) { + Job job = null; + synchronized (queue) { if (running.size() >= maxConcurrency || queue.isEmpty() == true) { try { queue.wait(500); @@ -98,14 +111,12 @@ public class ParallelScheduler implements Scheduler { continue; } - Job job = queue.remove(0); + job = queue.remove(0); running.add(job); - Scheduler scheduler = this; - - executor.execute(new JobRunner(scheduler, job)); } + Scheduler scheduler = this; - + executor.execute(new JobRunner(scheduler, job)); } } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/0665cef0/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java index ec5fcd4..51dab12 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java @@ -18,6 +18,7 @@ package org.apache.zeppelin.scheduler; import java.util.Collection; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.concurrent.ExecutorService; @@ -32,7 +33,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * + * RemoteScheduler runs in ZeppelinServer and proxies Scheduler running on RemoteInterpreter */ public class RemoteScheduler implements Scheduler { Logger logger = LoggerFactory.getLogger(RemoteScheduler.class); @@ -108,6 +109,21 @@ public class RemoteScheduler implements Scheduler { } @Override + public Job removeFromWaitingQueue(String jobId) { + synchronized (queue) { + Iterator<Job> it = queue.iterator(); + while (it.hasNext()) { + Job job = it.next(); + if (job.getId().equals(jobId)) { + it.remove(); + return job; + } + } + } + return null; + } + + @Override public Collection<Job> getJobsRunning() { List<Job> ret = new LinkedList<Job>(); synchronized (queue) { http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/0665cef0/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Scheduler.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Scheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Scheduler.java index a886c22..90d4397 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Scheduler.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Scheduler.java @@ -20,10 +20,7 @@ package org.apache.zeppelin.scheduler; import java.util.Collection; /** - * TODO(moon) : add description. - * - * @author Leemoonsoo - * + * Interface for scheduler */ public interface Scheduler extends Runnable { public String getName(); @@ -34,5 +31,7 @@ public interface Scheduler extends Runnable { public void submit(Job job); + public Job removeFromWaitingQueue(String jobId); + public void stop(); } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/0665cef0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/FIFOSchedulerTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/FIFOSchedulerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/FIFOSchedulerTest.java index 3d8495c..7288b67 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/FIFOSchedulerTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/FIFOSchedulerTest.java @@ -88,7 +88,30 @@ public class FIFOSchedulerTest extends TestCase { assertTrue((500 > (Long)job1.getReturn())); assertEquals(null, job2.getReturn()); + } + public void testRemoveFromWaitingQueue() throws InterruptedException{ + Scheduler s = schedulerSvc.createOrGetFIFOScheduler("test"); + assertEquals(0, s.getJobsRunning().size()); + assertEquals(0, s.getJobsWaiting().size()); - } + Job job1 = new SleepingJob("job1", null, 500); + Job job2 = new SleepingJob("job2", null, 500); + + s.submit(job1); + s.submit(job2); + + Thread.sleep(200); + + job1.abort(); + job2.abort(); + + Thread.sleep(200); + + assertEquals(Status.ABORT, job1.getStatus()); + assertEquals(Status.ABORT, job2.getStatus()); + + assertTrue((500 > (Long)job1.getReturn())); + assertEquals(null, job2.getReturn()); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/0665cef0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java index 08fe190..d17df4f 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java @@ -18,6 +18,8 @@ package org.apache.zeppelin.scheduler; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import java.io.File; @@ -33,6 +35,7 @@ import org.apache.zeppelin.interpreter.InterpreterContextRunner; import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.remote.RemoteInterpreter; import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterA; +import org.apache.zeppelin.scheduler.Job.Status; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -127,7 +130,7 @@ public class RemoteSchedulerTest { Thread.sleep(TICK_WAIT); cycles++; } - + assertTrue(job.isTerminated()); assertEquals(0, scheduler.getJobsWaiting().size()); assertEquals(0, scheduler.getJobsRunning().size()); @@ -136,4 +139,133 @@ public class RemoteSchedulerTest { schedulerSvc.removeScheduler("test"); } + @Test + public void testAbortOnPending() throws Exception { + Properties p = new Properties(); + final InterpreterGroup intpGroup = new InterpreterGroup(); + Map<String, String> env = new HashMap<String, String>(); + env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath()); + + final RemoteInterpreter intpA = new RemoteInterpreter( + p, + MockInterpreterA.class.getName(), + new File("../bin/interpreter.sh").getAbsolutePath(), + "fake", + env, + 10 * 1000 + ); + + intpGroup.add(intpA); + intpA.setInterpreterGroup(intpGroup); + + intpA.open(); + + Scheduler scheduler = schedulerSvc.createOrGetRemoteScheduler("test", + intpA.getInterpreterProcess(), + 10); + + Job job1 = new Job("jobId1", "jobName1", null, 200) { + InterpreterContext context = new InterpreterContext( + "note", + "jobId1", + "title", + "text", + new HashMap<String, Object>(), + new GUI(), + new AngularObjectRegistry(intpGroup.getId(), null), + new LinkedList<InterpreterContextRunner>()); + + @Override + public int progress() { + return 0; + } + + @Override + public Map<String, Object> info() { + return null; + } + + @Override + protected Object jobRun() throws Throwable { + intpA.interpret("1000", context); + return "1000"; + } + + @Override + protected boolean jobAbort() { + if (isRunning()) { + intpA.cancel(context); + } + return true; + } + }; + + Job job2 = new Job("jobId2", "jobName2", null, 200) { + InterpreterContext context = new InterpreterContext( + "note", + "jobId2", + "title", + "text", + new HashMap<String, Object>(), + new GUI(), + new AngularObjectRegistry(intpGroup.getId(), null), + new LinkedList<InterpreterContextRunner>()); + + @Override + public int progress() { + return 0; + } + + @Override + public Map<String, Object> info() { + return null; + } + + @Override + protected Object jobRun() throws Throwable { + intpA.interpret("1000", context); + return "1000"; + } + + @Override + protected boolean jobAbort() { + if (isRunning()) { + intpA.cancel(context); + } + return true; + } + }; + + job2.setResult("result2"); + + scheduler.submit(job1); + scheduler.submit(job2); + + + int cycles = 0; + while (!job1.isRunning() && cycles < MAX_WAIT_CYCLES) { + Thread.sleep(TICK_WAIT); + cycles++; + } + assertTrue(job1.isRunning()); + assertTrue(job2.getStatus() == Status.PENDING); + + job2.abort(); + + cycles = 0; + while (!job1.isTerminated() && cycles < MAX_WAIT_CYCLES) { + Thread.sleep(TICK_WAIT); + cycles++; + } + + assertNotNull(job1.getDateFinished()); + assertTrue(job1.isTerminated()); + assertNull(job2.getDateFinished()); + assertTrue(job2.isTerminated()); + assertEquals("result2", job2.getReturn()); + + intpA.close(); + schedulerSvc.removeScheduler("test"); + } + } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/0665cef0/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java index 1332f16..28c49c6 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java @@ -22,6 +22,8 @@ import org.apache.zeppelin.display.GUI; import org.apache.zeppelin.display.Input; import org.apache.zeppelin.interpreter.*; import org.apache.zeppelin.interpreter.Interpreter.FormType; +import org.apache.zeppelin.interpreter.InterpreterResult.Code; +import org.apache.zeppelin.interpreter.InterpreterResult.Type; import org.apache.zeppelin.scheduler.Job; import org.apache.zeppelin.scheduler.JobListener; import org.slf4j.Logger; @@ -205,13 +207,22 @@ public class Paragraph extends Job implements Serializable, Cloneable { } logger().debug("RUN : " + script); InterpreterResult ret = repl.interpret(script, getInterpreterContext()); + + if (Code.KEEP_PREVIOUS_RESULT == ret.code()) { + return getReturn(); + } return ret; } @Override protected boolean jobAbort() { Interpreter repl = getRepl(getRequiredReplName()); - repl.cancel(getInterpreterContext()); + Job job = repl.getScheduler().removeFromWaitingQueue(getId()); + if (job != null) { + job.setStatus(Status.ABORT); + } else { + repl.cancel(getInterpreterContext()); + } return true; }
