TINKERPOP-2005 Handle evaluation excetions in AbstractEvalOpProcessor

Some script evaluation exceptions in AbstractEvalOpProcessor may occur
after the script has started executing. In this situation it it critical
to prevent potentially writing multiple final (e.g. error vs. success)
responses back to the client.

Exceptions that used to escape from evalOpInternal(...) would be
converted to error response messages by OpExecutorHandler, which could
coincide with a successful response from a quick script.

This change makes AbstractEvalOpProcessor do the error message
writing for the same type of exceptions that are handled by
OpExecutorHandler. However, AbstractEvalOpProcessor makes sure that
at most one final reponse message is sent by using
ResponseHandlerContext.

Also ResponseHandlerContext.writeAndFlush(...) methods will no longer
throw exceptions for attempts to send multiple final messages. This is
again to avoid multiple error response messages sent from
OpExecutorHandler.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/b7a44953
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/b7a44953
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/b7a44953

Branch: refs/heads/tp33
Commit: b7a44953c8ac62f308b78709ab2565a78eddb1af
Parents: f592e34
Author: Dmitri Bourlatchkov <dmitri.bourlatch...@datastax.com>
Authored: Mon Jul 30 12:38:21 2018 -0400
Committer: Dmitri Bourlatchkov <dmitri.bourlatch...@datastax.com>
Committed: Mon Jul 30 12:38:21 2018 -0400

----------------------------------------------------------------------
 .../gremlin/server/ResponseHandlerContext.java  | 19 +++---
 .../server/op/AbstractEvalOpProcessor.java      | 26 +++++++-
 .../gremlin/server/op/AbstractOpProcessor.java  |  2 +-
 .../server/ResponseHandlerContextTest.java      | 45 +++++++++++---
 .../server/op/AbstractEvalOpProcessorTest.java  | 62 ++++++++++++++++++++
 5 files changed, 135 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b7a44953/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/ResponseHandlerContext.java
----------------------------------------------------------------------
diff --git 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/ResponseHandlerContext.java
 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/ResponseHandlerContext.java
index fff4480..3c8c13c 100644
--- 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/ResponseHandlerContext.java
+++ 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/ResponseHandlerContext.java
@@ -20,6 +20,8 @@ package org.apache.tinkerpop.gremlin.server;
 
 import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
 import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -27,7 +29,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
  * A context for asynchronously writing response messages related to a 
particular request.
  * <p>The "write" methods of this class ensure that at most one {@link 
ResponseStatusCode#isFinalResponse() final}
  * response message is written to the underlying channel. Attempts to write 
more than one final response message will
- * result in an {@link IllegalStateException}.</p>
+ * be ignored with a warning log message.</p>
  * <p>Note: an object of this class should be used instead of writing to the 
channel directly when multiple threads
  * are expected to produce final response messages concurrently. Callers must 
ensure that the same
  * {@link ResponseHandlerContext} is used by all threads writing response 
messages for the same request.</p>
@@ -35,6 +37,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
  * @author Dmitri Bourlatchkov
  */
 public class ResponseHandlerContext {
+    private static final Logger logger = 
LoggerFactory.getLogger(ResponseHandlerContext.class);
 
     private final Context context;
     private final AtomicBoolean finalResponseWritten = new AtomicBoolean();
@@ -52,7 +55,7 @@ public class ResponseHandlerContext {
      * {@link ResponseStatusCode#isFinalResponse() final} response is written.
      * <p>Note: this method should be used instead of writing to the channel 
directly when multiple threads
      * are expected to produce response messages concurrently.</p>
-     * <p>Attempts to write more than one final response message will result 
in an {@link IllegalStateException}.</p>
+     * <p>Attempts to write more than one final response message will be 
ignored.</p>
      * @see #writeAndFlush(ResponseStatusCode, Object)
      */
     public void writeAndFlush(ResponseMessage message) {
@@ -65,16 +68,18 @@ public class ResponseHandlerContext {
      * <p>The caller must make sure that the provided response status code 
matches the content of the message.</p>
      * <p>Note: this method should be used instead of writing to the channel 
directly when multiple threads
      * are expected to produce response messages concurrently.</p>
-     * <p>Attempts to write more than one final response message will result 
in an {@link IllegalStateException}.</p>
+     * <p>Attempts to write more than one final response message will be 
ignored.</p>
      * @see #writeAndFlush(ResponseMessage)
      */
     public void writeAndFlush(ResponseStatusCode code, Object responseMessage) 
{
         final boolean messageIsFinal = code.isFinalResponse();
-        if(!finalResponseWritten.compareAndSet(false, messageIsFinal)) {
-            final String errorMessage = String.format("Another final response 
message was already written for request %s", 
context.getRequestMessage().getRequestId());
-            throw new IllegalStateException(errorMessage);
+        if(finalResponseWritten.compareAndSet(false, messageIsFinal)) {
+            context.getChannelHandlerContext().writeAndFlush(responseMessage);
+        } else {
+            final String logMessage = String.format("Another final response 
message was already written for request %s, ignoring response code: %s",
+                    context.getRequestMessage().getRequestId(), code);
+            logger.warn(logMessage);
         }
 
-        context.getChannelHandlerContext().writeAndFlush(responseMessage);
     }
 }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b7a44953/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractEvalOpProcessor.java
----------------------------------------------------------------------
diff --git 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractEvalOpProcessor.java
 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractEvalOpProcessor.java
index 6ff0452..39168c2 100644
--- 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractEvalOpProcessor.java
+++ 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractEvalOpProcessor.java
@@ -223,9 +223,33 @@ public abstract class AbstractEvalOpProcessor extends 
AbstractOpProcessor {
      *                                script evaluation.
      * @param bindingsSupplier A function that returns the {@link Bindings} to 
provide to the
      *                         {@link GremlinExecutor#eval} method.
+     * @see #evalOpInternal(ResponseHandlerContext, Supplier, BindingSupplier)
      */
     protected void evalOpInternal(final Context context, final 
Supplier<GremlinExecutor> gremlinExecutorSupplier,
                                   final BindingSupplier bindingsSupplier) 
throws OpProcessorException {
+        ResponseHandlerContext rhc = new ResponseHandlerContext(context);
+        try {
+            evalOpInternal(rhc, gremlinExecutorSupplier, bindingsSupplier);
+        } catch (Exception ex) {
+            // Exceptions may occur on after the script started executing, 
therefore corresponding errors must be
+            // reported via the ResponseHandlerContext.
+            logger.warn("Unable to process script evaluation request: " + ex, 
ex);
+            
rhc.writeAndFlush(ResponseMessage.build(context.getRequestMessage())
+                    .code(ResponseStatusCode.SERVER_ERROR)
+                    .statusAttributeException(ex)
+                    .statusMessage(ex.getMessage()).create());
+        }
+    }
+
+    /**
+     * A variant of {@link #evalOpInternal(Context, Supplier, 
BindingSupplier)} that is suitable for use in situations
+     * when multiple threads may produce {@link 
ResponseStatusCode#isFinalResponse() final} response messages
+     * concurrently.
+     * @see #evalOpInternal(Context, Supplier, BindingSupplier)
+     */
+    protected void evalOpInternal(final ResponseHandlerContext rhc, final 
Supplier<GremlinExecutor> gremlinExecutorSupplier,
+                                  final BindingSupplier bindingsSupplier) 
throws OpProcessorException {
+        final Context context = rhc.getContext();
         final Timer.Context timerContext = evalOpTimer.time();
         final ChannelHandlerContext ctx = context.getChannelHandlerContext();
         final RequestMessage msg = context.getRequestMessage();
@@ -246,8 +270,6 @@ public abstract class AbstractEvalOpProcessor extends 
AbstractOpProcessor {
         final long seto = args.containsKey(Tokens.ARGS_SCRIPT_EVAL_TIMEOUT) ?
                 
Long.parseLong(args.get(Tokens.ARGS_SCRIPT_EVAL_TIMEOUT).toString()) : 
settings.scriptEvaluationTimeout;
 
-        ResponseHandlerContext rhc = new ResponseHandlerContext(context);
-
         final GremlinExecutor.LifeCycle lifeCycle = 
GremlinExecutor.LifeCycle.build()
                 .scriptEvaluationTimeoutOverride(seto)
                 .afterFailure((b,t) -> {

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b7a44953/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractOpProcessor.java
----------------------------------------------------------------------
diff --git 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractOpProcessor.java
 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractOpProcessor.java
index 1263c81..c2b6f1f 100644
--- 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractOpProcessor.java
+++ 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractOpProcessor.java
@@ -80,7 +80,7 @@ public abstract class AbstractOpProcessor implements 
OpProcessor {
     }
 
     /**
-     * A variant of {@link #handleIterator(Context, Iterator)} that is 
suitable for use in situations when mutiple
+     * A variant of {@link #handleIterator(Context, Iterator)} that is 
suitable for use in situations when multiple
      * threads may produce {@link ResponseStatusCode#isFinalResponse() final} 
response messages concurrently.
      * @see #handleIterator(Context, Iterator)
      */

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b7a44953/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/ResponseHandlerContextTest.java
----------------------------------------------------------------------
diff --git 
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/ResponseHandlerContextTest.java
 
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/ResponseHandlerContextTest.java
index bea318b..6f15a33 100644
--- 
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/ResponseHandlerContextTest.java
+++ 
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/ResponseHandlerContextTest.java
@@ -19,10 +19,13 @@
 package org.apache.tinkerpop.gremlin.server;
 
 import io.netty.channel.ChannelHandlerContext;
+import org.apache.log4j.Logger;
 import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
 import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
 import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
-import org.hamcrest.CoreMatchers;
+import org.apache.tinkerpop.gremlin.util.Log4jRecordingAppender;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -32,8 +35,7 @@ import java.util.Arrays;
 import java.util.UUID;
 import java.util.function.BiFunction;
 
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.assertTrue;
 
 @RunWith(Parameterized.class)
 public class ResponseHandlerContextTest {
@@ -45,6 +47,7 @@ public class ResponseHandlerContextTest {
     private final RequestMessage request = 
RequestMessage.build("test").create();
     private final Context context = new Context(request, ctx, null, null, 
null, null);
     private final ResponseHandlerContext rhc = new 
ResponseHandlerContext(context);
+    private final Log4jRecordingAppender recordingAppender = new 
Log4jRecordingAppender();
 
     @Parameterized.Parameters(name = "{0}")
     public static Iterable<Object[]> data() {
@@ -79,6 +82,18 @@ public class ResponseHandlerContextTest {
         });
     }
 
+    @Before
+    public void addRecordingAppender() {
+        final Logger rootLogger = Logger.getRootLogger();
+        rootLogger.addAppender(recordingAppender);
+    }
+
+    @After
+    public void removeRecordingAppender() {
+        final Logger rootLogger = Logger.getRootLogger();
+        rootLogger.removeAppender(recordingAppender);
+    }
+
     @Test
     public void shouldAllowMultipleNonFinalResponses() {
         writeInvoker.apply(rhc, ResponseStatusCode.AUTHENTICATE);
@@ -99,12 +114,24 @@ public class ResponseHandlerContextTest {
         writeInvoker.apply(rhc, ResponseStatusCode.SUCCESS);
         Mockito.verify(ctx, Mockito.times(2)).writeAndFlush(Mockito.any());
 
-        try {
-            writeInvoker.apply(rhc, ResponseStatusCode.SERVER_ERROR_TIMEOUT);
-            fail("Expected an IllegalStateException");
-        } catch (IllegalStateException ex) {
-            assertThat(ex.toString(), 
CoreMatchers.containsString(request.getRequestId().toString()));
-        }
+        writeInvoker.apply(rhc, ResponseStatusCode.SERVER_ERROR_TIMEOUT);
+        assertTrue(recordingAppender.logContainsAny(".*" + 
request.getRequestId() + ".*"));
+        assertTrue(recordingAppender.logContainsAny(".*" + 
ResponseStatusCode.SERVER_ERROR_TIMEOUT + "$"));
+
+        // ensure there were no other writes to the channel
         Mockito.verify(ctx, Mockito.times(2)).writeAndFlush(Mockito.any());
     }
+
+    @Test
+    public void shouldNotAllowNonFinalMessagesAfterFinalResponse() {
+        writeInvoker.apply(rhc, ResponseStatusCode.SERVER_ERROR_TIMEOUT);
+        Mockito.verify(ctx, Mockito.times(1)).writeAndFlush(Mockito.any());
+
+        writeInvoker.apply(rhc, ResponseStatusCode.PARTIAL_CONTENT);
+        assertTrue(recordingAppender.logContainsAny(".*" + 
request.getRequestId() + ".*"));
+        assertTrue(recordingAppender.logContainsAny(".*" + 
ResponseStatusCode.PARTIAL_CONTENT + "$"));
+
+        // ensure there were no other writes to the channel
+        Mockito.verify(ctx, Mockito.times(1)).writeAndFlush(Mockito.any());
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b7a44953/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/op/AbstractEvalOpProcessorTest.java
----------------------------------------------------------------------
diff --git 
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/op/AbstractEvalOpProcessorTest.java
 
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/op/AbstractEvalOpProcessorTest.java
new file mode 100644
index 0000000..6f25e2e
--- /dev/null
+++ 
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/op/AbstractEvalOpProcessorTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.server.op;
+
+import io.netty.channel.ChannelHandlerContext;
+import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
+import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
+import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
+import org.apache.tinkerpop.gremlin.groovy.engine.GremlinExecutor;
+import org.apache.tinkerpop.gremlin.server.Context;
+import org.apache.tinkerpop.gremlin.server.Settings;
+import org.apache.tinkerpop.gremlin.server.op.standard.StandardOpProcessor;
+import org.hamcrest.CoreMatchers;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import javax.script.SimpleBindings;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.anyString;
+
+public class AbstractEvalOpProcessorTest {
+
+    @Test
+    public void evalOpInternalShouldHandleAllEvaluationExceptions() throws 
OpProcessorException {
+        AbstractEvalOpProcessor processor = new StandardOpProcessor();
+        RequestMessage request = RequestMessage.build("test").create();
+        Settings settings = new Settings();
+        ChannelHandlerContext ctx = Mockito.mock(ChannelHandlerContext.class);
+        ArgumentCaptor<ResponseMessage> responseCaptor = 
ArgumentCaptor.forClass(ResponseMessage.class);
+
+        GremlinExecutor gremlinExecutor = Mockito.mock(GremlinExecutor.class);
+        Mockito.when(gremlinExecutor.eval(anyString(), anyString(), 
Mockito.any(), Mockito.<GremlinExecutor.LifeCycle>any()))
+                .thenThrow(new IllegalStateException("test-exception"));
+
+        Context context = new Context(request, ctx, settings, null, 
gremlinExecutor, null);
+        processor.evalOpInternal(context, context::getGremlinExecutor, 
SimpleBindings::new);
+
+        Mockito.verify(ctx, 
Mockito.times(1)).writeAndFlush(responseCaptor.capture());
+        assertEquals(ResponseStatusCode.SERVER_ERROR, 
responseCaptor.getValue().getStatus().getCode());
+        assertEquals(request.getRequestId(), 
responseCaptor.getValue().getRequestId());
+        assertThat(responseCaptor.getValue().getStatus().getMessage(), 
CoreMatchers.containsString("test-exception"));
+    }
+}
\ No newline at end of file

Reply via email to