I have been tinkering on this here and there and now have thread
interruption working for TinkerGraphComputer and SparkGraphComputer.

https://github.com/apache/incubator-tinkerpop/compare/TINKERPOP-946

The tests for interruption unfortunately needed to be different - the same
testing mechanism didn't hold between OLAP and OLTP, but I got roughly the
same coverage.  I did have to OptOut the tests for spark though. I don't
have a reliable way to block for interruption at the right time so the
tests don't really test what they are supposed to (and typically just
fail). I could probably do it with a Thread.sleep() but i sense that will
just make the test randomly fail in different environments (like Travis or
whatever).  The tests do work perfectly for TinkerGraphComputer though.

Even though I don't have unit tests, I was able to manually test
SparkGraphComputer. I started a spark cluster, submitted my job in the
console in a thread and then killed the thread once i saw the job show up
in the spark web ui - the output is a little ugly but check this out:

gremlin> t = new Thread({
gremlin> try {
gremlin>   println g.V().out().out().toList()
gremlin> } catch (Exception ex) { ex.printStackTrace() }})
==>Thread[Thread-91,5,main]
gremlin> t.start()
==>null
gremlin> t.interrupt()
==>null
gremlin>
org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException
at
org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.VertexProgramStep.processNextStart(VertexProgramStep.java:80)
at
org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep.hasNext(AbstractStep.java:143)
at
org.apache.tinkerpop.gremlin.process.traversal.step.util.ExpandableStepIterator.next(ExpandableStepIterator.java:50)
at
org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.ComputerResultStep.processNextStart(ComputerResultStep.java:70)
at
org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep.next(AbstractStep.java:128)
at
org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep.next(AbstractStep.java:38)
at
org.apache.tinkerpop.gremlin.process.traversal.Traversal.fill(Traversal.java:146)
at
org.apache.tinkerpop.gremlin.process.traversal.Traversal.toList(Traversal.java:103)
at
org.apache.tinkerpop.gremlin.process.traversal.Traversal$toList.call(Unknown
Source)
at
org.codehaus.groovy.runtime.callsite.CallSiteArray.defaultCall(CallSiteArray.java:48)
at
org.codehaus.groovy.runtime.callsite.AbstractCallSite.call(AbstractCallSite.java:113)
at
org.codehaus.groovy.runtime.callsite.AbstractCallSite.call(AbstractCallSite.java:117)
at groovysh_evaluate$_run_closure1.doCall(groovysh_evaluate:5)
at groovysh_evaluate$_run_closure1.doCall(groovysh_evaluate)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.codehaus.groovy.reflection.CachedMethod.invoke(CachedMethod.java:93)
at groovy.lang.MetaMethod.doMethodInvoke(MetaMethod.java:325)
at
org.codehaus.groovy.runtime.metaclass.ClosureMetaClass.invokeMethod(ClosureMetaClass.java:294)
at groovy.lang.MetaClassImpl.invokeMethod(MetaClassImpl.java:1021)
at groovy.lang.Closure.call(Closure.java:426)
at groovy.lang.Closure.call(Closure.java:420)
at groovy.lang.Closure.run(Closure.java:507)
at java.lang.Thread.run(Thread.java:745)

We get the TraversalInterruptedException as expected and best of all I
watched the running job status in Spark web ui go to "KILLED" - so the
local interruption in the console cleaned up the job on the spark cluster!
That little bit of magic happens here:

https://github.com/apache/incubator-tinkerpop/compare/TINKERPOP-946#diff-91e2c8dbe3d9312fea7aec8291c953a8R251

Marko, perhaps you can suggest some other places in that spark code that
might have long run iterations where we might want to check for thread
interruption. I only added that one (but maybe that's the only place we
want interrupt to occur - i don't know the code well enough to say).

Another thing I'd point out, is that the GraphComputer instances now use a
single thread executor to supply the Future<ComputerResult> from submit()
(instead of CompletableFuture.supplyAsync() which used ForkJoinPool):

https://github.com/apache/incubator-tinkerpop/compare/TINKERPOP-946#diff-7598ae12d354f03e698cb5319371fff1R75

That was important as use of ForkJoinPool doesn't seem to create futures
that respect cancellation.

What else needs to happen on this ticket? I guess GiraphGraphComputer needs
similar capability somehow? can giraph jobs be cancelled? I do think that
for OLAP, interruption can be a best effort kind of thing. It may not
always be easy to support this capability in all cases.



On Wed, Apr 20, 2016 at 8:03 AM, Marko Rodriguez <okramma...@gmail.com>
wrote:

> Dope.
>
> Wanna see if TinkerGraphComputer and SparkGraphComputer "just work" (i.e.
> "just fail" :) ?
>
> Marko.
>
> http://markorodriguez.com
>
> On Apr 20, 2016, at 5:59 AM, Stephen Mallette <spmalle...@gmail.com>
> wrote:
>
> > I refactored a bit and built parameterized tests, so we have better
> > coverage over the different variations in the steps and can easily add
> new
> > ones:
> >
> >
> https://github.com/apache/incubator-tinkerpop/blob/7a9f7de6ebed5d4293708524c772db0f0b2c3bac/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalInterruptionTest.java
> >
> > I've run the tests for TinkerGraph and Neo4j - both pass nicely.
> >
> > On Tue, Apr 19, 2016 at 7:13 PM, Stephen Mallette <spmalle...@gmail.com>
> > wrote:
> >
> >>> It can be done, but would require the provider implementations to
> >> change as they would need to message a "job kill."
> >>> Again, one thread is interrupted, but what about all the other threads…
> >> Its not that its "not possible," just that we will have design around
> this
> >> like for OLAP above.
> >>
> >> Agreed - should be possible - just something to think about as we move
> >> forward.
> >>
> >>> We should also add test cases to VertexStep, PropertyStep, GraphStep,
> >> etc. that use interrupt
> >>
> >> I added some more test cases to cover proper interrupt semantics for
> >> implementations. It took some thinking to come up with a way to test
> steps
> >> other than GraphStep though. To test, you have to start iteration of a
> >> Traversal in one thread and interrupt it in another. It's a bit
> imperfect
> >> because it's not immediately obvious how to ensure that my call to
> >> interrupt the thread will trigger a TraversalInterruptedException in a
> >> specific step. In other words, i can test:
> >>
> >> g.V()
> >>
> >> because that will use some variation of GraphStep and that's all there
> is,
> >> but when you have:
> >>
> >> g.V().out()
> >>
> >> I won't know for sure if if the test passes because the thread may have
> >> interrupted in GraphStep or VertexStep. Anyway, I came up with a way to
> do
> >> it with what I think is a clever use of sideEffect() to block at the
> right
> >> point and hold the traversal to force it to fail on the right step.  You
> >> can see that work here:
> >>
> >>
> >>
> https://github.com/apache/incubator-tinkerpop/commit/fd16fabd7595470bd33f30b882b1f0297d05b55a
> >>
> >> I covered VertexStep, PropertyStep, GraphStep as you suggested. I didn't
> >> do a lot of variations on them (e.g. didn't do g.E()). Any thoughts on
> how
> >> much coverage is "right" for this?
> >>
> >> as a side note, I was going to retarget the branch at tp31 but I'm
> >> starting to feel like this change is sufficiently big a feature that it
> >> should probably exist on the 3.2.x line where it can live with the
> >> benchmarks.
> >>
> >> On Mon, Apr 18, 2016 at 12:06 PM, Marko Rodriguez <okramma...@gmail.com
> >
> >> wrote:
> >>
> >>> Hi,
> >>>
> >>>> Well - I don't think this code is highly specialized. It's good
> general
> >>>> practice to respect Thread.interrupted(). I think you'd find that
> >>> sentiment
> >>>> in just about any java concurrency programming book.  …
> >>>
> >>> Huh, I didn't know this was a "standard pattern." If so, cool. Then
> that
> >>> solves that.
> >>>
> >>>>> 1. In OLAP, where there can be multiple threads how does this work?
> >>>>> 2. In Giraph/Spark, how does this effect job execution and failure
> >>>> responses?
> >>>>
> >>>> I ended my initial post in this thread by deleting the last paragraph
> i
> >>>> wrote about OLAP.  :)  I guess there's still some question there as to
> >>> how
> >>>> that will work. If I interrupt the thread that was executing the OLAP
> >>>> traversal, it's only going to kill it waiting for the result from
> Spark
> >>> or
> >>>> wherever.  The traversal will still be executing in the context of
> >>> spark.
> >>>> I assume the way to deal with this is on an implementation specific
> >>> basis
> >>>> where I assume there is a way to cancel a running spark job (or
> running
> >>>> giraph job or whatever). If the Traversal that waits for interrupt
> could
> >>>> signal that cancellation somehow, i guess that would be the way to
> >>>> implement that. I don't know enough about the specifics of spark for
> how
> >>>> that would work but it sounds plausible, no?
> >>>
> >>> Yea, I don't know how this would work either as it would be
> master/slave
> >>> traversals needing to coordinate. It can be done, but would require the
> >>> provider implementations to change as they would need to message a "job
> >>> kill." We could add test cases to GraphComputerTest that ensure that
> all
> >>> OLAP engines handle such interrupts correctly. We should also add test
> >>> cases to VertexStep, PropertyStep, GraphStep, etc. that use interrupt
> as
> >>> these are the steps that most providers will implement/extend and we
> need
> >>> to ensure they are doing the interruptions correctly.
> >>>
> >>>>> 3. When we move into threaded OLTP, how will this be
> >>> triggered/effected?
> >>>>
> >>>> I'm not sure how that feature will be implemented - so i'm not sure
> how
> >>> to
> >>>> comment on that.
> >>>
> >>> It would be similar to OLAP, where you have a master traversal and
> >>> slave/parallel traversals. Again, one thread is interrupted, but what
> about
> >>> all the other threads… Its not that its "not possible," just that we
> will
> >>> have design around this like for OLAP above.
> >>>
> >>> Marko.
> >>>
> >>>
> >>>> On Mon, Apr 18, 2016 at 11:28 AM, Marko Rodriguez <
> okramma...@gmail.com
> >>>>
> >>>> wrote:
> >>>>
> >>>>> Hi,
> >>>>>
> >>>>> I think a problem with this is that it requires every step
> >>> implementation
> >>>>> to have this construct in it -- though many steps simply extend the
> >>> base
> >>>>> FlatMapStep, MapStep, FilterStep, etc. However, not all and thus,
> this
> >>>>> requires all providers to know what this about and write their code
> >>>>> accordingly.
> >>>>>
> >>>>> A few questions:
> >>>>>
> >>>>>       1. In OLAP, where there can be multiple threads how does this
> >>> work?
> >>>>>       2. In Giraph/Spark, how does this effect job execution and
> >>> failure
> >>>>> responses?
> >>>>>       3. When we move into threaded OLTP, how will this be
> >>>>> triggered/effected?
> >>>>>       4. This doesn't work for "infinite loop" lambdas or "hung
> >>>>> databases."
> >>>>>
> >>>>> I know this is the oldest ticket in the books and a million solutions
> >>> have
> >>>>> been proposed, but it would be nice if this didn't require
> specialized
> >>> code
> >>>>> in all the steps. We are bound to "forget."
> >>>>>
> >>>>> Thanks,
> >>>>> Marko.
> >>>>>
> >>>>> http://markorodriguez.com
> >>>>>
> >>>>> On Apr 18, 2016, at 8:56 AM, Stephen Mallette <spmalle...@gmail.com>
> >>>>> wrote:
> >>>>>
> >>>>>> Do you mean:
> >>>>>>
> >>>>>> if (Thread.currentThread().isInterrupted()) throw new
> >>>>>> TraversalInterruptedException();
> >>>>>>
> >>>>>> If so, Thread.interrupted() basically does that under the covers
> >>>>>>
> >>>>>> On Mon, Apr 18, 2016 at 10:51 AM, Ted Wilmes <twil...@gmail.com>
> >>> wrote:
> >>>>>>
> >>>>>>> Yeah, looks like benchmark-wise it's a wash, which is good.  I
> wasn't
> >>>>> aware
> >>>>>>> of the difference between the static interrupted() and non-static
> >>>>>>> isInterrupted().  I was wondering if in this case it should be
> >>>>>>> isInterrupted(), but I think how you did it is good because it'll
> be
> >>>>>>> evaluated within the traversal thread regardless.
> >>>>>>>
> >>>>>>> --Ted
> >>>>>>>
> >>>>>>> On Mon, Apr 18, 2016 at 6:11 AM, Stephen Mallette <
> >>> spmalle...@gmail.com
> >>>>>>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> A while back, I brought up the issue of being able to interrupt
> >>>>>>> traversals:
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>
> >>>
> https://pony-poc.apache.org/thread.html/e6477fc9c58d37a5bdcb5938a0eaa285456ad15aa39e16446290e2ff@1444993523@%3Cdev.tinkerpop.apache.org%3E
> >>>>>>>> https://issues.apache.org/jira/browse/TINKERPOP-946
> >>>>>>>>
> >>>>>>>> As a quick refresher, making Traversal respect
> Thread.interrupted()
> >>> is
> >>>>>>>> important as you otherwise can quite easily lock up applications
> >>> like
> >>>>>>>> Gremlin Server with a few poorly conceived or errant queries. We'd
> >>> left
> >>>>>>>> that last thread with liking the idea, but there were concerns
> about
> >>>>> the
> >>>>>>>> complexity of the changes and performance hits.
> >>>>>>>>
> >>>>>>>> Given that we now have gremlin-benchmark, I decided to see what
> the
> >>>>>>>> performance hit would be for making this change. I took a rough
> >>> stab at
> >>>>>>> it
> >>>>>>>> introducing Thread.interrupted() in all steps where it seemed to
> >>> make
> >>>>>>> sense
> >>>>>>>> to do so and then ran the benchmark before and after the change.
> >>>>>>>>
> >>>>>>>>
> https://gist.github.com/spmallette/ed21267f2e7e17bb3fbd5a8d1a568d2b
> >>>>>>>>
> >>>>>>>> I'm not seeing a whole lot of difference between supporting this
> >>>>> feature
> >>>>>>>> and not supporting this feature.  Here's the branch I implemented
> >>> this
> >>>>> in
> >>>>>>>> in case you want to look around:
> >>>>>>>>
> >>>>>>>> https://github.com/apache/incubator-tinkerpop/tree/TINKERPOP-946
> >>>>>>>>
> >>>>>>>> I'm not sure that my changes are completely bulletproof at this
> >>> point,
> >>>>>>> but
> >>>>>>>> I'm reasonably sure that these changes would handle a good
> majority
> >>> of
> >>>>>>>> calls for thread interruption. I expect to re-target my branch at
> >>> tp31
> >>>>>>>> (currently from master so that i could use the benchmark suite) if
> >>> this
> >>>>>>>> becomes a pull request.
> >>>>>>>>
> >>>>>>>> Any thoughts on the benchmark, the implementation, etc?
> >>>>>>>>
> >>>>>>>
> >>>>>
> >>>>>
> >>>
> >>>
> >>
>
>

Reply via email to