This is an automated email from the ASF dual-hosted git repository.

kenhuuu pushed a commit to branch jstream2
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit e0acc99a48b34098f0294269c4cd2cf05d74a755
Author: Ken Hu <[email protected]>
AuthorDate: Wed May 13 19:35:27 2026 -0700

    Simplify executor lifecycle to per connection instead of shared on Cluster.
---
 .../tinkerpop/gremlin/driver/Channelizer.java      |  2 +-
 .../apache/tinkerpop/gremlin/driver/Cluster.java   | 18 -------------
 .../handler/HttpStreamingResponseHandler.java      | 31 +++++++++++++---------
 .../handler/HttpStreamingResponseHandlerTest.java  |  2 +-
 4 files changed, 21 insertions(+), 32 deletions(-)

diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
index 59b9d02365..7e297be462 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
@@ -212,7 +212,7 @@ public interface Channelizer extends ChannelHandler {
                     final GraphBinaryReader graphBinaryReader =
                             ((GraphBinaryMessageSerializerV4) serializer).getMapper().getReader();
                     streamingResponseHandler = new HttpStreamingResponseHandler(
-                            graphBinaryReader, pending, cluster.streamingReaderPool(), cluster.getMaxResponseContentLength());
+                            graphBinaryReader, pending, cluster.getMaxResponseContentLength());
                 } else {
                     useStreaming = false;
                     gremlinResponseDecoder = new HttpGremlinResponseDecoder(serializer);
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
index d4a3967f02..cac193814c 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
@@ -70,11 +70,8 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
@@ -386,10 +383,6 @@ public final class Cluster {
         return manager.connectionScheduler;
     }
 
-    ExecutorService streamingReaderPool() {
-        return manager.streamingReaderPool;
-    }
-
     Settings.ConnectionPoolSettings connectionPoolSettings() {
         return manager.connectionPoolSettings;
     }
@@ -963,12 +956,6 @@ public final class Cluster {
          */
         private final ScheduledThreadPoolExecutor connectionScheduler;
 
-        /**
-         * Cached thread pool for streaming response reader threads. One thread per active streaming response,
-         * bounded implicitly by the connection pool size.
-         */
-        private final ExecutorService streamingReaderPool;
-
         private final int nioPoolSize;
         private final int workerPoolSize;
         private final int port;
@@ -1036,10 +1023,6 @@ public final class Cluster {
             this.connectionScheduler = new ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
                     new BasicThreadFactory.Builder().namingPattern("gremlin-driver-conn-scheduler-%d").build());
 
-            this.streamingReaderPool = new ThreadPoolExecutor(0, builder.maxConnectionPoolSize * Math.max(contactPoints.size(), 1) * 4,
-                    60L, TimeUnit.SECONDS, new SynchronousQueue<>(),
-                    new BasicThreadFactory.Builder().namingPattern("gremlin-driver-stream-reader-%d").build());
-
             validationRequest = () -> RequestMessage.build(builder.validationRequest);
         }
 
@@ -1150,7 +1133,6 @@ public final class Cluster {
                 executor.shutdown();
                 hostScheduler.shutdown();
                 connectionScheduler.shutdown();
-                streamingReaderPool.shutdownNow();
                 closeIt.complete(null);
             });
 
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java
index e9d81b46b2..ea0d897aac 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java
@@ -39,12 +39,16 @@ import org.apache.tinkerpop.gremlin.structure.io.binary.GraphBinaryReader;
 import org.apache.tinkerpop.gremlin.util.ser.SerTokens;
 import org.apache.tinkerpop.shaded.jackson.databind.JsonNode;
 import org.apache.tinkerpop.shaded.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.tinkerpop.gremlin.driver.Channelizer.HttpChannelizer.LAST_CONTENT_READ_RESPONSE;
@@ -65,9 +69,14 @@ public class HttpStreamingResponseHandler extends MessageToMessageDecoder<HttpOb
 
     private final GraphBinaryReader graphBinaryReader;
     private final AtomicReference<ResultSet> pendingResultSet;
-    private final ExecutorService readerPool;
     private final long maxResponseContentLength;
 
+    // Per-connection executor for the reader thread. Sized to one thread because an HTTP/1.1
+    // connection has at most one in-flight request. The thread is created lazily on first use
+    // and expires after 60 seconds idle, avoiding a parked thread on quiet connections.
+    // When HTTP/2 multiplexing is added, this can grow to match MAX_CONCURRENT_STREAMS.
+    private final ExecutorService readerExecutor;
+
     // Mutable state below is accessed exclusively from the channel's event loop thread.
     private HttpResponseStatus responseStatus;
     private String contentType;
@@ -77,12 +86,14 @@ public class HttpStreamingResponseHandler extends MessageToMessageDecoder<HttpOb
 
     public HttpStreamingResponseHandler(final GraphBinaryReader graphBinaryReader,
                                         final AtomicReference<ResultSet> pendingResultSet,
-                                        final ExecutorService readerPool,
                                         final long maxResponseContentLength) {
         this.graphBinaryReader = graphBinaryReader;
         this.pendingResultSet = pendingResultSet;
-        this.readerPool = readerPool;
         this.maxResponseContentLength = maxResponseContentLength;
+        this.readerExecutor = new ThreadPoolExecutor(0, 1, 60L, TimeUnit.SECONDS,
+                new LinkedBlockingQueue<>(),
+                new BasicThreadFactory.Builder().namingPattern(
+                        "gremlin-driver-stream-reader-" + UUID.randomUUID().toString().substring(0, 8)).build());
     }
 
     @Override
@@ -106,14 +117,7 @@ public class HttpStreamingResponseHandler extends MessageToMessageDecoder<HttpOb
                     final InputStreamBuffer buffer = new InputStreamBuffer(queueInputStream);
                     final GraphBinaryStreamResponseReader streamReader =
                             new GraphBinaryStreamResponseReader(buffer, graphBinaryReader, rs, pendingResultSet);
-                    try {
-                        readerPool.submit(streamReader::run);
-                    } catch (RejectedExecutionException e) {
-                        queueInputStream.signalEndOfStream();
-                        rs.markError(e);
-                        pendingResultSet.compareAndSet(rs, null);
-                        out.add(LAST_CONTENT_READ_RESPONSE);
-                    }
+                    readerExecutor.submit(streamReader::run);
                 } else {
                     // No pending ResultSet — close the stream and fire sentinel immediately
                     queueInputStream.signalEndOfStream();
@@ -175,6 +179,9 @@ public class HttpStreamingResponseHandler extends MessageToMessageDecoder<HttpOb
             queueInputStream.signalEndOfStream();
         }
         releaseErrorBody();
+        // shutdownNow() interrupts the reader thread if it's blocked waiting for data,
+        // ensuring it doesn't linger after the connection is gone.
+        readerExecutor.shutdownNow();
         super.channelInactive(ctx);
     }
 
diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandlerTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandlerTest.java
index fdd37818a4..2ca5dc0171 100644
--- a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandlerTest.java
+++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandlerTest.java
@@ -69,7 +69,7 @@ public class HttpStreamingResponseHandlerTest {
     private EmbeddedChannel createChannel(final AtomicReference<ResultSet> pendingResultSet,
                                           final long maxResponseContentLength) {
         final HttpStreamingResponseHandler handler = new HttpStreamingResponseHandler(
-                reader, pendingResultSet, executor, maxResponseContentLength);
+                reader, pendingResultSet, maxResponseContentLength);
         return new EmbeddedChannel(handler);
     }
 
