This is an automated email from the ASF dual-hosted git repository.

kenhuuu pushed a commit to branch jstream2
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit e0acc99a48b34098f0294269c4cd2cf05d74a755
Author: Ken Hu <[email protected]>
AuthorDate: Wed May 13 19:35:27 2026 -0700

    Simplify executor lifecycle to per connection instead of shared on Cluster.
---
 .../tinkerpop/gremlin/driver/Channelizer.java      |  2 +-
 .../apache/tinkerpop/gremlin/driver/Cluster.java   | 18 -------------
 .../handler/HttpStreamingResponseHandler.java      | 31 +++++++++++++---------
 .../handler/HttpStreamingResponseHandlerTest.java  |  2 +-
 4 files changed, 21 insertions(+), 32 deletions(-)

diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
index 59b9d02365..7e297be462 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
@@ -212,7 +212,7 @@ public interface Channelizer extends ChannelHandler {
                 final GraphBinaryReader graphBinaryReader =
                         ((GraphBinaryMessageSerializerV4) 
serializer).getMapper().getReader();
                 streamingResponseHandler = new HttpStreamingResponseHandler(
-                        graphBinaryReader, pending, 
cluster.streamingReaderPool(), cluster.getMaxResponseContentLength());
+                        graphBinaryReader, pending, 
cluster.getMaxResponseContentLength());
             } else {
                 useStreaming = false;
                 gremlinResponseDecoder = new 
HttpGremlinResponseDecoder(serializer);
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
index d4a3967f02..cac193814c 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
@@ -70,11 +70,8 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
@@ -386,10 +383,6 @@ public final class Cluster {
         return manager.connectionScheduler;
     }
 
-    ExecutorService streamingReaderPool() {
-        return manager.streamingReaderPool;
-    }
-
     Settings.ConnectionPoolSettings connectionPoolSettings() {
         return manager.connectionPoolSettings;
     }
@@ -963,12 +956,6 @@ public final class Cluster {
          */
         private final ScheduledThreadPoolExecutor connectionScheduler;
 
-        /**
-         * Cached thread pool for streaming response reader threads. One 
thread per active streaming response,
-         * bounded implicitly by the connection pool size.
-         */
-        private final ExecutorService streamingReaderPool;
-
         private final int nioPoolSize;
         private final int workerPoolSize;
         private final int port;
@@ -1036,10 +1023,6 @@ public final class Cluster {
             this.connectionScheduler = new 
ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
                     new 
BasicThreadFactory.Builder().namingPattern("gremlin-driver-conn-scheduler-%d").build());
 
-            this.streamingReaderPool = new ThreadPoolExecutor(0, 
builder.maxConnectionPoolSize * Math.max(contactPoints.size(), 1) * 4,
-                    60L, TimeUnit.SECONDS, new SynchronousQueue<>(),
-                    new 
BasicThreadFactory.Builder().namingPattern("gremlin-driver-stream-reader-%d").build());
-
             validationRequest = () -> 
RequestMessage.build(builder.validationRequest);
         }
 
@@ -1150,7 +1133,6 @@ public final class Cluster {
                 executor.shutdown();
                 hostScheduler.shutdown();
                 connectionScheduler.shutdown();
-                streamingReaderPool.shutdownNow();
                 closeIt.complete(null);
             });
 
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java
index e9d81b46b2..ea0d897aac 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java
@@ -39,12 +39,16 @@ import 
org.apache.tinkerpop.gremlin.structure.io.binary.GraphBinaryReader;
 import org.apache.tinkerpop.gremlin.util.ser.SerTokens;
 import org.apache.tinkerpop.shaded.jackson.databind.JsonNode;
 import org.apache.tinkerpop.shaded.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static 
org.apache.tinkerpop.gremlin.driver.Channelizer.HttpChannelizer.LAST_CONTENT_READ_RESPONSE;
@@ -65,9 +69,14 @@ public class HttpStreamingResponseHandler extends 
MessageToMessageDecoder<HttpOb
 
     private final GraphBinaryReader graphBinaryReader;
     private final AtomicReference<ResultSet> pendingResultSet;
-    private final ExecutorService readerPool;
     private final long maxResponseContentLength;
 
+    // Per-connection executor for the reader thread. Sized to one thread 
because an HTTP/1.1
+    // connection has at most one in-flight request. The thread is created 
lazily on first use
+    // and expires after 60 seconds idle, avoiding a parked thread on quiet 
connections.
+    // When HTTP/2 multiplexing is added, this can grow to match 
MAX_CONCURRENT_STREAMS.
+    private final ExecutorService readerExecutor;
+
     // Mutable state below is accessed exclusively from the channel's event 
loop thread.
     private HttpResponseStatus responseStatus;
     private String contentType;
@@ -77,12 +86,14 @@ public class HttpStreamingResponseHandler extends 
MessageToMessageDecoder<HttpOb
 
     public HttpStreamingResponseHandler(final GraphBinaryReader 
graphBinaryReader,
                                         final AtomicReference<ResultSet> 
pendingResultSet,
-                                        final ExecutorService readerPool,
                                         final long maxResponseContentLength) {
         this.graphBinaryReader = graphBinaryReader;
         this.pendingResultSet = pendingResultSet;
-        this.readerPool = readerPool;
         this.maxResponseContentLength = maxResponseContentLength;
+        this.readerExecutor = new ThreadPoolExecutor(0, 1, 60L, 
TimeUnit.SECONDS,
+                new LinkedBlockingQueue<>(),
+                new BasicThreadFactory.Builder().namingPattern(
+                        "gremlin-driver-stream-reader-" + 
UUID.randomUUID().toString().substring(0, 8)).build());
     }
 
     @Override
@@ -106,14 +117,7 @@ public class HttpStreamingResponseHandler extends 
MessageToMessageDecoder<HttpOb
                     final InputStreamBuffer buffer = new 
InputStreamBuffer(queueInputStream);
                     final GraphBinaryStreamResponseReader streamReader =
                             new GraphBinaryStreamResponseReader(buffer, 
graphBinaryReader, rs, pendingResultSet);
-                    try {
-                        readerPool.submit(streamReader::run);
-                    } catch (RejectedExecutionException e) {
-                        queueInputStream.signalEndOfStream();
-                        rs.markError(e);
-                        pendingResultSet.compareAndSet(rs, null);
-                        out.add(LAST_CONTENT_READ_RESPONSE);
-                    }
+                    readerExecutor.submit(streamReader::run);
                 } else {
                     // No pending ResultSet — close the stream and fire 
sentinel immediately
                     queueInputStream.signalEndOfStream();
@@ -175,6 +179,9 @@ public class HttpStreamingResponseHandler extends 
MessageToMessageDecoder<HttpOb
             queueInputStream.signalEndOfStream();
         }
         releaseErrorBody();
+        // shutdownNow() interrupts the reader thread if it's blocked waiting 
for data,
+        // ensuring it doesn't linger after the connection is gone.
+        readerExecutor.shutdownNow();
         super.channelInactive(ctx);
     }
 
diff --git 
a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandlerTest.java
 
b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandlerTest.java
index fdd37818a4..2ca5dc0171 100644
--- 
a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandlerTest.java
+++ 
b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandlerTest.java
@@ -69,7 +69,7 @@ public class HttpStreamingResponseHandlerTest {
 
     private EmbeddedChannel createChannel(final AtomicReference<ResultSet> 
pendingResultSet, final long maxResponseContentLength) {
         final HttpStreamingResponseHandler handler = new 
HttpStreamingResponseHandler(
-                reader, pendingResultSet, executor, maxResponseContentLength);
+                reader, pendingResultSet, maxResponseContentLength);
         return new EmbeddedChannel(handler);
     }
 

Reply via email to