This is an automated email from the ASF dual-hosted git repository. Cole-Greer pushed a commit to branch gremlinSocketServer in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
commit 0fe90f03d3166acba076f36ffa3d23b26bf6bb24 Author: Cole Greer <[email protected]> AuthorDate: Thu May 28 13:16:52 2026 -0700 Add HTTP behavioral tests for Java driver Replace the old WebSocket behavioral test infrastructure with an HTTP equivalent. The lightweight test server in gremlin-socket-server dispatches on the Gremlin query string to simulate adversarial network conditions. Server changes: - Remove dead WebSocket artifacts (TestHandlers, test-ws-gremlin.yaml) - Replace SocketServerSettings with SocketServerConstants (pure constants) - Rename SimpleSocketServer to SimpleTestServer - Add TestHttpServerInitializer and TestHttpGremlinHandler - Scenarios: single vertex, close connection, vertex-then-close, error after delay, partial content close, malformed response, no response, slow drip-feed, empty body Test changes: - Add ClientBehaviorIntegrateTest in gremlin-driver with 13 tests - Add test-scoped dependency on gremlin-socket-server in gremlin-driver --- gremlin-driver/pom.xml | 6 + .../driver/ClientBehaviorIntegrateTest.java | 369 +++++++++++++++++++++ gremlin-tools/gremlin-socket-server/Dockerfile | 1 - .../conf/test-ws-gremlin.yaml | 56 ---- ...mpleSocketServer.java => SimpleTestServer.java} | 10 +- .../socket/server/SocketServerConstants.java | 40 +++ .../gremlin/socket/server/SocketServerRunner.java | 20 +- .../socket/server/SocketServerSettings.java | 93 ------ .../gremlin/socket/server/TestHandlers.java | 48 --- .../socket/server/TestHttpGremlinHandler.java | 210 ++++++++++++ ...rRunner.java => TestHttpServerInitializer.java} | 26 +- 11 files changed, 648 insertions(+), 231 deletions(-) diff --git a/gremlin-driver/pom.xml b/gremlin-driver/pom.xml index 53e04d071c..14457c2d17 100644 --- a/gremlin-driver/pom.xml +++ b/gremlin-driver/pom.xml @@ -88,6 +88,12 @@ limitations under the License. <optional>true</optional> </dependency> <!-- TEST --> + <dependency> + <groupId>org.apache.tinkerpop</groupId> + <artifactId>gremlin-socket-server</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> <dependency> <groupId>org.apache.tinkerpop</groupId> <artifactId>gremlin-test</artifactId> diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientBehaviorIntegrateTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientBehaviorIntegrateTest.java new file mode 100644 index 0000000000..4f1d128911 --- /dev/null +++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientBehaviorIntegrateTest.java @@ -0,0 +1,369 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.driver; + +import ch.qos.logback.classic.Level; +import ch.qos.logback.classic.Logger; +import nl.altindag.log.LogCaptor; +import io.netty.handler.codec.http.HttpResponseStatus; +import org.apache.tinkerpop.gremlin.driver.exception.ResponseException; +import org.apache.tinkerpop.gremlin.socket.server.SimpleTestServer; +import org.apache.tinkerpop.gremlin.socket.server.SocketServerConstants; +import org.apache.tinkerpop.gremlin.socket.server.TestHttpServerInitializer; +import org.apache.tinkerpop.gremlin.structure.Vertex; +import org.apache.tinkerpop.gremlin.util.ser.GraphBinaryMessageSerializerV4; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.IsInstanceOf.instanceOf; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class ClientBehaviorIntegrateTest { + + private static final int PORT = 45943; + + private static SimpleTestServer server; + private static Cluster cluster; + private static Client client; + private static LogCaptor logCaptor; + + @BeforeClass + public static void setUp() throws InterruptedException { + server = new SimpleTestServer(PORT); + server.start(new TestHttpServerInitializer()); + + cluster = buildCluster().create(); + client = cluster.connect(); + + logCaptor = LogCaptor.forClass(ConnectionPool.class); + final Logger poolLogger = (Logger) LoggerFactory.getLogger(ConnectionPool.class); + poolLogger.setLevel(Level.DEBUG); + } + + @AfterClass + public static void tearDown() { + if (client != null) client.close(); + if (cluster != null) cluster.close(); + if (server != null) server.stop(); + logCaptor.close(); + } + + private static Cluster.Builder buildCluster() { + return Cluster.build("localhost") + .validationRequest(SocketServerConstants.GREMLIN_SINGLE_VERTEX) + .port(PORT) + .serializer(new GraphBinaryMessageSerializerV4()); + } + + private static void clearLogs() { + logCaptor.clearLogs(); + } + + @Test + public void shouldReceiveSingleVertex() throws Exception { + final List<Result> results = client.submit(SocketServerConstants.GREMLIN_SINGLE_VERTEX).all().get(); + assertEquals(1, results.size()); + assertThat(results.get(0).getObject(), instanceOf(Vertex.class)); + } + + @Test + public void shouldHandleServerClosingConnectionBeforeResponse() throws Exception { + try { + client.submit(SocketServerConstants.GREMLIN_CLOSE_CONNECTION).all().get(); + fail("Expected ExecutionException"); + } catch (ExecutionException e) { + // ExecutionException -> IllegalStateException or IOException + final Throwable cause = e.getCause(); + assertTrue("Expected IllegalStateException or IOException but got " + cause.getClass().getName(), + cause instanceof IllegalStateException || cause instanceof IOException); + } + + // driver should recover + final List<Result> results = client.submit(SocketServerConstants.GREMLIN_SINGLE_VERTEX).all().get(); + assertEquals(1, results.size()); + assertThat(results.get(0).getObject(), instanceOf(Vertex.class)); + } + + @Test + public void shouldHandleSuccessiveConnectionClosures() throws Exception { + try { + client.submit(SocketServerConstants.GREMLIN_CLOSE_CONNECTION).all().get(); + fail("Expected ExecutionException"); + } catch (ExecutionException ignored) { + } + + try { + client.submit(SocketServerConstants.GREMLIN_CLOSE_CONNECTION).all().get(); + fail("Expected ExecutionException"); + } catch (ExecutionException ignored) { + } + + // driver should recover after multiple failures + final List<Result> results = client.submit(SocketServerConstants.GREMLIN_SINGLE_VERTEX).all().get(); + assertEquals(1, results.size()); + assertThat(results.get(0).getObject(), instanceOf(Vertex.class)); + } + + @Test + public void shouldHandleAsyncRequestsDuringConnectionClose() throws Exception { + clearLogs(); + + final CompletableFuture<List<Result>> f1 = client.submit(SocketServerConstants.GREMLIN_CLOSE_CONNECTION).all(); + final CompletableFuture<List<Result>> f2 = client.submit(SocketServerConstants.GREMLIN_CLOSE_CONNECTION).all(); + + try { + f1.get(5, TimeUnit.SECONDS); + } catch (ExecutionException ignored) { + } + + try { + f2.get(5, TimeUnit.SECONDS); + } catch (ExecutionException ignored) { + } + + // wait for pool stabilization + Thread.sleep(2000); + + // verify the pool considered/created a replacement connection + assertTrue("Expected pool to replace or consider new connection", + logCaptor.getLogs().stream().anyMatch(msg -> + msg.contains("Replace ") || msg.contains("Considering new connection"))); + + // verify recovery + final List<Result> results = client.submit(SocketServerConstants.GREMLIN_SINGLE_VERTEX).all().get(); + assertEquals(1, results.size()); + assertThat(results.get(0).getObject(), instanceOf(Vertex.class)); + } + + @Test + public void shouldHandleServerClosingConnectionAfterResponse() throws Exception { + clearLogs(); + + final List<Result> results = client.submit(SocketServerConstants.GREMLIN_VERTEX_THEN_CLOSE).all().get(); + assertEquals(1, results.size()); + assertThat(results.get(0).getObject(), instanceOf(Vertex.class)); + + // wait for the delayed close to happen + Thread.sleep(3000); + + // connection pool should create a new connection + final List<Result> results2 = client.submit(SocketServerConstants.GREMLIN_SINGLE_VERTEX).all().get(); + assertEquals(1, results2.size()); + assertThat(results2.get(0).getObject(), instanceOf(Vertex.class)); + + // verify the pool dealt with the dead connection + assertTrue("Expected pool to replace or destroy the dead connection", + logCaptor.getLogs().stream().anyMatch(msg -> msg.contains("Replace ") || msg.contains("Destroyed "))); + } + + @Test + public void shouldHandleServerErrorAfterDelay() throws Exception { + try { + client.submit(SocketServerConstants.GREMLIN_FAIL_AFTER_DELAY).all().get(); + fail("Expected ExecutionException"); + } catch (ExecutionException e) { + assertThat(e.getCause(), instanceOf(ResponseException.class)); + final ResponseException re = (ResponseException) e.getCause(); + assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR, re.getResponseStatusCode()); + assertTrue(re.getMessage().contains("Server error")); + } + + // A 500 is a valid HTTP response, connection should still be usable + final List<Result> results = client.submit(SocketServerConstants.GREMLIN_SINGLE_VERTEX).all().get(); + assertEquals(1, results.size()); + } + + @Test + public void shouldHandlePartialContentClose() throws Exception { + try { + client.submit(SocketServerConstants.GREMLIN_PARTIAL_CONTENT_CLOSE).all().get(); + fail("Expected ExecutionException"); + } catch (ExecutionException e) { + assertThat(e.getCause(), instanceOf(IllegalStateException.class)); + assertTrue(e.getCause().getMessage().contains("no longer active")); + } + + // driver should recover with a new connection + final List<Result> results = client.submit(SocketServerConstants.GREMLIN_SINGLE_VERTEX).all().get(); + assertEquals(1, results.size()); + } + + @Test + public void shouldHandleMalformedResponse() throws Exception { + try { + client.submit(SocketServerConstants.GREMLIN_MALFORMED_RESPONSE).all().get(); + fail("Expected ExecutionException"); + } catch (ExecutionException ignored) { + } + + // connection should still be usable or pool creates new one + final List<Result> results = client.submit(SocketServerConstants.GREMLIN_SINGLE_VERTEX).all().get(); + assertEquals(1, results.size()); + assertThat(results.get(0).getObject(), instanceOf(Vertex.class)); + } + + @Test + public void shouldHandleEmptyResponseBody() throws Exception { + try { + client.submit(SocketServerConstants.GREMLIN_EMPTY_BODY).all().get(5, TimeUnit.SECONDS); + fail("Expected ExecutionException"); + } catch (ExecutionException e) { + // Empty body causes the stream reader to hit EOF immediately + assertThat(e.getCause(), instanceOf(RuntimeException.class)); + assertThat(e.getCause().getCause(), instanceOf(java.io.EOFException.class)); + assertTrue(e.getCause().getMessage().contains("EOFException")); + } catch (TimeoutException e) { + fail("Driver hung on empty response body instead of throwing an error"); + } + + // driver should recover + final List<Result> results = client.submit(SocketServerConstants.GREMLIN_SINGLE_VERTEX).all().get(); + assertEquals(1, results.size()); + } + + @Test + public void shouldHandleSlowResponse() throws Exception { + final List<Result> results = client.submit(SocketServerConstants.GREMLIN_SLOW_RESPONSE) + .all().get(10, TimeUnit.SECONDS); + assertEquals(3, results.size()); + } + + @Test + public void shouldTimeoutWhenServerNeverResponds() throws Exception { + final Cluster timeoutCluster = buildCluster() + .idleConnectionTimeoutMillis(2000) + .create(); + final Client timeoutClient = timeoutCluster.connect(); + try { + try { + timeoutClient.submit(SocketServerConstants.GREMLIN_NO_RESPONSE).all().get(5, TimeUnit.SECONDS); + fail("Expected ExecutionException"); + } catch (ExecutionException e) { + assertThat(e.getCause(), instanceOf(RuntimeException.class)); + assertTrue(e.getCause().getMessage().contains("Idle timeout occurred before response could be received")); + } + + // same client should recover - pool replaces the dead connection + final List<Result> results = timeoutClient.submit(SocketServerConstants.GREMLIN_SINGLE_VERTEX).all().get(); + assertEquals(1, results.size()); + } finally { + timeoutClient.close(); + timeoutCluster.close(); + } + } + + @Test + public void shouldRecoverFromTemporaryServerDowntime() throws Exception { + // Use a dedicated cluster with short reconnect interval for this test + final Cluster recoveryCluster = buildCluster().reconnectInterval(500).create(); + final Client recoveryClient = recoveryCluster.connect(); + try { + // First verify the client works + final List<Result> before = recoveryClient.submit(SocketServerConstants.GREMLIN_SINGLE_VERTEX) + .all().get(5, TimeUnit.SECONDS); + assertEquals(1, before.size()); + + // Stop the server — existing connections become dead + server.stopSync(); + try { + // Attempt request with the SAME client — expect failure + try { + recoveryClient.submit(SocketServerConstants.GREMLIN_SINGLE_VERTEX).all().get(5, TimeUnit.SECONDS); + fail("Should have thrown exception while server is down"); + } catch (RuntimeException e) { + assertThat(e.getCause(), instanceOf(TimeoutException.class)); + assertTrue(e.getCause().getMessage().contains("Connection refused")); + } + } finally { + // Restart the server + server.start(new TestHttpServerInitializer()); + } + + // Wait for the driver's reconnection logic to detect the server is back + Thread.sleep(3000); + + // Verify the SAME client self-heals and can submit successfully again + final List<Result> after = recoveryClient.submit(SocketServerConstants.GREMLIN_SINGLE_VERTEX) + .all().get(30, TimeUnit.SECONDS); + assertEquals(1, after.size()); + } finally { + recoveryClient.close(); + recoveryCluster.close(); + } + } + @Test + public void shouldHandleConcurrentMixedRequests() throws Exception { + final int count = 5; + final ExecutorService executor = Executors.newFixedThreadPool(count * 2); + + // Use a fresh cluster with enough pool size + final Cluster concurrentCluster = buildCluster().maxConnectionPoolSize(count * 2).create(); + final Client concurrentClient = concurrentCluster.connect(); + + try { + // Submit good and bad requests concurrently, tracking them separately + final List<Future<List<Result>>> goodFutures = new ArrayList<>(); + final List<Future<List<Result>>> badFutures = new ArrayList<>(); + + for (int i = 0; i < count; i++) { + goodFutures.add(executor.submit(() -> + concurrentClient.submit(SocketServerConstants.GREMLIN_SINGLE_VERTEX).all().get(10, TimeUnit.SECONDS))); + badFutures.add(executor.submit(() -> + concurrentClient.submit(SocketServerConstants.GREMLIN_CLOSE_CONNECTION).all().get(10, TimeUnit.SECONDS))); + } + + // All good requests should succeed + for (final Future<List<Result>> f : goodFutures) { + final List<Result> results = f.get(15, TimeUnit.SECONDS); + assertEquals(1, results.size()); + assertThat(results.get(0).getObject(), instanceOf(Vertex.class)); + } + + // All bad requests should fail + for (final Future<List<Result>> f : badFutures) { + try { + f.get(15, TimeUnit.SECONDS); + fail("Expected exception for close-connection request"); + } catch (ExecutionException expected) { + // Expected — server closed the connection + } + } + } finally { + concurrentClient.close(); + concurrentCluster.close(); + executor.shutdown(); + } + } +} diff --git a/gremlin-tools/gremlin-socket-server/Dockerfile b/gremlin-tools/gremlin-socket-server/Dockerfile index 9eef686765..3010f33cd6 100644 --- a/gremlin-tools/gremlin-socket-server/Dockerfile +++ b/gremlin-tools/gremlin-socket-server/Dockerfile @@ -28,7 +28,6 @@ RUN apk add --no-cache --update \ COPY ${SOCKET_SERVER_DIR}/gremlin-socket-server-${SOCKET_SERVER_VERSION}.jar /opt/gremlin-socket-server/gremlin-socket-server.jar COPY ${SOCKET_SERVER_DIR}/libs/ /opt/gremlin-socket-server/libs/ -COPY ${SOCKET_SERVER_DIR}/../conf/ opt/gremlin-socket-server/conf/ WORKDIR /opt/gremlin-socket-server diff --git a/gremlin-tools/gremlin-socket-server/conf/test-ws-gremlin.yaml b/gremlin-tools/gremlin-socket-server/conf/test-ws-gremlin.yaml deleted file mode 100644 index a358d4ce12..0000000000 --- a/gremlin-tools/gremlin-socket-server/conf/test-ws-gremlin.yaml +++ /dev/null @@ -1,56 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -# This file is to be used to configure an instance of Gremlin-Socket-Server as -# well as any driver trying to run tests against it. -# Gremlin-Socket-Server listens for requests with the specific request ids below -# and triggers response behavior accordingly. Driver's should use the request id -# corresponding with the desired behavior whenever sending a request to -# gremlin-socket-server. - -# Port exposed by gremlin-socket-server -PORT: 45943 - -# Configures which serializer will be used. Ex: GraphBinaryV1 or GraphSONV2 -SERIALIZER: GraphBinaryV1 - -# If a request with this ID comes to the server, the server responds back with a single -# vertex picked from Modern graph. -SINGLE_VERTEX_REQUEST_ID: 6457272a-4018-4538-b9ae-08dd5ddc0aa1 - -# If a request with this ID comes to the server, the server responds back with a single -# vertex picked from Modern graph. After a 2 second delay, server sends a Close WebSocket -# frame on the same connection. -SINGLE_VERTEX_DELAYED_CLOSE_CONNECTION_REQUEST_ID: 3cb39c94-9454-4398-8430-03485d08bdae - -# Server waits for 1 second, then responds with a 500 error status code -FAILED_AFTER_DELAY_REQUEST_ID: edf79c8b-1d32-4102-a5d2-a5feeca40864 - -# Server waits for 1 second then responds with a close web socket frame -CLOSE_CONNECTION_REQUEST_ID: 0150143b-00f9-48a7-a268-28142d902e18 - -# Same as CLOSE_CONNECTION_REQUEST_ID -CLOSE_CONNECTION_REQUEST_ID_2: 3c4cf18a-c7f2-4dad-b9bf-5c701eb33000 - -# If a request with this ID comes to the server, the server responds with the user agent (if any) -# that was captured during the web socket handshake. -USER_AGENT_REQUEST_ID: 20ad7bfb-4abf-d7f4-f9d3-9f1d55bee4ad - -# If a request with this ID comes to the server, the server responds with a string containing all overridden -# per request settings from the request message. String will be of the form -# "requestId=19436d9e-f8fc-4b67-8a76-deec60918424 evaluationTimeout=1234, batchSize=12, userAgent=testUserAgent" -PER_REQUEST_SETTINGS_REQUEST_ID: 19436d9e-f8fc-4b67-8a76-deec60918424 diff --git a/gremlin-tools/gremlin-socket-server/src/main/java/org/apache/tinkerpop/gremlin/socket/server/SimpleSocketServer.java b/gremlin-tools/gremlin-socket-server/src/main/java/org/apache/tinkerpop/gremlin/socket/server/SimpleTestServer.java similarity index 90% rename from gremlin-tools/gremlin-socket-server/src/main/java/org/apache/tinkerpop/gremlin/socket/server/SimpleSocketServer.java rename to gremlin-tools/gremlin-socket-server/src/main/java/org/apache/tinkerpop/gremlin/socket/server/SimpleTestServer.java index c6870794b5..f5e939c6b7 100644 --- a/gremlin-tools/gremlin-socket-server/src/main/java/org/apache/tinkerpop/gremlin/socket/server/SimpleSocketServer.java +++ b/gremlin-tools/gremlin-socket-server/src/main/java/org/apache/tinkerpop/gremlin/socket/server/SimpleTestServer.java @@ -32,13 +32,13 @@ import io.netty.handler.logging.LoggingHandler; * A Simple Netty Server intended to be used as a base for gremlin test servers. * Server behavior is defined by the {@link ChannelInitializer} passed in the start method. */ -public class SimpleSocketServer { - private final SocketServerSettings settings; +public class SimpleTestServer { + private final int port; private EventLoopGroup bossGroup; private EventLoopGroup workerGroup; - public SimpleSocketServer(final SocketServerSettings settings) { - this.settings = settings; + public SimpleTestServer(final int port) { + this.port = port; } public Channel start(final ChannelInitializer<SocketChannel> channelInitializer) throws InterruptedException { @@ -49,7 +49,7 @@ public class SimpleSocketServer { .channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(channelInitializer); - return b.bind(settings.PORT).sync().channel(); + return b.bind(port).sync().channel(); } public void stop() { diff --git a/gremlin-tools/gremlin-socket-server/src/main/java/org/apache/tinkerpop/gremlin/socket/server/SocketServerConstants.java b/gremlin-tools/gremlin-socket-server/src/main/java/org/apache/tinkerpop/gremlin/socket/server/SocketServerConstants.java new file mode 100644 index 0000000000..7428911020 --- /dev/null +++ b/gremlin-tools/gremlin-socket-server/src/main/java/org/apache/tinkerpop/gremlin/socket/server/SocketServerConstants.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.socket.server; + +/** + * Constants for the test HTTP socket server. Gremlin string constants coordinate + * response behavior between test clients and the server. + */ +public final class SocketServerConstants { + + public static final int PORT = 45943; + + public static final String GREMLIN_SINGLE_VERTEX = "server_single_vertex"; + public static final String GREMLIN_CLOSE_CONNECTION = "server_close_connection"; + public static final String GREMLIN_VERTEX_THEN_CLOSE = "server_vertex_then_close"; + public static final String GREMLIN_FAIL_AFTER_DELAY = "server_fail_after_delay"; + public static final String GREMLIN_PARTIAL_CONTENT_CLOSE = "server_partial_content_close"; + public static final String GREMLIN_SLOW_RESPONSE = "server_slow_response"; + public static final String GREMLIN_MALFORMED_RESPONSE = "server_malformed_response"; + public static final String GREMLIN_NO_RESPONSE = "server_no_response"; + public static final String GREMLIN_EMPTY_BODY = "server_empty_body"; + + private SocketServerConstants() {} +} diff --git a/gremlin-tools/gremlin-socket-server/src/main/java/org/apache/tinkerpop/gremlin/socket/server/SocketServerRunner.java b/gremlin-tools/gremlin-socket-server/src/main/java/org/apache/tinkerpop/gremlin/socket/server/SocketServerRunner.java index 863c738bf2..5f6d958cc0 100644 --- a/gremlin-tools/gremlin-socket-server/src/main/java/org/apache/tinkerpop/gremlin/socket/server/SocketServerRunner.java +++ b/gremlin-tools/gremlin-socket-server/src/main/java/org/apache/tinkerpop/gremlin/socket/server/SocketServerRunner.java @@ -20,22 +20,16 @@ package org.apache.tinkerpop.gremlin.socket.server; import io.netty.channel.Channel; -import java.io.IOException; -import java.nio.file.FileSystems; - /** - * A simple main class to create and run a SimpleSocketServer + * A simple main class to create and run a SimpleTestServer */ public class SocketServerRunner { - public static void main(final String[] args) throws InterruptedException, IOException { - final SocketServerSettings settings = SocketServerSettings.read(FileSystems.getDefault().getPath("conf", "test-ws-gremlin.yaml")); - final SimpleSocketServer server = new SimpleSocketServer(settings); - // TODO: add HTTP version of socketserver -// final Channel channel = server.start(new TestWSGremlinInitializer(settings)); -// while(channel.isOpen()) { -// Thread.sleep(1000); -// } + public static void main(final String[] args) throws InterruptedException { + final SimpleTestServer server = new SimpleTestServer(SocketServerConstants.PORT); + final Channel channel = server.start(new TestHttpServerInitializer()); + while (channel.isOpen()) { + Thread.sleep(1000); + } } - } diff --git a/gremlin-tools/gremlin-socket-server/src/main/java/org/apache/tinkerpop/gremlin/socket/server/SocketServerSettings.java b/gremlin-tools/gremlin-socket-server/src/main/java/org/apache/tinkerpop/gremlin/socket/server/SocketServerSettings.java deleted file mode 100644 index bc35551333..0000000000 --- a/gremlin-tools/gremlin-socket-server/src/main/java/org/apache/tinkerpop/gremlin/socket/server/SocketServerSettings.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.tinkerpop.gremlin.socket.server; - -import org.yaml.snakeyaml.LoaderOptions; -import org.yaml.snakeyaml.Yaml; -import org.yaml.snakeyaml.constructor.Constructor; - -import java.io.IOException; -import java.io.InputStream; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.Objects; -import java.util.UUID; - -/** - * Encapsulates all constants that are needed by SimpleSocketServer. UUID request id constants are used - * to coordinate custom response behavior between a test client and the server. - */ -public class SocketServerSettings { - public int PORT = 0; - - /** - * Configures which serializer will be used. Ex: "GraphBinaryV1" or "GraphSONV2" - */ - public String SERIALIZER = "GraphBinaryV1"; - /** - * If a request with this ID comes to the server, the server responds back with a single vertex picked from Modern - * graph. - */ - public UUID SINGLE_VERTEX_REQUEST_ID = null; - - /** - * If a request with this ID comes to the server, the server responds back with a single vertex picked from Modern - * graph. After a 2 second delay, server sends a Close WebSocket frame on the same connection. - */ - public UUID SINGLE_VERTEX_DELAYED_CLOSE_CONNECTION_REQUEST_ID = null; - - /** - * Server waits for 1 second, then responds with a 500 error status code - */ - public UUID FAILED_AFTER_DELAY_REQUEST_ID = null; - - /** - * Server waits for 1 second then responds with a close web socket frame - */ - public UUID CLOSE_CONNECTION_REQUEST_ID = null; - - /** - * Same as CLOSE_CONNECTION_REQUEST_ID - */ - public UUID CLOSE_CONNECTION_REQUEST_ID_2 = null; - - /** - * If a request with this ID comes to the server, the server responds with the user agent (if any) that was captured - * during the web socket handshake. - */ - public UUID USER_AGENT_REQUEST_ID = null; - - /** - * If a request with this ID comes to the server, the server responds with a string containing all overridden - * per request settings from the request message. String will be of the form - * "requestId=19436d9e-f8fc-4b67-8a76-deec60918424 evaluationTimeout=1234, batchSize=12, userAgent=testUserAgent" - */ - public UUID PER_REQUEST_SETTINGS_REQUEST_ID = null; - - public static SocketServerSettings read(final Path confFilePath) throws IOException { - return read(Files.newInputStream(confFilePath)); - } - - public static SocketServerSettings read(final InputStream confInputStream) { - Objects.requireNonNull(confInputStream); - final LoaderOptions options = new LoaderOptions(); - final Yaml yaml = new Yaml(new Constructor(SocketServerSettings.class, options)); - return yaml.load(confInputStream); - } -} diff --git a/gremlin-tools/gremlin-socket-server/src/main/java/org/apache/tinkerpop/gremlin/socket/server/TestHandlers.java b/gremlin-tools/gremlin-socket-server/src/main/java/org/apache/tinkerpop/gremlin/socket/server/TestHandlers.java deleted file mode 100644 index ed6cfda233..0000000000 --- a/gremlin-tools/gremlin-socket-server/src/main/java/org/apache/tinkerpop/gremlin/socket/server/TestHandlers.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.tinkerpop.gremlin.socket.server; - -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.handler.codec.http.HttpRequest; -import io.netty.util.ReferenceCountUtil; - -/** - * Class that holds the handlers that can used by the initializers in TestChannelizers. - */ -public class TestHandlers { - - /** - * Handler that will drop requests to the WebSocket path. - */ - public static class NoOpWebSocketServerHandler extends ChannelInboundHandlerAdapter { - private String websocketPath; - - public NoOpWebSocketServerHandler(String websocketPath) { - this.websocketPath = websocketPath; - } - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) { - if ((msg instanceof HttpRequest) && ((HttpRequest) msg).uri().endsWith(websocketPath)) { - ReferenceCountUtil.release(msg); - } - } - } -} diff --git a/gremlin-tools/gremlin-socket-server/src/main/java/org/apache/tinkerpop/gremlin/socket/server/TestHttpGremlinHandler.java b/gremlin-tools/gremlin-socket-server/src/main/java/org/apache/tinkerpop/gremlin/socket/server/TestHttpGremlinHandler.java new file mode 100644 index 0000000000..3ea75f2f7d --- /dev/null +++ b/gremlin-tools/gremlin-socket-server/src/main/java/org/apache/tinkerpop/gremlin/socket/server/TestHttpGremlinHandler.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.socket.server; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.codec.http.DefaultHttpContent; +import io.netty.handler.codec.http.DefaultHttpResponse; +import io.netty.handler.codec.http.DefaultLastHttpContent; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpHeaderValues; +import io.netty.handler.codec.http.HttpResponseStatus; +import org.apache.tinkerpop.gremlin.structure.Graph; +import org.apache.tinkerpop.gremlin.structure.Vertex; +import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerFactory; +import org.apache.tinkerpop.gremlin.util.message.RequestMessage; +import org.apache.tinkerpop.gremlin.util.message.ResponseMessage; +import org.apache.tinkerpop.gremlin.util.ser.GraphBinaryMessageSerializerV4; +import org.apache.tinkerpop.gremlin.util.ser.SerTokens; +import org.apache.tinkerpop.gremlin.util.ser.SerializationException; + +import java.util.Collections; +import java.util.Random; +import java.util.concurrent.TimeUnit; + +import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST; +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; +import static io.netty.handler.codec.http.HttpResponseStatus.OK; +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; + +/** + * Handles HTTP Gremlin requests for the test socket server, responding with canned results + * based on the gremlin string in the request. + */ +public class TestHttpGremlinHandler extends SimpleChannelInboundHandler<FullHttpRequest> { + + private final GraphBinaryMessageSerializerV4 serializer = new GraphBinaryMessageSerializerV4(); + private static final Graph graph = TinkerFactory.createModern(); + private static final Vertex singleVertex = graph.traversal().V().hasLabel("person").next(); + + @Override + protected void channelRead0(final ChannelHandlerContext ctx, final FullHttpRequest request) throws Exception { + final RequestMessage msg = serializer.deserializeBinaryRequest(request.content()); + final String gremlin = msg.getGremlin(); + + switch (gremlin) { + case SocketServerConstants.GREMLIN_SINGLE_VERTEX: + writeVertexResponse(ctx); + break; + case SocketServerConstants.GREMLIN_CLOSE_CONNECTION: + ctx.executor().schedule(() -> ctx.close(), 1, TimeUnit.SECONDS); + break; + case SocketServerConstants.GREMLIN_VERTEX_THEN_CLOSE: + writeVertexResponse(ctx); + ctx.executor().schedule(() -> ctx.close(), 2, TimeUnit.SECONDS); + break; + case SocketServerConstants.GREMLIN_FAIL_AFTER_DELAY: + ctx.executor().schedule(() -> { + try { + writeErrorResponse(ctx, INTERNAL_SERVER_ERROR, "Server error"); + } catch (SerializationException e) { + ctx.close(); + } + }, 1, TimeUnit.SECONDS); + break; + case SocketServerConstants.GREMLIN_PARTIAL_CONTENT_CLOSE: + writePartialContentClose(ctx); + break; + case SocketServerConstants.GREMLIN_MALFORMED_RESPONSE: + writeMalformedResponse(ctx); + break; + case SocketServerConstants.GREMLIN_NO_RESPONSE: + // Do nothing — let the client timeout. + break; + case SocketServerConstants.GREMLIN_SLOW_RESPONSE: + writeSlowResponse(ctx); + break; + case SocketServerConstants.GREMLIN_EMPTY_BODY: + writeEmptyBody(ctx); + break; + default: + writeErrorResponse(ctx, BAD_REQUEST, "Unknown test scenario"); + break; + } + } + + private void writeVertexResponse(final ChannelHandlerContext ctx) throws SerializationException { + final ResponseMessage responseMessage = ResponseMessage.build() + .result(Collections.singletonList(singleVertex)) + .code(OK) + .create(); + final ByteBuf serialized = serializer.serializeResponseAsBinary(responseMessage, ctx.alloc()); + + final DefaultHttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK); + response.headers().set(HttpHeaderNames.CONTENT_TYPE, SerTokens.MIME_GRAPHBINARY_V4); + response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED); + ctx.write(response); + ctx.write(new DefaultHttpContent(serialized)); + ctx.writeAndFlush(new DefaultLastHttpContent()); + } + + private void writeErrorResponse(final ChannelHandlerContext ctx, final HttpResponseStatus status, + final String message) throws SerializationException { + final ResponseMessage responseMessage = ResponseMessage.build() + .result(Collections.emptyList()) + .code(status) + .statusMessage(message) + .create(); + final ByteBuf serialized = serializer.serializeResponseAsBinary(responseMessage, ctx.alloc()); + + final DefaultHttpResponse response = new DefaultHttpResponse(HTTP_1_1, status); + response.headers().set(HttpHeaderNames.CONTENT_TYPE, SerTokens.MIME_GRAPHBINARY_V4); + response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED); + ctx.write(response); + ctx.write(new DefaultHttpContent(serialized)); + ctx.writeAndFlush(new DefaultLastHttpContent()); + } + + private void writePartialContentClose(final ChannelHandlerContext ctx) throws SerializationException { + final ResponseMessage responseMessage = ResponseMessage.build() + .result(Collections.singletonList(singleVertex)) + .create(); + + final DefaultHttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK); + response.headers().set(HttpHeaderNames.CONTENT_TYPE, SerTokens.MIME_GRAPHBINARY_V4); + response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED); + ctx.write(response); + ctx.writeAndFlush(new DefaultHttpContent(serializer.writeHeader(responseMessage, ctx.alloc()))); + ctx.close(); + } + + private void writeMalformedResponse(final ChannelHandlerContext ctx) { + final DefaultHttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK); + response.headers().set(HttpHeaderNames.CONTENT_TYPE, SerTokens.MIME_GRAPHBINARY_V4); + response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED); + ctx.write(response); + + final byte[] garbage = new byte[64]; + new Random().nextBytes(garbage); + ctx.write(new DefaultHttpContent(ctx.alloc().buffer(64).writeBytes(garbage))); + ctx.writeAndFlush(new DefaultLastHttpContent()); + } + + private void writeSlowResponse(final ChannelHandlerContext ctx) { + final DefaultHttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK); + response.headers().set(HttpHeaderNames.CONTENT_TYPE, SerTokens.MIME_GRAPHBINARY_V4); + response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED); + ctx.writeAndFlush(response); + + final ResponseMessage headerMsg = ResponseMessage.build() + .result(Collections.singletonList(singleVertex)) + .create(); + + ctx.executor().schedule(() -> { + try { + ctx.writeAndFlush(new DefaultHttpContent(serializer.writeHeader(headerMsg, ctx.alloc()))); + } catch (SerializationException e) { + ctx.close(); + } + }, 500, TimeUnit.MILLISECONDS); + + ctx.executor().schedule(() -> { + try { + ctx.writeAndFlush(new DefaultHttpContent( + serializer.writeChunk(Collections.singletonList(singleVertex), ctx.alloc()))); + } catch (SerializationException e) { + ctx.close(); + } + }, 1000, TimeUnit.MILLISECONDS); + + ctx.executor().schedule(() -> { + try { + final ResponseMessage footerMsg = ResponseMessage.build() + .result(Collections.singletonList(singleVertex)) + .code(OK) + .create(); + ctx.write(new DefaultHttpContent(serializer.writeFooter(footerMsg, ctx.alloc()))); + ctx.writeAndFlush(new DefaultLastHttpContent()); + } catch (SerializationException e) { + ctx.close(); + } + }, 1500, TimeUnit.MILLISECONDS); + } + + private void writeEmptyBody(final ChannelHandlerContext ctx) { + final DefaultHttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK); + response.headers().set(HttpHeaderNames.CONTENT_TYPE, SerTokens.MIME_GRAPHBINARY_V4); + response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED); + ctx.write(response); + ctx.writeAndFlush(new DefaultLastHttpContent()); + } +} diff --git a/gremlin-tools/gremlin-socket-server/src/main/java/org/apache/tinkerpop/gremlin/socket/server/SocketServerRunner.java b/gremlin-tools/gremlin-socket-server/src/main/java/org/apache/tinkerpop/gremlin/socket/server/TestHttpServerInitializer.java similarity index 54% copy from gremlin-tools/gremlin-socket-server/src/main/java/org/apache/tinkerpop/gremlin/socket/server/SocketServerRunner.java copy to gremlin-tools/gremlin-socket-server/src/main/java/org/apache/tinkerpop/gremlin/socket/server/TestHttpServerInitializer.java index 863c738bf2..249e19a53a 100644 --- a/gremlin-tools/gremlin-socket-server/src/main/java/org/apache/tinkerpop/gremlin/socket/server/SocketServerRunner.java +++ b/gremlin-tools/gremlin-socket-server/src/main/java/org/apache/tinkerpop/gremlin/socket/server/TestHttpServerInitializer.java @@ -18,24 +18,20 @@ */ package org.apache.tinkerpop.gremlin.socket.server; -import io.netty.channel.Channel; - -import java.io.IOException; -import java.nio.file.FileSystems; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.codec.http.HttpObjectAggregator; +import io.netty.handler.codec.http.HttpServerCodec; /** - * A simple main class to create and run a SimpleSocketServer + * Initializes the Netty pipeline for the test HTTP Gremlin server. */ -public class SocketServerRunner { +public class TestHttpServerInitializer extends ChannelInitializer<SocketChannel> { - public static void main(final String[] args) throws InterruptedException, IOException { - final SocketServerSettings settings = SocketServerSettings.read(FileSystems.getDefault().getPath("conf", "test-ws-gremlin.yaml")); - final SimpleSocketServer server = new SimpleSocketServer(settings); - // TODO: add HTTP version of socketserver -// final Channel channel = server.start(new TestWSGremlinInitializer(settings)); -// while(channel.isOpen()) { -// Thread.sleep(1000); -// } + @Override + protected void initChannel(final SocketChannel ch) { + ch.pipeline().addLast(new HttpServerCodec()); + ch.pipeline().addLast(new HttpObjectAggregator(65536)); + ch.pipeline().addLast(new TestHttpGremlinHandler()); } - }
