I have been tinkering on this here and there and now have thread interruption working for TinkerGraphComputer and SparkGraphComputer.
https://github.com/apache/incubator-tinkerpop/compare/TINKERPOP-946

Unfortunately, the interruption tests needed to be different: the same testing mechanism didn't hold between OLAP and OLTP, but I got roughly the same coverage. I did have to OptOut the tests for Spark, though. I don't have a reliable way to block for interruption at the right time, so the tests don't really test what they are supposed to (and typically just fail). I could probably do it with a Thread.sleep(), but I sense that would just make the tests fail randomly in different environments (like Travis). The tests do work perfectly for TinkerGraphComputer, though.

Even though I don't have unit tests, I was able to manually test SparkGraphComputer. I started a Spark cluster, submitted my job from the console in a separate thread, and then interrupted that thread once I saw the job show up in the Spark web UI. The output is a little ugly, but check this out:

    gremlin> t = new Thread({
    gremlin>   try {
    gremlin>     println g.V().out().out().toList()
    gremlin>   } catch (Exception ex) { ex.printStackTrace() }})
    ==>Thread[Thread-91,5,main]
    gremlin> t.start()
    ==>null
    gremlin> t.interrupt()
    ==>null
    gremlin> org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException
            at org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.VertexProgramStep.processNextStart(VertexProgramStep.java:80)
            at org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep.hasNext(AbstractStep.java:143)
            at org.apache.tinkerpop.gremlin.process.traversal.step.util.ExpandableStepIterator.next(ExpandableStepIterator.java:50)
            at org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.ComputerResultStep.processNextStart(ComputerResultStep.java:70)
            at org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep.next(AbstractStep.java:128)
            at org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep.next(AbstractStep.java:38)
            at org.apache.tinkerpop.gremlin.process.traversal.Traversal.fill(Traversal.java:146)
            at org.apache.tinkerpop.gremlin.process.traversal.Traversal.toList(Traversal.java:103)
            at org.apache.tinkerpop.gremlin.process.traversal.Traversal$toList.call(Unknown Source)
            at org.codehaus.groovy.runtime.callsite.CallSiteArray.defaultCall(CallSiteArray.java:48)
            at org.codehaus.groovy.runtime.callsite.AbstractCallSite.call(AbstractCallSite.java:113)
            at org.codehaus.groovy.runtime.callsite.AbstractCallSite.call(AbstractCallSite.java:117)
            at groovysh_evaluate$_run_closure1.doCall(groovysh_evaluate:5)
            at groovysh_evaluate$_run_closure1.doCall(groovysh_evaluate)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:498)
            at org.codehaus.groovy.reflection.CachedMethod.invoke(CachedMethod.java:93)
            at groovy.lang.MetaMethod.doMethodInvoke(MetaMethod.java:325)
            at org.codehaus.groovy.runtime.metaclass.ClosureMetaClass.invokeMethod(ClosureMetaClass.java:294)
            at groovy.lang.MetaClassImpl.invokeMethod(MetaClassImpl.java:1021)
            at groovy.lang.Closure.call(Closure.java:426)
            at groovy.lang.Closure.call(Closure.java:420)
            at groovy.lang.Closure.run(Closure.java:507)
            at java.lang.Thread.run(Thread.java:745)

We get the TraversalInterruptedException as expected and, best of all, I watched the running job's status in the Spark web UI go to "KILLED", so the local interruption in the console cleaned up the job on the Spark cluster! That little bit of magic happens here:

https://github.com/apache/incubator-tinkerpop/compare/TINKERPOP-946#diff-91e2c8dbe3d9312fea7aec8291c953a8R251

Marko, perhaps you can suggest some other places in that Spark code that might have long-running iterations where we might want to check for thread interruption.
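As a minimal, hypothetical sketch of what "checking for thread interruption in a long-running iteration" can look like, the following plain-Java example (no Spark or TinkerPop dependencies) polls the interrupt flag every thousand iterations and, once it is set, runs a cleanup callback standing in for cancelling the cluster-side job before throwing. The class and method names, the polling interval, and the use of CancellationException in place of TraversalInterruptedException are illustrative assumptions, not the code in the linked diff. The driver submits the loop to a single-thread executor, because Future.cancel(true) on an ExecutorService future interrupts the worker thread.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class InterruptibleLoop {

    // Hypothetical polling interval: check the interrupt flag every N iterations.
    static final int CHECK_EVERY = 1_000;

    /**
     * Stands in for a long-running iteration. Loops until the current thread is
     * interrupted, then runs onCancel (a stand-in for cancelling a remote job)
     * and throws. Thread.interrupted() clears the flag, so the exception is what
     * carries the interruption signal to the caller.
     */
    public static void runUntilInterrupted(Runnable onCancel, CountDownLatch started) {
        started.countDown(); // lets the demo/test know the loop is running
        long iterations = 0;
        while (true) {
            iterations++;
            // ... one unit of real work would go here ...
            if (iterations % CHECK_EVERY == 0 && Thread.interrupted()) {
                onCancel.run();
                throw new CancellationException("interrupted after " + iterations + " iterations");
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicBoolean remoteJobCancelled = new AtomicBoolean(false);
        CountDownLatch started = new CountDownLatch(1);

        // Future.cancel(true) on an executor-backed future interrupts the worker thread.
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<?> future = executor.submit(
                () -> runUntilInterrupted(() -> remoteJobCancelled.set(true), started));

        started.await();      // make sure the loop is actually running
        future.cancel(true);  // delivers Thread.interrupt() to the worker
        executor.shutdown();
        boolean terminated = executor.awaitTermination(5, TimeUnit.SECONDS);

        System.out.println("terminated=" + terminated
                + " cancelled=" + future.isCancelled()
                + " remoteJobCancelled=" + remoteJobCancelled.get());
    }
}
```

Running main prints `terminated=true cancelled=true remoteJobCancelled=true`. Polling only every CHECK_EVERY iterations is just one way to keep the check cheap inside a tight loop; how often real step code should check is a separate judgment call.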
I only added the one check linked above (but maybe that's the only place we want interruption to occur; I don't know the code well enough to say).

Another thing I'd point out is that the GraphComputer instances now use a single-thread executor to supply the Future<ComputerResult> from submit() (instead of CompletableFuture.supplyAsync(), which used the ForkJoinPool):

https://github.com/apache/incubator-tinkerpop/compare/TINKERPOP-946#diff-7598ae12d354f03e698cb5319371fff1R75

That was important because the futures returned by CompletableFuture.supplyAsync() don't seem to respect cancellation: CompletableFuture.cancel() ignores its mayInterruptIfRunning argument, so the ForkJoinPool thread doing the work is never interrupted.

What else needs to happen on this ticket? I guess GiraphGraphComputer needs a similar capability somehow? Can Giraph jobs be cancelled? I do think that for OLAP, interruption can be a best-effort kind of thing. It may not always be easy to support this capability in all cases.

On Wed, Apr 20, 2016 at 8:03 AM, Marko Rodriguez <okramma...@gmail.com> wrote:

> Dope.
>
> Wanna see if TinkerGraphComputer and SparkGraphComputer "just work" (i.e. "just fail" :) ?
>
> Marko.
>
> http://markorodriguez.com
>
> On Apr 20, 2016, at 5:59 AM, Stephen Mallette <spmalle...@gmail.com> wrote:
>
> > I refactored a bit and built parameterized tests, so we have better coverage over the different variations in the steps and can easily add new ones:
> >
> > https://github.com/apache/incubator-tinkerpop/blob/7a9f7de6ebed5d4293708524c772db0f0b2c3bac/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalInterruptionTest.java
> >
> > I've run the tests for TinkerGraph and Neo4j - both pass nicely.
> >
> > On Tue, Apr 19, 2016 at 7:13 PM, Stephen Mallette <spmalle...@gmail.com> wrote:
> >
> >>> It can be done, but would require the provider implementations to change as they would need to message a "job kill."
> >>> Again, one thread is interrupted, but what about all the other threads… It's not that it's "not possible," just that we will have to design around this like for OLAP above.
> >>
> >> Agreed - should be possible - just something to think about as we move forward.
> >>
> >>> We should also add test cases to VertexStep, PropertyStep, GraphStep, etc. that use interrupt
> >>
> >> I added some more test cases to cover proper interrupt semantics for implementations. It took some thinking to come up with a way to test steps other than GraphStep, though. To test, you have to start iterating a Traversal in one thread and interrupt it from another. It's a bit imperfect because it's not immediately obvious how to ensure that my call to interrupt the thread will trigger a TraversalInterruptedException in a specific step. In other words, I can test:
> >>
> >> g.V()
> >>
> >> because that will use some variation of GraphStep and that's all there is, but when you have:
> >>
> >> g.V().out()
> >>
> >> I won't know for sure if the test passes because the thread may have been interrupted in GraphStep or VertexStep. Anyway, I came up with a way to do it with what I think is a clever use of sideEffect() to block at the right point and hold the traversal, forcing it to fail on the right step. You can see that work here:
> >>
> >> https://github.com/apache/incubator-tinkerpop/commit/fd16fabd7595470bd33f30b882b1f0297d05b55a
> >>
> >> I covered VertexStep, PropertyStep, and GraphStep as you suggested. I didn't do a lot of variations on them (e.g. I didn't do g.E()). Any thoughts on how much coverage is "right" for this?
> >>
> >> As a side note, I was going to retarget the branch at tp31, but I'm starting to feel like this change is a big enough feature that it should probably live on the 3.2.x line alongside the benchmarks.
> >>
> >> On Mon, Apr 18, 2016 at 12:06 PM, Marko Rodriguez <okramma...@gmail.com> wrote:
> >>
> >>> Hi,
> >>>
> >>>> Well - I don't think this code is highly specialized. It's good general practice to respect Thread.interrupted(). I think you'd find that sentiment in just about any Java concurrency programming book. …
> >>>
> >>> Huh, I didn't know this was a "standard pattern." If so, cool. Then that solves that.
> >>>
> >>>>> 1. In OLAP, where there can be multiple threads, how does this work?
> >>>>> 2. In Giraph/Spark, how does this affect job execution and failure responses?
> >>>>
> >>>> I ended my initial post in this thread by deleting the last paragraph I wrote about OLAP. :) I guess there's still some question there as to how that will work. If I interrupt the thread that was executing the OLAP traversal, it's only going to kill the thread waiting for the result from Spark or wherever. The traversal will still be executing in the context of Spark. I assume the way to deal with this is on an implementation-specific basis, where I assume there is a way to cancel a running Spark job (or running Giraph job or whatever). If the Traversal that waits for interrupt could signal that cancellation somehow, I guess that would be the way to implement it. I don't know enough about the specifics of Spark to say how that would work, but it sounds plausible, no?
> >>>
> >>> Yea, I don't know how this would work either, as it would be master/slave traversals needing to coordinate. It can be done, but would require the provider implementations to change as they would need to message a "job kill." We could add test cases to GraphComputerTest that ensure that all OLAP engines handle such interrupts correctly. We should also add test cases to VertexStep, PropertyStep, GraphStep, etc. that use interrupt, as these are the steps that most providers will implement/extend and we need to ensure they are doing the interruptions correctly.
> >>>
> >>>>> 3. When we move into threaded OLTP, how will this be triggered/affected?
> >>>>
> >>>> I'm not sure how that feature will be implemented, so I'm not sure how to comment on that.
> >>>
> >>> It would be similar to OLAP, where you have a master traversal and slave/parallel traversals. Again, one thread is interrupted, but what about all the other threads… It's not that it's "not possible," just that we will have to design around this like for OLAP above.
> >>>
> >>> Marko.
> >>>
> >>>> On Mon, Apr 18, 2016 at 11:28 AM, Marko Rodriguez <okramma...@gmail.com> wrote:
> >>>>
> >>>>> Hi,
> >>>>>
> >>>>> I think a problem with this is that it requires every step implementation to have this construct in it -- though many steps simply extend the base FlatMapStep, MapStep, FilterStep, etc. However, not all do, and thus this requires all providers to know what this is about and write their code accordingly.
> >>>>>
> >>>>> A few questions:
> >>>>>
> >>>>> 1. In OLAP, where there can be multiple threads, how does this work?
> >>>>> 2. In Giraph/Spark, how does this affect job execution and failure responses?
> >>>>> 3. When we move into threaded OLTP, how will this be triggered/affected?
> >>>>> 4. This doesn't work for "infinite loop" lambdas or "hung databases."
> >>>>>
> >>>>> I know this is the oldest ticket in the books and a million solutions have been proposed, but it would be nice if this didn't require specialized code in all the steps. We are bound to "forget."
> >>>>>
> >>>>> Thanks,
> >>>>> Marko.
> >>>>>
> >>>>> http://markorodriguez.com
> >>>>>
> >>>>> On Apr 18, 2016, at 8:56 AM, Stephen Mallette <spmalle...@gmail.com> wrote:
> >>>>>
> >>>>>> Do you mean:
> >>>>>>
> >>>>>> if (Thread.currentThread().isInterrupted()) throw new TraversalInterruptedException();
> >>>>>>
> >>>>>> If so, Thread.interrupted() basically does that under the covers.
> >>>>>>
> >>>>>> On Mon, Apr 18, 2016 at 10:51 AM, Ted Wilmes <twil...@gmail.com> wrote:
> >>>>>>
> >>>>>>> Yeah, looks like benchmark-wise it's a wash, which is good. I wasn't aware of the difference between the static interrupted() and non-static isInterrupted(). I was wondering if in this case it should be isInterrupted(), but I think how you did it is good because it'll be evaluated within the traversal thread regardless.
> >>>>>>>
> >>>>>>> --Ted
> >>>>>>>
> >>>>>>> On Mon, Apr 18, 2016 at 6:11 AM, Stephen Mallette <spmalle...@gmail.com> wrote:
> >>>>>>>
> >>>>>>>> A while back, I brought up the issue of being able to interrupt traversals:
> >>>>>>>>
> >>>>>>>> https://pony-poc.apache.org/thread.html/e6477fc9c58d37a5bdcb5938a0eaa285456ad15aa39e16446290e2ff@1444993523@%3Cdev.tinkerpop.apache.org%3E
> >>>>>>>> https://issues.apache.org/jira/browse/TINKERPOP-946
> >>>>>>>>
> >>>>>>>> As a quick refresher, making Traversal respect Thread.interrupted() is important because otherwise you can quite easily lock up applications like Gremlin Server with a few poorly conceived or errant queries. We left that last thread liking the idea, but there were concerns about the complexity of the changes and the performance hit.
> >>>>>>>>
> >>>>>>>> Given that we now have gremlin-benchmark, I decided to see what the performance hit would be for making this change. I took a rough stab at it, introducing Thread.interrupted() in all steps where it seemed to make sense to do so, and then ran the benchmark before and after the change.
> >>>>>>>>
> >>>>>>>> https://gist.github.com/spmallette/ed21267f2e7e17bb3fbd5a8d1a568d2b
> >>>>>>>>
> >>>>>>>> I'm not seeing a whole lot of difference between supporting this feature and not supporting it. Here's the branch I implemented this in, in case you want to look around:
> >>>>>>>>
> >>>>>>>> https://github.com/apache/incubator-tinkerpop/tree/TINKERPOP-946
> >>>>>>>>
> >>>>>>>> I'm not sure that my changes are completely bulletproof at this point, but I'm reasonably sure that they would handle a good majority of calls for thread interruption. I expect to re-target my branch at tp31 (it's currently from master so that I could use the benchmark suite) if this becomes a pull request.
> >>>>>>>>
> >>>>>>>> Any thoughts on the benchmark, the implementation, etc.?