Repository: tinkerpop Updated Branches: refs/heads/master efcff1613 -> 150a53d8a
TINKERPOP-1511 Fixed problem in TraversalOpProcessor TraversalOpProcessor was sending back the final message before the tx commit was happening. CTR Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/150a53d8 Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/150a53d8 Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/150a53d8 Branch: refs/heads/master Commit: 150a53d8a0ba2c2276f2dec49428f1ee14e67d70 Parents: efcff16 Author: Stephen Mallette <sp...@genoprime.com> Authored: Mon Oct 17 13:53:32 2016 -0400 Committer: Stephen Mallette <sp...@genoprime.com> Committed: Mon Oct 17 13:57:14 2016 -0400 ---------------------------------------------------------------------- CHANGELOG.asciidoc | 1 + .../op/traversal/TraversalOpProcessor.java | 146 ++++++++++++++++++- 2 files changed, 144 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/150a53d8/CHANGELOG.asciidoc ---------------------------------------------------------------------- diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 67506e0..5b4c578 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -27,6 +27,7 @@ TinkerPop 3.2.3 (Release Date: NOT OFFICIALLY RELEASED YET) ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ * Restructured Gremlin-Python's GraphSON I/O package to make it easier for users to register serializers/deserializers. (*breaking*) +* Fixed a bug with `TraversalOpProcessor` that was returning a final result prior to committing the transaction. * Fixed a bug in `ConnectiveStrategy` where infix and/or was not correctly reasoning on `choose()` `HasNextStep` injections. * Increased performance of `CredentialGraph` authentication. * Removed Java 8 stream usage from `TraversalHelper` for performance reasons. 
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/150a53d8/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java ---------------------------------------------------------------------- diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java index 88d8d90..4b559a3 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java @@ -22,6 +22,8 @@ import com.codahale.metrics.Timer; import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; import io.netty.channel.ChannelHandlerContext; +import org.apache.commons.lang.time.StopWatch; +import org.apache.tinkerpop.gremlin.driver.MessageSerializer; import org.apache.tinkerpop.gremlin.driver.Tokens; import org.apache.tinkerpop.gremlin.driver.message.RequestMessage; import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage; @@ -38,6 +40,8 @@ import org.apache.tinkerpop.gremlin.server.GraphManager; import org.apache.tinkerpop.gremlin.server.GremlinServer; import org.apache.tinkerpop.gremlin.server.OpProcessor; import org.apache.tinkerpop.gremlin.server.Settings; +import org.apache.tinkerpop.gremlin.server.handler.Frame; +import org.apache.tinkerpop.gremlin.server.handler.StateKey; import org.apache.tinkerpop.gremlin.server.op.AbstractOpProcessor; import org.apache.tinkerpop.gremlin.server.op.OpProcessorException; import org.apache.tinkerpop.gremlin.server.util.MetricManager; @@ -52,9 +56,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.script.SimpleBindings; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; 
+import java.util.List; import java.util.Map; import java.util.Optional; import java.util.UUID; @@ -369,7 +375,7 @@ public class TraversalOpProcessor extends AbstractOpProcessor { try { // compile the traversal - without it getEndStep() has nothing in it traversal.applyStrategies(); - handleIterator(context, new TraverserIterator(traversal)); + handleIterator(context, new TraverserIterator(traversal), graph); } catch (TimeoutException ex) { final String errorMessage = String.format("Response iteration exceeded the configured threshold for request [%s] - %s", msg.getRequestId(), ex.getMessage()); logger.warn(errorMessage); @@ -382,8 +388,6 @@ public class TraversalOpProcessor extends AbstractOpProcessor { onError(graph, context); return; } - - onTraversalSuccess(graph, context); } catch (Exception ex) { logger.warn(String.format("Exception processing a Traversal on request [%s].", msg.getRequestId()), ex); ctx.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR).statusMessage(ex.getMessage()).create()); @@ -444,4 +448,140 @@ public class TraversalOpProcessor extends AbstractOpProcessor { return metaData; } + + protected void handleIterator(final Context context, final Iterator itty, final Graph graph) throws TimeoutException, InterruptedException { + final ChannelHandlerContext ctx = context.getChannelHandlerContext(); + final RequestMessage msg = context.getRequestMessage(); + final Settings settings = context.getSettings(); + final MessageSerializer serializer = ctx.channel().attr(StateKey.SERIALIZER).get(); + final boolean useBinary = ctx.channel().attr(StateKey.USE_BINARY).get(); + boolean warnOnce = false; + + + // we have an empty iterator - happens on stuff like: g.V().iterate() + if (!itty.hasNext()) { + // as there is nothing left to iterate if we are transaction managed then we should execute a + // commit here before we send back a NO_CONTENT which implies success + onTraversalSuccess(graph, context); + 
+ ctx.writeAndFlush(ResponseMessage.build(msg) + .code(ResponseStatusCode.NO_CONTENT) + .create()); + return; + } + + // timer for the total serialization time + final StopWatch stopWatch = new StopWatch(); + stopWatch.start(); + + // the batch size can be overridden by the request + final int resultIterationBatchSize = (Integer) msg.optionalArgs(Tokens.ARGS_BATCH_SIZE) + .orElse(settings.resultIterationBatchSize); + List<Object> aggregate = new ArrayList<>(resultIterationBatchSize); + + // use an external control to manage the loop as opposed to just checking hasNext() in the while. this + // prevents situations where auto transactions create a new transaction after calls to commit() within + // the loop on calls to hasNext(). + boolean hasMore = itty.hasNext(); + + while (hasMore) { + if (Thread.interrupted()) throw new InterruptedException(); + + // check if an implementation needs to force flush the aggregated results before the iteration batch + // size is reached. + final boolean forceFlush = isForceFlushed(ctx, msg, itty); + + // have to check the aggregate size because it is possible that the channel is not writable (below) + // so iterating next() if the message is not written and flushed would bump the aggregate size beyond + // the expected resultIterationBatchSize. Total serialization time for the response remains in + // effect so if the client is "slow" it may simply time out. + // + // there is a need to check hasNext() on the iterator because if the channel is not writable the + // previous pass through the while loop will have next()'d the iterator and if it is "done" then a + // NoSuchElementException will rear its head. also need a check to ensure that this iteration doesn't + // require a forced flush which can be forced by sub-classes. 
+ // + // this could be placed inside the isWritable() portion of the if-then below but it seems better to + // allow iteration to continue into a batch if that is possible rather than just doing nothing at all + // while waiting for the client to catch up + if (aggregate.size() < resultIterationBatchSize && itty.hasNext() && !forceFlush) aggregate.add(itty.next()); + + // send back a page of results if batch size is met or if it's the end of the results being iterated. + // also check writability of the channel to prevent OOME for slow clients. + if (ctx.channel().isWritable()) { + if (forceFlush || aggregate.size() == resultIterationBatchSize || !itty.hasNext()) { + final ResponseStatusCode code = itty.hasNext() ? ResponseStatusCode.PARTIAL_CONTENT : ResponseStatusCode.SUCCESS; + + // serialize here because in sessionless requests the serialization must occur in the same + // thread as the eval. as eval occurs in the GremlinExecutor there's no way to get back to the + // thread that processed the eval of the script so, we have to push serialization down into that + Frame frame = null; + try { + frame = makeFrame(ctx, msg, serializer, useBinary, aggregate, code, generateMetaData(ctx, msg, code, itty)); + } catch (Exception ex) { + // a frame may use a ByteBuf which is reference counted - if it does not get written + // downstream it needs to be released here - NOTE(review): frame is always null at this point since makeFrame() threw before the assignment completed + if (frame != null) frame.tryRelease(); + + // exception is handled in makeFrame() - serialization error gets written back to driver + // at that point + onError(graph, context); + break; + } + + try { + // only need to reset the aggregation list if there's more stuff to write + if (itty.hasNext()) + aggregate = new ArrayList<>(resultIterationBatchSize); + else { + // iteration and serialization are both complete which means this finished successfully. note that + // errors internal to script eval or timeout will roll back given GremlinServer's global configurations. 
+ // local errors will get rolled back below because the exceptions aren't thrown in those cases to be + // caught by the GremlinExecutor for global rollback logic. this only needs to be committed if + // there are no more items to iterate and serialization is complete + onTraversalSuccess(graph, context); + + // exit the result iteration loop as there are no more results left. using this external control + // because of the above commit. some graphs may open a new transaction on the call to + // hasNext() + hasMore = false; + } + } catch (Exception ex) { + // a frame may use a ByteBuf which is reference counted - if it does not get written + // downstream it needs to be released here + if (frame != null) frame.tryRelease(); + throw ex; + } + + iterateComplete(ctx, msg, itty); + + // the flush is called after the commit has potentially occurred. in this way, if a commit was + // required then it will be 100% complete before the client receives it. the "frame" at this point + // should have completely detached objects from the transaction (i.e. serialization has occurred) + // so a new one should not be opened on the flush down the netty pipeline + ctx.writeAndFlush(frame); + } + } else { + // don't keep triggering this warning over and over again for the same request + if (!warnOnce) { + logger.warn("Pausing response writing as writeBufferHighWaterMark exceeded on {} - writing will continue once client has caught up", msg); + warnOnce = true; + } + + // since the client is lagging we can hold here for a period of time for the client to catch up. + // this isn't blocking the IO thread - just a worker. + TimeUnit.MILLISECONDS.sleep(10); + } + + stopWatch.split(); + if (settings.serializedResponseTimeout > 0 && stopWatch.getSplitTime() > settings.serializedResponseTimeout) { + final String timeoutMsg = String.format("Serialization of the entire response exceeded the 'serializeResponseTimeout' setting %s", + warnOnce ? 
"[Gremlin Server paused writes to client as messages were not being consumed quickly enough]" : ""); + throw new TimeoutException(timeoutMsg.trim()); + } + + stopWatch.unsplit(); + } + + stopWatch.stop(); + } }