This is an automated email from the ASF dual-hosted git repository.

Cole-Greer pushed a commit to branch javaTCPBackpressure
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit c185ba86c8b484198d71068057e73d7a5eedae8c
Author: Cole Greer <[email protected]>
AuthorDate: Mon May 25 13:45:20 2026 -0700

    Add streaming backpressure via bounded queue in gremlin-driver
    
    Bound the ByteBufQueueInputStream queue and use Netty's setAutoRead(false)
    to apply TCP backpressure when the client cannot keep up with the server's
    streaming rate.
    
    When the queue is full, the handler pauses channel reads and blocks briefly
    to enqueue the current chunk. The consumer resumes reads when the queue
    drains below half capacity via a callback dispatched to the event loop.
    
    The buffer size defaults to 256 chunks and is configurable via
    Cluster.Builder.streamBufferSize().
---
 .../tinkerpop/gremlin/driver/Channelizer.java      |  2 +-
 .../apache/tinkerpop/gremlin/driver/Cluster.java   | 19 +++++++
 .../tinkerpop/gremlin/driver/Connection.java       |  1 +
 .../apache/tinkerpop/gremlin/driver/Settings.java  |  8 +++
 .../handler/HttpStreamingResponseHandler.java      | 22 ++++++--
 .../driver/stream/ByteBufQueueInputStream.java     | 62 +++++++++++++++++++---
 .../handler/ByteBufQueueInputStreamTest.java       | 30 +++++++++++
 .../handler/HttpStreamingResponseHandlerTest.java  |  8 ++-
 8 files changed, 141 insertions(+), 11 deletions(-)

diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
index 59b9d02365..4d23aa3561 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
@@ -212,7 +212,7 @@ public interface Channelizer extends ChannelHandler {
                 final GraphBinaryReader graphBinaryReader =
                         ((GraphBinaryMessageSerializerV4) 
serializer).getMapper().getReader();
                 streamingResponseHandler = new HttpStreamingResponseHandler(
-                        graphBinaryReader, pending, 
cluster.streamingReaderPool(), cluster.getMaxResponseContentLength());
+                        graphBinaryReader, pending, 
cluster.streamingReaderPool(), cluster.getMaxResponseContentLength(), 
cluster.getStreamBufferSize());
             } else {
                 useStreaming = false;
                 gremlinResponseDecoder = new 
HttpGremlinResponseDecoder(serializer);
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
index d4a3967f02..40644df4d6 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
@@ -173,6 +173,7 @@ public final class Cluster {
                 .reconnectInterval(settings.connectionPool.reconnectInterval)
                 
.resultIterationBatchSize(settings.connectionPool.resultIterationBatchSize)
                 
.maxResponseContentLength(settings.connectionPool.maxResponseContentLength)
+                .streamBufferSize(settings.connectionPool.streamBufferSize)
                 
.maxWaitForConnection(settings.connectionPool.maxWaitForConnection)
                 .maxConnectionPoolSize(settings.connectionPool.maxSize)
                 
.connectionSetupTimeoutMillis(settings.connectionPool.connectionSetupTimeoutMillis)
@@ -326,6 +327,13 @@ public final class Cluster {
         return manager.connectionPoolSettings.maxResponseContentLength;
     }
 
+    /**
+     * Gets the size of the buffer used for streaming responses.
+     */
+    public int getStreamBufferSize() {
+        return manager.connectionPoolSettings.streamBufferSize;
+    }
+
     /**
      * Get time in milliseconds that the driver will allow a channel to not 
receive read or writes before it automatically closes.
      */
@@ -495,6 +503,7 @@ public final class Cluster {
         private int maxWaitForConnection = Connection.MAX_WAIT_FOR_CONNECTION;
         private int maxWaitForClose = Connection.MAX_WAIT_FOR_CLOSE;
         private long maxResponseContentLength = 
Connection.MAX_RESPONSE_CONTENT_LENGTH;
+        private int streamBufferSize = Connection.DEFAULT_STREAM_BUFFER_SIZE;
         private int reconnectInterval = Connection.RECONNECT_INTERVAL;
         private int resultIterationBatchSize = 
Connection.RESULT_ITERATION_BATCH_SIZE;
         private boolean enableSsl = false;
@@ -723,6 +732,15 @@ public final class Cluster {
             return this;
         }
 
+        /**
+         * The size of the buffer used for streaming responses.
+         */
+        public Builder streamBufferSize(final int streamBufferSize) {
+            if (streamBufferSize < 1) throw new 
IllegalArgumentException("streamBufferSize must be at least 1");
+            this.streamBufferSize = streamBufferSize;
+            return this;
+        }
+
         /**
          * Specify a valid Gremlin script that can be used to test remote 
operations. This script should be designed
          * to return quickly with the least amount of overhead possible. By 
default, the script sends an empty string.
@@ -994,6 +1012,7 @@ public final class Cluster {
             connectionPoolSettings.maxWaitForConnection = 
builder.maxWaitForConnection;
             connectionPoolSettings.maxWaitForClose = builder.maxWaitForClose;
             connectionPoolSettings.maxResponseContentLength = 
builder.maxResponseContentLength;
+            connectionPoolSettings.streamBufferSize = builder.streamBufferSize;
             connectionPoolSettings.reconnectInterval = 
builder.reconnectInterval;
             connectionPoolSettings.resultIterationBatchSize = 
builder.resultIterationBatchSize;
             connectionPoolSettings.enableSsl = builder.enableSsl;
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
index 12a1de646c..8228c5b032 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
@@ -47,6 +47,7 @@ final class Connection {
     public static final int MAX_WAIT_FOR_CONNECTION = 16000;
     public static final int MAX_WAIT_FOR_CLOSE = 3000;
     public static final long MAX_RESPONSE_CONTENT_LENGTH = Integer.MAX_VALUE;
+    public static final int DEFAULT_STREAM_BUFFER_SIZE = 256;
     public static final int RECONNECT_INTERVAL = 1000;
     public static final int RESULT_ITERATION_BATCH_SIZE = 64;
     public static final long CONNECTION_SETUP_TIMEOUT_MILLIS = 15000;
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java
index 907909c71a..b204f8181a 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java
@@ -202,6 +202,9 @@ public final class Settings {
             if (connectionPoolConf.containsKey("maxResponseContentLength"))
                 cpSettings.maxResponseContentLength = 
connectionPoolConf.getInt("maxResponseContentLength");
 
+            if (connectionPoolConf.containsKey("streamBufferSize"))
+                cpSettings.streamBufferSize = 
connectionPoolConf.getInt("streamBufferSize");
+
             if (connectionPoolConf.containsKey("reconnectInterval"))
                 cpSettings.reconnectInterval = 
connectionPoolConf.getInt("reconnectInterval");
 
@@ -324,6 +327,11 @@ public final class Settings {
          */
         public long maxResponseContentLength = 
Connection.MAX_RESPONSE_CONTENT_LENGTH;
 
+        /**
+         * The size of the buffer used for streaming responses. The default 
value is 256.
+         */
+        public int streamBufferSize = Connection.DEFAULT_STREAM_BUFFER_SIZE;
+
         /**
          * The amount of time in milliseconds to wait before trying to 
reconnect to a dead host. The default value is
          * 1000.
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java
index 607fd6bdf9..c2968b72b9 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java
@@ -67,6 +67,7 @@ public class HttpStreamingResponseHandler extends 
MessageToMessageDecoder<HttpOb
     private final AtomicReference<ResultSet> pendingResultSet;
     private final ExecutorService readerPool;
     private final long maxResponseContentLength;
+    private final int streamBufferSize;
 
     // Mutable state below is accessed exclusively from the channel's event 
loop thread.
     private HttpResponseStatus responseStatus;
@@ -78,11 +79,13 @@ public class HttpStreamingResponseHandler extends 
MessageToMessageDecoder<HttpOb
     public HttpStreamingResponseHandler(final GraphBinaryReader 
graphBinaryReader,
                                         final AtomicReference<ResultSet> 
pendingResultSet,
                                         final ExecutorService readerPool,
-                                        final long maxResponseContentLength) {
+                                        final long maxResponseContentLength,
+                                        final int streamBufferSize) {
         this.graphBinaryReader = graphBinaryReader;
         this.pendingResultSet = pendingResultSet;
         this.readerPool = readerPool;
         this.maxResponseContentLength = maxResponseContentLength;
+        this.streamBufferSize = streamBufferSize;
     }
 
     @Override
@@ -97,7 +100,9 @@ public class HttpStreamingResponseHandler extends 
MessageToMessageDecoder<HttpOb
 
             responseStatus = resp.status();
             contentType = resp.headers().get(HttpHeaderNames.CONTENT_TYPE);
-            queueInputStream = new ByteBufQueueInputStream();
+            final io.netty.channel.Channel channel = ctx.channel();
+            queueInputStream = new ByteBufQueueInputStream(streamBufferSize, 
() ->
+                    channel.eventLoop().execute(() -> 
channel.config().setAutoRead(true)));
 
             // Spawn reader thread for GraphBinary responses
             if (isGraphBinaryResponse()) {
@@ -147,7 +152,18 @@ public class HttpStreamingResponseHandler extends 
MessageToMessageDecoder<HttpOb
             } else if (content.readableBytes() > 0 && queueInputStream != 
null) {
                 // Feed bytes to the reader thread
                 // retain() because Netty releases the content ByteBuf after 
decode() returns
-                queueInputStream.offer(content.retain());
+                final ByteBuf buf = content.retain();
+                if (!queueInputStream.offer(buf)) {
+                    ctx.channel().config().setAutoRead(false);
+                    queueInputStream.markPaused();
+                    try {
+                        queueInputStream.putBlocking(buf);
+                    } catch (InterruptedException e) {
+                        buf.release();
+                        ctx.channel().config().setAutoRead(true);
+                        Thread.currentThread().interrupt();
+                    }
+                }
             }
 
             if (msg instanceof LastHttpContent) {
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/stream/ByteBufQueueInputStream.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/stream/ByteBufQueueInputStream.java
index 757b2821cb..a613c7b13b 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/stream/ByteBufQueueInputStream.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/stream/ByteBufQueueInputStream.java
@@ -37,33 +37,70 @@ public class ByteBufQueueInputStream extends InputStream {
     private static final ByteBuf END_OF_STREAM = Unpooled.buffer(0);
 
     private final BlockingQueue<ByteBuf> queue;
+    private final int capacity;
+    private final Runnable onSpaceAvailable;
     private ByteBuf current;
     private volatile boolean eof;
+    private volatile boolean readsPaused;
 
     public ByteBufQueueInputStream() {
-        this.queue = new LinkedBlockingQueue<>();
+        this(Integer.MAX_VALUE, () -> {});
+    }
+
+    public ByteBufQueueInputStream(final int capacity, final Runnable 
onSpaceAvailable) {
+        this.queue = new LinkedBlockingQueue<>(capacity);
+        this.capacity = capacity;
+        this.onSpaceAvailable = onSpaceAvailable;
     }
 
     /**
      * Offer a ByteBuf to the queue. The caller must have already retained the 
ByteBuf if needed.
      * The ByteBuf will be released after it is fully read. If the stream is 
already closed,
      * the buffer is released immediately.
+     *
+     * @return true if the buffer was accepted, false if the queue is full.
+     */
+    public boolean offer(final ByteBuf buf) {
+        if (eof) {
+            if (buf != END_OF_STREAM && buf.refCnt() > 0) {
+                buf.release();
+            }
+            return true;
+        }
+        return queue.offer(buf);
+    }
+
+    /**
+     * Blocking put for when the queue is full. The caller should pause reads 
before calling this
+     * to avoid blocking the event loop indefinitely.
      */
-    public void offer(final ByteBuf buf) {
+    public void putBlocking(final ByteBuf buf) throws InterruptedException {
         if (eof) {
             if (buf != END_OF_STREAM && buf.refCnt() > 0) {
                 buf.release();
             }
             return;
         }
-        queue.add(buf);
+        queue.put(buf);
+    }
+
+    /**
+     * Mark that the producer has paused reads due to backpressure.
+     */
+    public void markPaused() {
+        readsPaused = true;
     }
 
     /**
      * Signal that no more ByteBufs will be offered.
      */
     public void signalEndOfStream() {
-        queue.offer(END_OF_STREAM);
+        try {
+            queue.put(END_OF_STREAM);
+        } catch (InterruptedException e) {
+            eof = true;
+            Thread.currentThread().interrupt();
+        }
     }
 
     @Override
@@ -78,12 +115,15 @@ public class ByteBufQueueInputStream extends InputStream {
                 Thread.currentThread().interrupt();
                 throw new IOException("Interrupted while waiting for data", e);
             }
-            if (current == null) throw new IOException("Timed out waiting for 
streaming response data");
+            if (current == null) {
+                throw new IOException("Timed out waiting for streaming 
response data");
+            }
             if (current == END_OF_STREAM) {
                 eof = true;
                 current = null;
                 return -1;
             }
+            checkBackpressure();
         }
         return current.readByte() & 0xFF;
     }
@@ -102,12 +142,15 @@ public class ByteBufQueueInputStream extends InputStream {
                 Thread.currentThread().interrupt();
                 throw new IOException("Interrupted while waiting for data", e);
             }
-            if (current == null) throw new IOException("Timed out waiting for 
streaming response data");
+            if (current == null) {
+                throw new IOException("Timed out waiting for streaming 
response data");
+            }
             if (current == END_OF_STREAM) {
                 eof = true;
                 current = null;
                 return -1;
             }
+            checkBackpressure();
         }
         final int readable = Math.min(current.readableBytes(), len);
         current.readBytes(b, off, readable);
@@ -127,6 +170,13 @@ public class ByteBufQueueInputStream extends InputStream {
         }
     }
 
+    private void checkBackpressure() {
+        if (readsPaused && queue.size() < Math.max(1, capacity / 2)) {
+            readsPaused = false;
+            onSpaceAvailable.run();
+        }
+    }
+
     private void releaseCurrent() {
         if (current != null && current != END_OF_STREAM && current.refCnt() > 
0) {
             current.release();
diff --git 
a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/ByteBufQueueInputStreamTest.java
 
b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/ByteBufQueueInputStreamTest.java
index 91ba9cf28a..9c0378b6d5 100644
--- 
a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/ByteBufQueueInputStreamTest.java
+++ 
b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/ByteBufQueueInputStreamTest.java
@@ -24,6 +24,8 @@ import io.netty.buffer.Unpooled;
 import org.apache.tinkerpop.gremlin.driver.stream.ByteBufQueueInputStream;
 import org.junit.Test;
 
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import static org.junit.Assert.*;
 
 public class ByteBufQueueInputStreamTest {
@@ -77,6 +79,34 @@ public class ByteBufQueueInputStreamTest {
         assertEquals(0, buf.refCnt());
     }
 
+    @Test
+    public void shouldReturnFalseWhenQueueFull() throws Exception {
+        final ByteBufQueueInputStream stream = new ByteBufQueueInputStream(2, 
() -> {});
+        assertTrue(stream.offer(Unpooled.wrappedBuffer(new byte[]{1})));
+        assertTrue(stream.offer(Unpooled.wrappedBuffer(new byte[]{2})));
+        assertFalse(stream.offer(Unpooled.wrappedBuffer(new byte[]{3})));
+        stream.close();
+    }
+
+    @Test
+    public void shouldInvokeCallbackWhenDrainedBelowThreshold() throws 
Exception {
+        final AtomicBoolean callbackInvoked = new AtomicBoolean(false);
+        final ByteBufQueueInputStream stream = new ByteBufQueueInputStream(2, 
() -> callbackInvoked.set(true));
+
+        stream.offer(Unpooled.wrappedBuffer(new byte[]{1}));
+        stream.offer(Unpooled.wrappedBuffer(new byte[]{2}));
+        stream.markPaused();
+
+        // First read consumes the first buf (queue size goes to 1, not below 
capacity/2=1 yet)
+        assertEquals(1, stream.read());
+        assertFalse(callbackInvoked.get());
+
+        // Second read polls the second buf (queue size goes to 0, below 
threshold) triggering callback
+        assertEquals(2, stream.read());
+        assertTrue(callbackInvoked.get());
+        stream.close();
+    }
+
     @Test
     public void shouldCleanUpOnClose() throws Exception {
         final ByteBufQueueInputStream stream = new ByteBufQueueInputStream();
diff --git 
a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandlerTest.java
 
b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandlerTest.java
index fdd37818a4..a9b61ee007 100644
--- 
a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandlerTest.java
+++ 
b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandlerTest.java
@@ -44,6 +44,7 @@ import java.util.Collections;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static 
org.apache.tinkerpop.gremlin.driver.Channelizer.HttpChannelizer.LAST_CONTENT_READ_RESPONSE;
@@ -68,8 +69,12 @@ public class HttpStreamingResponseHandlerTest {
     }
 
     private EmbeddedChannel createChannel(final AtomicReference<ResultSet> 
pendingResultSet, final long maxResponseContentLength) {
+        return createChannel(pendingResultSet, maxResponseContentLength, 256);
+    }
+
+    private EmbeddedChannel createChannel(final AtomicReference<ResultSet> 
pendingResultSet, final long maxResponseContentLength, final int 
streamBufferSize) {
         final HttpStreamingResponseHandler handler = new 
HttpStreamingResponseHandler(
-                reader, pendingResultSet, executor, maxResponseContentLength);
+                reader, pendingResultSet, executor, maxResponseContentLength, 
streamBufferSize);
         return new EmbeddedChannel(handler);
     }
 
@@ -198,6 +203,7 @@ public class HttpStreamingResponseHandlerTest {
         channel.finishAndReleaseAll();
     }
 
+
     private byte[] toBytes(final io.netty.buffer.ByteBuf buf) {
         final byte[] bytes = new byte[buf.readableBytes()];
         buf.readBytes(bytes);

Reply via email to