Repository: tinkerpop Updated Branches: refs/heads/tp33 2cf551f08 -> fa7a7f613
TINKERPOP-2005 Reject multiple final responses in AbstractEvalOpProcessor Add isFinalResponse() getter to ResponseStatusCode Introduce ResponseHandlerContext to allow tracking the final response status per request message. Update AbstractOpProcessor, AbstractEvalOpProcessor and related classes to write response messages through ResponseHandlerContext methods as opposed to ChannelHandlerContext methods. Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/f592e344 Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/f592e344 Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/f592e344 Branch: refs/heads/tp33 Commit: f592e3446c84e9398e242a072cdfec64025e9566 Parents: f9ad72a Author: Dmitri Bourlatchkov <dmitri.bourlatch...@datastax.com> Authored: Fri Jul 27 15:54:06 2018 -0400 Committer: Dmitri Bourlatchkov <dmitri.bourlatch...@datastax.com> Committed: Fri Jul 27 15:54:06 2018 -0400 ---------------------------------------------------------------------- .../driver/message/ResponseStatusCode.java | 7 ++ .../driver/message/ResponseStatusCodeTest.java | 35 ++++++ .../gremlin/server/ResponseHandlerContext.java | 80 ++++++++++++++ .../server/op/AbstractEvalOpProcessor.java | 17 +-- .../gremlin/server/op/AbstractOpProcessor.java | 34 +++++- .../AbstractGremlinServerIntegrationTest.java | 9 +- .../server/GremlinServerIntegrateTest.java | 51 +++++++++ .../server/ResponseHandlerContextTest.java | 110 +++++++++++++++++++ .../server/op/AbstractOpProcessorTest.java | 73 ++++++++++++ 9 files changed, 401 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f592e344/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/message/ResponseStatusCode.java ---------------------------------------------------------------------- diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/message/ResponseStatusCode.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/message/ResponseStatusCode.java index 3348107..f4e7f65 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/message/ResponseStatusCode.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/message/ResponseStatusCode.java @@ -123,4 +123,11 @@ public enum ResponseStatusCode { public boolean isSuccess() { return String.valueOf(this.value).startsWith("2"); } + + /** + * Indicates whether the status code can only be used in the last response for a particular request. + */ + public boolean isFinalResponse() { + return this != PARTIAL_CONTENT && this != AUTHENTICATE; + } } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f592e344/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/message/ResponseStatusCodeTest.java ---------------------------------------------------------------------- diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/message/ResponseStatusCodeTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/message/ResponseStatusCodeTest.java new file mode 100644 index 0000000..a231489 --- /dev/null +++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/message/ResponseStatusCodeTest.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.driver.message; + +import org.junit.Test; + +import static org.junit.Assert.*; + +public class ResponseStatusCodeTest { + + @Test + public void shouldIndicateFinalMessagesStatusCodes() { + assertFalse(ResponseStatusCode.AUTHENTICATE.isFinalResponse()); + assertFalse(ResponseStatusCode.PARTIAL_CONTENT.isFinalResponse()); + assertTrue(ResponseStatusCode.SUCCESS.isFinalResponse()); + assertTrue(ResponseStatusCode.SERVER_ERROR_TIMEOUT.isFinalResponse()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f592e344/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/ResponseHandlerContext.java ---------------------------------------------------------------------- diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/ResponseHandlerContext.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/ResponseHandlerContext.java new file mode 100644 index 0000000..fff4480 --- /dev/null +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/ResponseHandlerContext.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.server; + +import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage; +import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode; + +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * A context for asynchronously writing response messages related to a particular request. + * <p>The "write" methods of this class ensure that at most one {@link ResponseStatusCode#isFinalResponse() final} + * response message is written to the underlying channel. Attempts to write more than one final response message will + * result in an {@link IllegalStateException}.</p> + * <p>Note: an object of this class should be used instead of writing to the channel directly when multiple threads + * are expected to produce final response messages concurrently. Callers must ensure that the same + * {@link ResponseHandlerContext} is used by all threads writing response messages for the same request.</p> + * + * @author Dmitri Bourlatchkov + */ +public class ResponseHandlerContext { + + private final Context context; + private final AtomicBoolean finalResponseWritten = new AtomicBoolean(); + + public ResponseHandlerContext(Context context) { + this.context = context; + } + + public Context getContext() { + return context; + } + + /** + * Writes a response message to the underlying channel while ensuring that at most one + * {@link ResponseStatusCode#isFinalResponse() final} response is written. 
+ * <p>Note: this method should be used instead of writing to the channel directly when multiple threads + * are expected to produce response messages concurrently.</p> + * <p>Attempts to write more than one final response message will result in an {@link IllegalStateException}.</p> + * @see #writeAndFlush(ResponseStatusCode, Object) + */ + public void writeAndFlush(ResponseMessage message) { + writeAndFlush(message.getStatus().getCode(), message); + } + + /** + * Writes a response message to the underlying channel while ensuring that at most one + * {@link ResponseStatusCode#isFinalResponse() final} response is written. + * <p>The caller must make sure that the provided response status code matches the content of the message.</p> + * <p>Note: this method should be used instead of writing to the channel directly when multiple threads + * are expected to produce response messages concurrently.</p> + * <p>Attempts to write more than one final response message will result in an {@link IllegalStateException}.</p> + * @see #writeAndFlush(ResponseMessage) + */ + public void writeAndFlush(ResponseStatusCode code, Object responseMessage) { + final boolean messageIsFinal = code.isFinalResponse(); + if(!finalResponseWritten.compareAndSet(false, messageIsFinal)) { + final String errorMessage = String.format("Another final response message was already written for request %s", context.getRequestMessage().getRequestId()); + throw new IllegalStateException(errorMessage); + } + + context.getChannelHandlerContext().writeAndFlush(responseMessage); + } +} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f592e344/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractEvalOpProcessor.java ---------------------------------------------------------------------- diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractEvalOpProcessor.java 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractEvalOpProcessor.java index 5c43b4d..6ff0452 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractEvalOpProcessor.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractEvalOpProcessor.java @@ -36,6 +36,7 @@ import org.apache.tinkerpop.gremlin.structure.Column; import org.apache.tinkerpop.gremlin.structure.T; import org.apache.tinkerpop.gremlin.server.Context; import org.apache.tinkerpop.gremlin.server.GremlinServer; +import org.apache.tinkerpop.gremlin.server.ResponseHandlerContext; import org.apache.tinkerpop.gremlin.server.Settings; import org.apache.tinkerpop.gremlin.server.util.MetricManager; import org.apache.tinkerpop.gremlin.util.function.ThrowingConsumer; @@ -245,6 +246,8 @@ public abstract class AbstractEvalOpProcessor extends AbstractOpProcessor { final long seto = args.containsKey(Tokens.ARGS_SCRIPT_EVAL_TIMEOUT) ? Long.parseLong(args.get(Tokens.ARGS_SCRIPT_EVAL_TIMEOUT).toString()) : settings.scriptEvaluationTimeout; + ResponseHandlerContext rhc = new ResponseHandlerContext(context); + final GremlinExecutor.LifeCycle lifeCycle = GremlinExecutor.LifeCycle.build() .scriptEvaluationTimeoutOverride(seto) .afterFailure((b,t) -> { @@ -265,7 +268,7 @@ public abstract class AbstractEvalOpProcessor extends AbstractOpProcessor { logger.debug("Preparing to iterate results from - {} - in thread [{}]", msg, Thread.currentThread().getName()); try { - handleIterator(context, itty); + handleIterator(rhc, itty); } catch (Exception ex) { if (managedTransactionsForRequest) attemptRollback(msg, context.getGraphManager(), settings.strictTransactionManagement); @@ -282,25 +285,25 @@ public abstract class AbstractEvalOpProcessor extends AbstractOpProcessor { if (t != null) { if (t instanceof OpProcessorException) { - ctx.writeAndFlush(((OpProcessorException) t).getResponseMessage()); + rhc.writeAndFlush(((OpProcessorException) 
t).getResponseMessage()); } else if (t instanceof TimedInterruptTimeoutException) { // occurs when the TimedInterruptCustomizerProvider is in play final String errorMessage = String.format("A timeout occurred within the script during evaluation of [%s] - consider increasing the limit given to TimedInterruptCustomizerProvider", msg); logger.warn(errorMessage); - ctx.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR_TIMEOUT) + rhc.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR_TIMEOUT) .statusMessage("Timeout during script evaluation triggered by TimedInterruptCustomizerProvider") .statusAttributeException(t).create()); } else if (t instanceof org.apache.tinkerpop.gremlin.groovy.jsr223.TimedInterruptTimeoutException) { // occurs when the TimedInterruptCustomizerProvider is in play final String errorMessage = String.format("A timeout occurred within the script during evaluation of [%s] - consider increasing the limit given to TimedInterruptCustomizerProvider", msg); logger.warn(errorMessage); - ctx.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR_TIMEOUT) + rhc.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR_TIMEOUT) .statusMessage("Timeout during script evaluation triggered by TimedInterruptCustomizerProvider") .statusAttributeException(t).create()); } else if (t instanceof TimeoutException) { final String errorMessage = String.format("Script evaluation exceeded the configured threshold for request [%s]", msg); logger.warn(errorMessage, t); - ctx.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR_TIMEOUT) + rhc.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR_TIMEOUT) .statusMessage(t.getMessage()) .statusAttributeException(t).create()); } else { @@ -314,12 +317,12 @@ public abstract class AbstractEvalOpProcessor extends AbstractOpProcessor { ((MultipleCompilationErrorsException) 
t).getErrorCollector().getErrorCount() == 1) { final String errorMessage = String.format("The Gremlin statement that was submitted exceed the maximum compilation size allowed by the JVM, please split it into multiple smaller statements - %s", trimMessage(msg)); logger.warn(errorMessage); - ctx.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR_SCRIPT_EVALUATION) + rhc.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR_SCRIPT_EVALUATION) .statusMessage(errorMessage) .statusAttributeException(t).create()); } else { logger.warn(String.format("Exception processing a script on request [%s].", msg), t); - ctx.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR_SCRIPT_EVALUATION) + rhc.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR_SCRIPT_EVALUATION) .statusMessage(t.getMessage()) .statusAttributeException(t).create()); } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f592e344/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractOpProcessor.java ---------------------------------------------------------------------- diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractOpProcessor.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractOpProcessor.java index 8899bb5..1263c81 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractOpProcessor.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractOpProcessor.java @@ -29,6 +29,7 @@ import org.apache.tinkerpop.gremlin.driver.ser.MessageTextSerializer; import org.apache.tinkerpop.gremlin.server.Context; import org.apache.tinkerpop.gremlin.server.GraphManager; import org.apache.tinkerpop.gremlin.server.OpProcessor; +import org.apache.tinkerpop.gremlin.server.ResponseHandlerContext; import org.apache.tinkerpop.gremlin.server.Settings; import 
org.apache.tinkerpop.gremlin.server.handler.Frame; import org.apache.tinkerpop.gremlin.server.handler.StateKey; @@ -72,8 +73,19 @@ public abstract class AbstractOpProcessor implements OpProcessor { * @param context The Gremlin Server {@link Context} object containing settings, request message, etc. * @param itty The result to iterator * @throws TimeoutException if the time taken to serialize the entire result set exceeds the allowable time. + * @see #handleIterator(ResponseHandlerContext, Iterator) */ protected void handleIterator(final Context context, final Iterator itty) throws TimeoutException, InterruptedException { + handleIterator(new ResponseHandlerContext(context), itty); + } + + /** + * A variant of {@link #handleIterator(Context, Iterator)} that is suitable for use in situations when multiple + * threads may produce {@link ResponseStatusCode#isFinalResponse() final} response messages concurrently. + * @see #handleIterator(Context, Iterator) + */ + protected void handleIterator(final ResponseHandlerContext rhc, final Iterator itty) throws TimeoutException, InterruptedException { + final Context context = rhc.getContext(); final ChannelHandlerContext ctx = context.getChannelHandlerContext(); final RequestMessage msg = context.getRequestMessage(); final Settings settings = context.getSettings(); @@ -90,7 +102,7 @@ public abstract class AbstractOpProcessor implements OpProcessor { // as there is nothing left to iterate if we are transaction managed then we should execute a // commit here before we send back a NO_CONTENT which implies success if (managedTransactionsForRequest) attemptCommit(msg, context.getGraphManager(), settings.strictTransactionManagement); - ctx.writeAndFlush(ResponseMessage.build(msg) + rhc.writeAndFlush(ResponseMessage.build(msg) .code(ResponseStatusCode.NO_CONTENT) .create()); return; @@ -143,7 +155,7 @@ public abstract class AbstractOpProcessor implements OpProcessor { // thread that processed the eval of the script so, we have to push 
serialization down into that Frame frame = null; try { - frame = makeFrame(ctx, msg, serializer, useBinary, aggregate, code, generateMetaData(ctx, msg, code, itty)); + frame = makeFrame(rhc, msg, serializer, useBinary, aggregate, code, generateMetaData(ctx, msg, code, itty)); } catch (Exception ex) { // a frame may use a Bytebuf which is a countable release - if it does not get written // downstream it needs to be released here @@ -191,7 +203,7 @@ public abstract class AbstractOpProcessor implements OpProcessor { // required then it will be 100% complete before the client receives it. the "frame" at this point // should have completely detached objects from the transaction (i.e. serialization has occurred) // so a new one should not be opened on the flush down the netty pipeline - ctx.writeAndFlush(frame); + rhc.writeAndFlush(code, frame); } } else { // don't keep triggering this warning over and over again for the same request @@ -252,15 +264,29 @@ public abstract class AbstractOpProcessor implements OpProcessor { /** * @deprecated As of release 3.2.2, replaced by {@link #makeFrame(ChannelHandlerContext, RequestMessage, MessageSerializer, boolean, List, ResponseStatusCode, Map)}. */ + @Deprecated protected static Frame makeFrame(final ChannelHandlerContext ctx, final RequestMessage msg, final MessageSerializer serializer, final boolean useBinary, final List<Object> aggregate, final ResponseStatusCode code) throws Exception { return makeFrame(ctx, msg, serializer, useBinary, aggregate, code, Collections.emptyMap()); } + /** + * Caution: {@link #makeFrame(ResponseHandlerContext, RequestMessage, MessageSerializer, boolean, List, ResponseStatusCode, Map)} + * should be used instead of this method whenever a {@link ResponseHandlerContext} is available. 
+ */ protected static Frame makeFrame(final ChannelHandlerContext ctx, final RequestMessage msg, final MessageSerializer serializer, final boolean useBinary, final List<Object> aggregate, final ResponseStatusCode code, final Map<String,Object> responseMetaData) throws Exception { + Context context = new Context(msg, ctx, null, null, null, null); // dummy context, good only for writing response messages to the channel + ResponseHandlerContext rhc = new ResponseHandlerContext(context); + return makeFrame(rhc, msg, serializer, useBinary, aggregate, code, responseMetaData); + } + + protected static Frame makeFrame(final ResponseHandlerContext rhc, final RequestMessage msg, + final MessageSerializer serializer, final boolean useBinary, final List<Object> aggregate, + final ResponseStatusCode code, final Map<String,Object> responseMetaData) throws Exception { + final ChannelHandlerContext ctx = rhc.getContext().getChannelHandlerContext(); try { if (useBinary) { return new Frame(serializer.serializeResponseAsBinary(ResponseMessage.build(msg) @@ -283,7 +309,7 @@ public abstract class AbstractOpProcessor implements OpProcessor { .statusMessage(errorMessage) .statusAttributeException(ex) .code(ResponseStatusCode.SERVER_ERROR_SERIALIZATION).create(); - ctx.writeAndFlush(error); + rhc.writeAndFlush(error); throw ex; } } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f592e344/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/AbstractGremlinServerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/AbstractGremlinServerIntegrationTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/AbstractGremlinServerIntegrationTest.java index 7c124f0..a8a8853 100644 --- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/AbstractGremlinServerIntegrationTest.java +++ 
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/AbstractGremlinServerIntegrationTest.java @@ -39,6 +39,7 @@ import static org.junit.Assume.assumeThat; */ public abstract class AbstractGremlinServerIntegrationTest { protected GremlinServer server; + protected Settings overriddenSettings; private final static String epollOption = "gremlin.server.epoll"; private static final boolean GREMLIN_SERVER_EPOLL = "true".equalsIgnoreCase(System.getProperty(epollOption)); private static final Logger logger = LoggerFactory.getLogger(AbstractGremlinServerIntegrationTest.class); @@ -87,13 +88,13 @@ public abstract class AbstractGremlinServerIntegrationTest { public void startServer() throws Exception { final InputStream stream = getSettingsInputStream(); final Settings settings = Settings.read(stream); - final Settings overridenSettings = overrideSettings(settings); - ServerTestHelper.rewritePathsInGremlinServerSettings(overridenSettings); + overriddenSettings = overrideSettings(settings); + ServerTestHelper.rewritePathsInGremlinServerSettings(overriddenSettings); if (GREMLIN_SERVER_EPOLL) { - overridenSettings.useEpollEventLoop = true; + overriddenSettings.useEpollEventLoop = true; } - this.server = new GremlinServer(overridenSettings); + this.server = new GremlinServer(overriddenSettings); server.start().join(); } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f592e344/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java ---------------------------------------------------------------------- diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java index 1db7a50..9256458 100644 --- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java +++ 
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java @@ -66,6 +66,7 @@ import org.apache.tinkerpop.gremlin.structure.util.empty.EmptyGraph; import org.apache.tinkerpop.gremlin.util.Log4jRecordingAppender; import org.apache.tinkerpop.gremlin.util.function.Lambda; import org.hamcrest.CoreMatchers; +import org.hamcrest.Matchers; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -75,6 +76,7 @@ import java.nio.channels.ClosedChannelException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -376,6 +378,40 @@ public class GremlinServerIntegrateTest extends AbstractGremlinServerIntegration } @Test + public void shouldProduceProperExceptionOnTimeout() throws Exception { + final Cluster cluster = TestClientFactory.open(); + final Client client = cluster.connect(name.getMethodName()); + + boolean success = false; + // Run a short test script a few times with progressively longer timeouts. + // Each submission should either succeed or fail with a timeout. + // Note: the range of timeouts is intended to cover the case when the script finishes at about the + // same time when the timeout occurs. In this situation either a timeout response or a successful + // response is acceptable, however no other processing errors should occur. + // Note: the timeout of 30 ms is generally sufficient for running a simple groovy script, so using longer + // timeouts is not likely to result in a success/timeout response collision, which is the purpose + // of this test. + // Note: this test may have a false negative result, but a failure would indicate a real problem. 
+ for(int i = 0; i < 30; i++) { + int timeout = 1 + i; + overriddenSettings.scriptEvaluationTimeout = timeout; + + try { + client.submit("x = 1 + 1").all().get().get(0).getInt(); + success = true; + } catch (Exception ex) { + final Throwable t = ex.getCause(); + assertThat("Unexpected exception with script evaluation timeout: " + timeout, t, instanceOf(ResponseException.class)); + assertEquals(ResponseStatusCode.SERVER_ERROR_TIMEOUT, ((ResponseException) t).getResponseStatusCode()); + } + } + + assertTrue("Some script submissions should succeed", success); + + cluster.close(); + } + + @Test public void shouldUseBaseScript() throws Exception { final Cluster cluster = TestClientFactory.open(); final Client client = cluster.connect(name.getMethodName()); @@ -985,6 +1021,21 @@ public class GremlinServerIntegrateTest extends AbstractGremlinServerIntegration } @Test + public void shouldHavePartialContentWithLongResultsCollection() throws Exception { + try (SimpleClient client = TestClientFactory.createWebSocketClient()) { + final RequestMessage request = RequestMessage.build(Tokens.OPS_EVAL) + .addArg(Tokens.ARGS_GREMLIN, "new String[100]").create(); + final List<ResponseMessage> responses = client.submit(request); + assertThat(responses.size(), Matchers.greaterThan(1)); + for (Iterator<ResponseMessage> it = responses.iterator(); it.hasNext(); ) { + ResponseMessage msg = it.next(); + ResponseStatusCode expected = it.hasNext() ? 
ResponseStatusCode.PARTIAL_CONTENT : ResponseStatusCode.SUCCESS; + assertEquals(expected, msg.getStatus().getCode()); + } + } + } + + @Test public void shouldFailWithBadScriptEval() throws Exception { try (SimpleClient client = TestClientFactory.createWebSocketClient()) { final RequestMessage request = RequestMessage.build(Tokens.OPS_EVAL) http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f592e344/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/ResponseHandlerContextTest.java ---------------------------------------------------------------------- diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/ResponseHandlerContextTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/ResponseHandlerContextTest.java new file mode 100644 index 0000000..bea318b --- /dev/null +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/ResponseHandlerContextTest.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.tinkerpop.gremlin.server; + +import io.netty.channel.ChannelHandlerContext; +import org.apache.tinkerpop.gremlin.driver.message.RequestMessage; +import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage; +import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode; +import org.hamcrest.CoreMatchers; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.mockito.Mockito; + +import java.util.Arrays; +import java.util.UUID; +import java.util.function.BiFunction; + +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + +@RunWith(Parameterized.class) +public class ResponseHandlerContextTest { + + @Parameterized.Parameter(value = 0) + public BiFunction<ResponseHandlerContext, ResponseStatusCode, Void> writeInvoker; + + private final ChannelHandlerContext ctx = Mockito.mock(ChannelHandlerContext.class); + private final RequestMessage request = RequestMessage.build("test").create(); + private final Context context = new Context(request, ctx, null, null, null, null); + private final ResponseHandlerContext rhc = new ResponseHandlerContext(context); + + @Parameterized.Parameters(name = "{0}") + public static Iterable<Object[]> data() { + return Arrays.asList(new Object[][] { + { + new BiFunction<ResponseHandlerContext, ResponseStatusCode, Void>() { + @Override + public Void apply(ResponseHandlerContext context, ResponseStatusCode code) { + context.writeAndFlush(code, "testMessage"); + return null; + } + + @Override + public String toString() { + return "writeAndFlush(ResponseStatusCode, Object)"; + } + } + }, { + new BiFunction<ResponseHandlerContext, ResponseStatusCode, Void>() { + @Override + public Void apply(ResponseHandlerContext context, ResponseStatusCode code) { + context.writeAndFlush(ResponseMessage.build(UUID.randomUUID()).code(code).create()); + return null; + } + + @Override + public String toString() { + return 
"writeAndFlush(ResponseMessage)"; + } + } + }, + }); + } + + @Test + public void shouldAllowMultipleNonFinalResponses() { + writeInvoker.apply(rhc, ResponseStatusCode.AUTHENTICATE); + Mockito.verify(ctx, Mockito.times(1)).writeAndFlush(Mockito.any()); + + writeInvoker.apply(rhc, ResponseStatusCode.PARTIAL_CONTENT); + Mockito.verify(ctx, Mockito.times(2)).writeAndFlush(Mockito.any()); + + writeInvoker.apply(rhc, ResponseStatusCode.PARTIAL_CONTENT); + Mockito.verify(ctx, Mockito.times(3)).writeAndFlush(Mockito.any()); + } + + @Test + public void shouldAllowAtMostOneFinalResponse() { + writeInvoker.apply(rhc, ResponseStatusCode.AUTHENTICATE); + Mockito.verify(ctx, Mockito.times(1)).writeAndFlush(Mockito.any()); + + writeInvoker.apply(rhc, ResponseStatusCode.SUCCESS); + Mockito.verify(ctx, Mockito.times(2)).writeAndFlush(Mockito.any()); + + try { + writeInvoker.apply(rhc, ResponseStatusCode.SERVER_ERROR_TIMEOUT); + fail("Expected an IllegalStateException"); + } catch (IllegalStateException ex) { + assertThat(ex.toString(), CoreMatchers.containsString(request.getRequestId().toString())); + } + Mockito.verify(ctx, Mockito.times(2)).writeAndFlush(Mockito.any()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f592e344/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/op/AbstractOpProcessorTest.java ---------------------------------------------------------------------- diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/op/AbstractOpProcessorTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/op/AbstractOpProcessorTest.java new file mode 100644 index 0000000..cf42737 --- /dev/null +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/op/AbstractOpProcessorTest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.server.op; + +import io.netty.channel.ChannelHandlerContext; +import org.apache.tinkerpop.gremlin.driver.message.RequestMessage; +import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage; +import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +public class AbstractOpProcessorTest { + + @Test + public void deprecatedMakeFrameMethodShouldRedirectCorrectly() throws Exception { + ChannelHandlerContext ctx = Mockito.mock(ChannelHandlerContext.class); + RequestMessage request = RequestMessage.build("test").create(); + ArgumentCaptor<ResponseMessage> responseCaptor = ArgumentCaptor.forClass(ResponseMessage.class); + + try { + // Induce a NullPointerException to validate error response message writing + //noinspection deprecation + AbstractOpProcessor.makeFrame(ctx, request, null, true, null, ResponseStatusCode.PARTIAL_CONTENT); + fail("Expected a NullPointerException"); + } catch (NullPointerException expected) { + // nop + } + + Mockito.verify(ctx, Mockito.times(1)).writeAndFlush(responseCaptor.capture()); + 
assertEquals(ResponseStatusCode.SERVER_ERROR_SERIALIZATION, responseCaptor.getValue().getStatus().getCode()); + assertEquals(request.getRequestId(), responseCaptor.getValue().getRequestId()); + } + + @Test + public void alternativeMakeFrameMethodShouldRedirectCorrectly() throws Exception { + ChannelHandlerContext ctx = Mockito.mock(ChannelHandlerContext.class); + RequestMessage request = RequestMessage.build("test").create(); + ArgumentCaptor<ResponseMessage> responseCaptor = ArgumentCaptor.forClass(ResponseMessage.class); + + try { + // Induce a NullPointerException to validate error response message writing + AbstractOpProcessor.makeFrame(ctx, request, null, true, null, ResponseStatusCode.PARTIAL_CONTENT, null); + fail("Expected a NullPointerException"); + } catch (NullPointerException expected) { + // nop + } + + Mockito.verify(ctx, Mockito.times(1)).writeAndFlush(responseCaptor.capture()); + assertEquals(ResponseStatusCode.SERVER_ERROR_SERIALIZATION, responseCaptor.getValue().getStatus().getCode()); + assertEquals(request.getRequestId(), responseCaptor.getValue().getRequestId()); + } + +} \ No newline at end of file