This is an automated email from the ASF dual-hosted git repository. spmallette pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
commit 84cfe8f00b5b4ecccc55d3cbdf06d95294e7c94f Merge: 014ba55 e0412a6 Author: Stephen Mallette <[email protected]> AuthorDate: Wed Dec 29 17:14:11 2021 -0500 Merge branch '3.4-dev' into 3.5-dev gremlin-server/conf/neo4j-empty.properties | 6 ++ .../tinkerpop/gremlin/server/GremlinServer.java | 35 ++++++++- .../driver/remote/AbstractRemoteGraphProvider.java | 6 ++ .../AbstractGremlinServerIntegrationTest.java | 25 ++++++- .../gremlin/server/GremlinDriverIntegrateTest.java | 85 +++++++++++----------- .../server/GremlinServerHttpIntegrateTest.java | 6 +- .../gremlin/server/GremlinServerIntegrateTest.java | 22 +++++- .../server/GremlinServerSessionIntegrateTest.java | 25 ++----- 8 files changed, 137 insertions(+), 73 deletions(-) diff --cc gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/AbstractGremlinServerIntegrationTest.java index e527b1f,c394531..2b66ce1 --- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/AbstractGremlinServerIntegrationTest.java +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/AbstractGremlinServerIntegrationTest.java @@@ -18,9 -18,8 +18,10 @@@ */ package org.apache.tinkerpop.gremlin.server; +import org.apache.tinkerpop.gremlin.server.channel.UnifiedChannelizer; +import org.apache.tinkerpop.gremlin.server.channel.UnifiedChannelizerIntegrateTest; import org.apache.tinkerpop.gremlin.server.op.OpLoader; + import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@@ -170,21 -162,24 +177,31 @@@ public abstract class AbstractGremlinSe return (directory.delete()); } - protected static void assumeNeo4jIsPresent() { - boolean neo4jIncludedForTesting; + protected static void tryIncludeNeo4jGraph(final Settings settings) { + if (isNeo4jPresent()) { + deleteDirectory(new File("/tmp/neo4j")); + settings.graphs.put("graph", "conf/neo4j-empty.properties"); + } + } + + protected static boolean isNeo4jPresent() { try { Class.forName("org.neo4j.tinkerpop.api.impl.Neo4jGraphAPIImpl"); - neo4jIncludedForTesting = true; + return true; - } catch (Exception ex) { + } catch (Throwable ex) { - neo4jIncludedForTesting = false; + return false; } + } + + protected static void assumeNeo4jIsPresent() { + boolean neo4jIncludedForTesting = isNeo4jPresent(); assumeThat("Neo4j implementation was not included for testing - run with -DincludeNeo4j", neo4jIncludedForTesting, is(true)); } + + private boolean shouldTestUnified() { + // ignore all tests in the UnifiedChannelizerIntegrateTest package as they are already rigged to test + // over the various channelizer implementations + return Boolean.parseBoolean(System.getProperty("testUnified", "false")) && + !this.getClass().getPackage().equals(UnifiedChannelizerIntegrateTest.class.getPackage()); + } } diff --cc gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java index 5ab086b,15b3004..7f78dd5 --- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java @@@ -97,6 -97,6 +97,7 @@@ import static org.hamcrest.number.Order import static org.hamcrest.number.OrderingComparison.lessThanOrEqualTo; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; ++import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@@ -1472,13 -1521,13 +1472,13 @@@ public class GremlinDriverIntegrateTes assertThat(root, instanceOf(ResponseException.class)); final ResponseException re = (ResponseException) root; assertEquals(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS, re.getResponseStatusCode()); - } - - final Client rebound = cluster.connect().alias("graph"); - final Vertex v = rebound.submit("g.addVertex(T.label,'person')").all().get().get(0).getVertex(); - assertEquals("person", v.label()); - cluster.close(); + final Client rebound = cluster.connect().alias("graph"); - final Vertex v = rebound.submit("g.addVertex('name','jason')").all().get().get(0).getVertex(); - assertEquals("jason", v.value("name")); ++ final Vertex v = rebound.submit("g.addVertex(T.label,'person')").all().get().get(0).getVertex(); ++ assertEquals("person", v.label()); + } finally { + cluster.close(); + } } @Test @@@ -1493,14 -1542,14 +1493,14 @@@ final Throwable root = ExceptionUtils.getRootCause(ex); assertThat(root, instanceOf(ResponseException.class)); final ResponseException re = (ResponseException) root; - assertEquals(ResponseStatusCode.SERVER_ERROR_SCRIPT_EVALUATION, re.getResponseStatusCode()); + assertEquals(ResponseStatusCode.SERVER_ERROR_EVALUATION, re.getResponseStatusCode()); - } - - final Client rebound = cluster.connect().alias("graph"); - final Vertex v = rebound.submit("g.addVertex(label,'person','name','jason')").all().get().get(0).getVertex(); - assertEquals("person", v.label()); - cluster.close(); + final Client rebound = cluster.connect().alias("graph"); - final Vertex v = rebound.submit("g.addVertex('name','jason')").all().get().get(0).getVertex(); - assertEquals("jason", v.value("name")); ++ final Vertex v = rebound.submit("g.addVertex(label,'person','name','jason')").all().get().get(0).getVertex(); ++ assertEquals("person", v.label()); + } finally { + cluster.close(); + } } @Test @@@ -1782,28 -1816,39 +1784,28 @@@ public void shouldSendRequestIdBytecode() { final UUID overrideRequestId = UUID.randomUUID(); final Cluster cluster = TestClientFactory.build().serializer(Serializers.GRAPHSON_V3D0).create(); - final Client client = Mockito.spy(cluster.connect().alias("g")); - Mockito.when(client.alias("g")).thenReturn(client); - GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(client)); - g.with(Tokens.REQUEST_ID, overrideRequestId).V().iterate(); - cluster.close(); - ArgumentCaptor<RequestOptions> requestOptionsCaptor = ArgumentCaptor.forClass(RequestOptions.class); - verify(client).submitAsync(Mockito.any(Bytecode.class), requestOptionsCaptor.capture()); - RequestOptions requestOptions = requestOptionsCaptor.getValue(); - assertTrue(requestOptions.getOverrideRequestId().isPresent()); - assertEquals(overrideRequestId, requestOptions.getOverrideRequestId().get()); + final Client client = Mockito.spy(cluster.connect().alias("g")); + Mockito.when(client.alias("g")).thenReturn(client); + final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(client)); + g.with(Tokens.REQUEST_ID, overrideRequestId).V().iterate(); + cluster.close(); - ArgumentCaptor<RequestMessage> requestMessageCaptor = ArgumentCaptor.forClass(RequestMessage.class); - verify(client).submitAsync(requestMessageCaptor.capture()); - RequestMessage requestMessage = requestMessageCaptor.getValue(); - assertEquals(overrideRequestId, requestMessage.getRequestId()); + final ArgumentCaptor<RequestOptions> requestOptionsCaptor = ArgumentCaptor.forClass(RequestOptions.class); + verify(client).submitAsync(Mockito.any(Bytecode.class), requestOptionsCaptor.capture()); + final RequestOptions requestOptions = requestOptionsCaptor.getValue(); + assertTrue(requestOptions.getOverrideRequestId().isPresent()); + assertEquals(overrideRequestId, requestOptions.getOverrideRequestId().get()); + final ArgumentCaptor<RequestMessage> requestMessageCaptor = ArgumentCaptor.forClass(RequestMessage.class); + verify(client).submitAsync(requestMessageCaptor.capture()); + final RequestMessage requestMessage = requestMessageCaptor.getValue(); + assertEquals(overrideRequestId, requestMessage.getRequestId()); } - private void assertFutureTimeout(final CompletableFuture<List<Result>> futureFirst) { - try { - futureFirst.get(); - fail("Should have timed out"); - } catch (Exception ex) { - final Throwable root = ExceptionUtils.getRootCause(ex); - assertThat(root, instanceOf(ResponseException.class)); - assertThat(root.getMessage(), startsWith("Evaluation exceeded the configured 'evaluationTimeout' threshold of 250 ms")); - } - } - @Test public void shouldClusterReadFileFromResources() throws Exception { final Cluster cluster = Cluster.open(TestClientFactory.RESOURCE_PATH); -- assertTrue(cluster != null); ++ assertNotNull(cluster); cluster.close(); } @@@ -1847,10 -1894,10 +1850,10 @@@ logger.info("Verifying driver cannot connect to server."); client.submit("g").all().get(500, TimeUnit.MILLISECONDS); fail("Should throw an exception."); - } catch (RuntimeException re) { + } catch (Exception re) { // Client would have no active connections to the host, hence it would encounter a timeout // trying to find an alive connection to the host. - assertThat(re.getCause(), instanceOf(NoHostAvailableException.class)); + assertThat(re, instanceOf(NoHostAvailableException.class)); // // should recover when the server comes back diff --cc gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java index 71c5401,4a95ae3..fd12632 --- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java @@@ -381,7 -335,37 +386,7 @@@ public class GremlinServerIntegrateTes } @Test - public void shouldTimeOutRemoteTraversalWithPerRequestOption() { - public void shouldTimeOutRemoteTraversalUsingDeprecatedConfiguration() throws Exception { - final GraphTraversalSource g = traversal().withRemote(conf); - - try { - // tests sleeping thread - g.inject(1).sideEffect(Lambda.consumer("Thread.sleep(10000)")).iterate(); - fail("This traversal should have timed out"); - } catch (Exception ex) { - final Throwable t = ex.getCause(); - assertThat(t, instanceOf(ResponseException.class)); - assertEquals(ResponseStatusCode.SERVER_ERROR_TIMEOUT, ((ResponseException) t).getResponseStatusCode()); - } - - // make a graph with a cycle in it to force a long run traversal - graphGetter.get().traversal().addV("person").as("p").addE("self").to("p").iterate(); - - try { - // tests an "unending" traversal - g.V().repeat(__.out()).until(__.outE().count().is(0)).iterate(); - fail("This traversal should have timed out"); - } catch (Exception ex) { - final Throwable t = ex.getCause(); - assertThat(t, instanceOf(ResponseException.class)); - assertEquals(ResponseStatusCode.SERVER_ERROR_TIMEOUT, ((ResponseException) t).getResponseStatusCode()); - } - - g.close(); - } - - @Test + public void shouldTimeOutRemoteTraversalWithPerRequestOption() throws Exception { final GraphTraversalSource g = traversal().withRemote(conf); try { @@@ -1055,22 -1220,8 +1065,26 @@@ fail("Should have tanked out because of number of parameters used and size of the compile script"); } catch (Exception ex) { assertThat(ex.getMessage(), containsString("The Gremlin statement that was submitted exceeds the maximum compilation size allowed by the JVM")); + } finally { + cluster.close(); } } + + @Test + public void shouldGenerateTemporaryErrorResponseStatusCode() throws Exception { + final Cluster cluster = TestClientFactory.build().create(); + final Client client = cluster.connect(); + + try { + client.submit("g.addV('person').sideEffect{throw new org.apache.tinkerpop.gremlin.server.util.DefaultTemporaryException('try again!')}").all().get(); + fail("Should have tanked since we threw an exception out manually"); + } catch (Exception ex) { + final Throwable t = ex.getCause(); + assertThat(t, instanceOf(ResponseException.class)); + assertEquals("try again!", t.getMessage()); + assertEquals(ResponseStatusCode.SERVER_ERROR_TEMPORARY, ((ResponseException) t).getResponseStatusCode()); ++ } finally { ++ cluster.close(); + } + } } diff --cc gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java index 588d36b,5af4f60..a7e7ac2 --- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java @@@ -107,13 -100,13 +106,19 @@@ public class GremlinServerSessionIntegr processorSettings.config = new HashMap<>(); processorSettings.config.put(SessionOpProcessor.CONFIG_SESSION_TIMEOUT, 3000L); settings.processors.add(processorSettings); + + // Unified setting + settings.sessionLifetimeTimeout = 3000L; break; - case "shouldBlockAdditionalRequestsDuringClose": - case "shouldBlockAdditionalRequestsDuringForceClose": + case "shouldCloseSessionOnClientClose": + case "shouldCloseSessionOnClientCloseWithStateMaintainedBetweenExceptions": - clearNeo4j(settings); + case "shouldExecuteInSessionAndSessionlessWithoutOpeningTransactionWithSingleClient": + case "shouldExecuteInSessionWithTransactionManagement": + case "shouldRollbackOnEvalExceptionForManagedTransaction": ++ case "shouldNotExecuteQueuedRequestsIfOneInFrontOfItFails": + tryIncludeNeo4jGraph(settings); ++ case "shouldBlowTheSessionQueueSize": ++ settings.maxSessionTaskQueueSize = 1; break; case "shouldEnsureSessionBindingsAreThreadSafe": settings.threadPoolWorker = 2; @@@ -134,72 -118,23 +139,57 @@@ processorSettingsForDisableFunctionCache.config = new HashMap<>(); processorSettingsForDisableFunctionCache.config.put(SessionOpProcessor.CONFIG_GLOBAL_FUNCTION_CACHE_ENABLED, false); settings.processors.add(processorSettingsForDisableFunctionCache); + + // UnifiedHandler settings + settings.useCommonEngineForSessions = false; + settings.useGlobalFunctionCacheForSessions = false; + break; - case "shouldExecuteInSessionAndSessionlessWithoutOpeningTransactionWithSingleClient": - case "shouldExecuteInSessionWithTransactionManagement": - case "shouldRollbackOnEvalExceptionForManagedTransaction": - case "shouldNotExecuteQueuedRequestsIfOneInFrontOfItFails": - clearNeo4j(settings); - break; - case "shouldBlowTheSessionQueueSize": - clearNeo4j(settings); - settings.maxSessionTaskQueueSize = 1; - break; } return settings; } - private static void clearNeo4j(Settings settings) { - deleteDirectory(new File("/tmp/neo4j")); - settings.graphs.put("graph", "conf/neo4j-empty.properties"); - } - @Test - public void shouldUseGlobalFunctionCache() throws Exception { - final Cluster cluster = TestClientFactory.open(); - final Client client = cluster.connect(name.getMethodName()); + public void shouldBlowTheSessionQueueSize() throws Exception { + assumeNeo4jIsPresent(); + assumeThat(isUsingUnifiedChannelizer(), is(true)); - try { - assertEquals(3, client.submit("def addItUp(x,y){x+y};addItUp(1,2)").all().get().get(0).getInt()); - assertEquals(3, client.submit("addItUp(1,2)").all().get().get(0).getInt()); - } finally { - cluster.close(); + final Cluster cluster = TestClientFactory.open(); + final Client session = cluster.connect(name.getMethodName()); + + // maxSessionQueueSize=1 + // we should be able to do one request at a time serially + assertEquals("test1", session.submit("'test1'").all().get().get(0).getString()); + assertEquals("test2", session.submit("'test2'").all().get().get(0).getString()); + assertEquals("test3", session.submit("'test3'").all().get().get(0).getString()); + + final AtomicBoolean errorTriggered = new AtomicBoolean(); + final ResultSet r1 = session.submitAsync("Thread.sleep(1000);'test4'").get(); + + final List<CompletableFuture<List<Result>>> blockers = new ArrayList<>(); + for (int ix = 0; ix < 512 && !errorTriggered.get(); ix++) { + blockers.add(session.submit("'test'").all().exceptionally(t -> { + final ResponseException re = (ResponseException) t.getCause(); + errorTriggered.compareAndSet(false, ResponseStatusCode.TOO_MANY_REQUESTS == re.getResponseStatusCode()); + return null; + })); + + // low resource environments like travis might need a break + if (ix % 32 == 0) Thread.sleep(500); } + + // wait for the blockage to clear for sure + assertEquals("test4", r1.all().get().get(0).getString()); + blockers.forEach(CompletableFuture::join); + + assertThat(errorTriggered.get(), is(true)); + + // should be accepting test6 now + assertEquals("test6", session.submit("'test6'").all().get().get(0).getString()); + + session.close(); + cluster.close(); } @Test @@@ -568,7 -370,7 +558,6 @@@ } finally { cluster.close(); } -- } @Test
