This is an automated email from the ASF dual-hosted git repository. Cole-Greer pushed a commit to branch javaTCPBackpressure in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
commit c185ba86c8b484198d71068057e73d7a5eedae8c
Author: Cole Greer <[email protected]>
AuthorDate: Mon May 25 13:45:20 2026 -0700

    Add streaming backpressure via bounded queue in gremlin-driver

    Bound the ByteBufQueueInputStream queue and use Netty's setAutoRead(false) to apply TCP backpressure when the client cannot keep up with the server's streaming rate. When the queue is full, the handler pauses channel reads and blocks briefly to enqueue the current chunk. The consumer resumes reads when the queue drains below half capacity via a callback dispatched to the event loop. The buffer size defaults to 256 chunks and is configurable via Cluster.Builder.streamBufferSize().
---
 .../tinkerpop/gremlin/driver/Channelizer.java | 2 +-
 .../apache/tinkerpop/gremlin/driver/Cluster.java | 19 +++++++
 .../tinkerpop/gremlin/driver/Connection.java | 1 +
 .../apache/tinkerpop/gremlin/driver/Settings.java | 8 +++
 .../handler/HttpStreamingResponseHandler.java | 22 ++++++--
 .../driver/stream/ByteBufQueueInputStream.java | 62 +++++++++++++++++++---
 .../handler/ByteBufQueueInputStreamTest.java | 30 +++++++++++
 .../handler/HttpStreamingResponseHandlerTest.java | 8 ++-
 8 files changed, 141 insertions(+), 11 deletions(-)

diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java index 59b9d02365..4d23aa3561 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java @@ -212,7 +212,7 @@ public interface Channelizer extends ChannelHandler { final GraphBinaryReader graphBinaryReader = ((GraphBinaryMessageSerializerV4) serializer).getMapper().getReader(); streamingResponseHandler = new HttpStreamingResponseHandler( - graphBinaryReader, pending, cluster.streamingReaderPool(), cluster.getMaxResponseContentLength()); + graphBinaryReader, pending, 
cluster.streamingReaderPool(), cluster.getMaxResponseContentLength(), cluster.getStreamBufferSize()); } else { useStreaming = false; gremlinResponseDecoder = new HttpGremlinResponseDecoder(serializer); diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java index d4a3967f02..40644df4d6 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java @@ -173,6 +173,7 @@ public final class Cluster { .reconnectInterval(settings.connectionPool.reconnectInterval) .resultIterationBatchSize(settings.connectionPool.resultIterationBatchSize) .maxResponseContentLength(settings.connectionPool.maxResponseContentLength) + .streamBufferSize(settings.connectionPool.streamBufferSize) .maxWaitForConnection(settings.connectionPool.maxWaitForConnection) .maxConnectionPoolSize(settings.connectionPool.maxSize) .connectionSetupTimeoutMillis(settings.connectionPool.connectionSetupTimeoutMillis) @@ -326,6 +327,13 @@ public final class Cluster { return manager.connectionPoolSettings.maxResponseContentLength; } + /** + * Gets the size of the buffer used for streaming responses. + */ + public int getStreamBufferSize() { + return manager.connectionPoolSettings.streamBufferSize; + } + /** * Get time in milliseconds that the driver will allow a channel to not receive read or writes before it automatically closes. 
*/ @@ -495,6 +503,7 @@ public final class Cluster { private int maxWaitForConnection = Connection.MAX_WAIT_FOR_CONNECTION; private int maxWaitForClose = Connection.MAX_WAIT_FOR_CLOSE; private long maxResponseContentLength = Connection.MAX_RESPONSE_CONTENT_LENGTH; + private int streamBufferSize = Connection.DEFAULT_STREAM_BUFFER_SIZE; private int reconnectInterval = Connection.RECONNECT_INTERVAL; private int resultIterationBatchSize = Connection.RESULT_ITERATION_BATCH_SIZE; private boolean enableSsl = false; @@ -723,6 +732,15 @@ public final class Cluster { return this; } + /** + * The size of the buffer used for streaming responses. + */ + public Builder streamBufferSize(final int streamBufferSize) { + if (streamBufferSize < 1) throw new IllegalArgumentException("streamBufferSize must be at least 1"); + this.streamBufferSize = streamBufferSize; + return this; + } + /** * Specify a valid Gremlin script that can be used to test remote operations. This script should be designed * to return quickly with the least amount of overhead possible. By default, the script sends an empty string. 
@@ -994,6 +1012,7 @@ public final class Cluster { connectionPoolSettings.maxWaitForConnection = builder.maxWaitForConnection; connectionPoolSettings.maxWaitForClose = builder.maxWaitForClose; connectionPoolSettings.maxResponseContentLength = builder.maxResponseContentLength; + connectionPoolSettings.streamBufferSize = builder.streamBufferSize; connectionPoolSettings.reconnectInterval = builder.reconnectInterval; connectionPoolSettings.resultIterationBatchSize = builder.resultIterationBatchSize; connectionPoolSettings.enableSsl = builder.enableSsl; diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java index 12a1de646c..8228c5b032 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java @@ -47,6 +47,7 @@ final class Connection { public static final int MAX_WAIT_FOR_CONNECTION = 16000; public static final int MAX_WAIT_FOR_CLOSE = 3000; public static final long MAX_RESPONSE_CONTENT_LENGTH = Integer.MAX_VALUE; + public static final int DEFAULT_STREAM_BUFFER_SIZE = 256; public static final int RECONNECT_INTERVAL = 1000; public static final int RESULT_ITERATION_BATCH_SIZE = 64; public static final long CONNECTION_SETUP_TIMEOUT_MILLIS = 15000; diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java index 907909c71a..b204f8181a 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java @@ -202,6 +202,9 @@ public final class Settings { if (connectionPoolConf.containsKey("maxResponseContentLength")) cpSettings.maxResponseContentLength = connectionPoolConf.getInt("maxResponseContentLength"); + if 
(connectionPoolConf.containsKey("streamBufferSize")) + cpSettings.streamBufferSize = connectionPoolConf.getInt("streamBufferSize"); + if (connectionPoolConf.containsKey("reconnectInterval")) cpSettings.reconnectInterval = connectionPoolConf.getInt("reconnectInterval"); @@ -324,6 +327,11 @@ public final class Settings { */ public long maxResponseContentLength = Connection.MAX_RESPONSE_CONTENT_LENGTH; + /** + * The size of the buffer used for streaming responses. The default value is 256. + */ + public int streamBufferSize = Connection.DEFAULT_STREAM_BUFFER_SIZE; + /** * The amount of time in milliseconds to wait before trying to reconnect to a dead host. The default value is * 1000. diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java index 607fd6bdf9..c2968b72b9 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java @@ -67,6 +67,7 @@ public class HttpStreamingResponseHandler extends MessageToMessageDecoder<HttpOb private final AtomicReference<ResultSet> pendingResultSet; private final ExecutorService readerPool; private final long maxResponseContentLength; + private final int streamBufferSize; // Mutable state below is accessed exclusively from the channel's event loop thread. 
private HttpResponseStatus responseStatus; @@ -78,11 +79,13 @@ public class HttpStreamingResponseHandler extends MessageToMessageDecoder<HttpOb public HttpStreamingResponseHandler(final GraphBinaryReader graphBinaryReader, final AtomicReference<ResultSet> pendingResultSet, final ExecutorService readerPool, - final long maxResponseContentLength) { + final long maxResponseContentLength, + final int streamBufferSize) { this.graphBinaryReader = graphBinaryReader; this.pendingResultSet = pendingResultSet; this.readerPool = readerPool; this.maxResponseContentLength = maxResponseContentLength; + this.streamBufferSize = streamBufferSize; } @Override @@ -97,7 +100,9 @@ public class HttpStreamingResponseHandler extends MessageToMessageDecoder<HttpOb responseStatus = resp.status(); contentType = resp.headers().get(HttpHeaderNames.CONTENT_TYPE); - queueInputStream = new ByteBufQueueInputStream(); + final io.netty.channel.Channel channel = ctx.channel(); + queueInputStream = new ByteBufQueueInputStream(streamBufferSize, () -> + channel.eventLoop().execute(() -> channel.config().setAutoRead(true))); // Spawn reader thread for GraphBinary responses if (isGraphBinaryResponse()) { @@ -147,7 +152,18 @@ public class HttpStreamingResponseHandler extends MessageToMessageDecoder<HttpOb } else if (content.readableBytes() > 0 && queueInputStream != null) { // Feed bytes to the reader thread // retain() because Netty releases the content ByteBuf after decode() returns - queueInputStream.offer(content.retain()); + final ByteBuf buf = content.retain(); + if (!queueInputStream.offer(buf)) { + ctx.channel().config().setAutoRead(false); + queueInputStream.markPaused(); + try { + queueInputStream.putBlocking(buf); + } catch (InterruptedException e) { + buf.release(); + ctx.channel().config().setAutoRead(true); + Thread.currentThread().interrupt(); + } + } } if (msg instanceof LastHttpContent) { diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/stream/ByteBufQueueInputStream.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/stream/ByteBufQueueInputStream.java index 757b2821cb..a613c7b13b 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/stream/ByteBufQueueInputStream.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/stream/ByteBufQueueInputStream.java @@ -37,33 +37,70 @@ public class ByteBufQueueInputStream extends InputStream { private static final ByteBuf END_OF_STREAM = Unpooled.buffer(0); private final BlockingQueue<ByteBuf> queue; + private final int capacity; + private final Runnable onSpaceAvailable; private ByteBuf current; private volatile boolean eof; + private volatile boolean readsPaused; public ByteBufQueueInputStream() { - this.queue = new LinkedBlockingQueue<>(); + this(Integer.MAX_VALUE, () -> {}); + } + + public ByteBufQueueInputStream(final int capacity, final Runnable onSpaceAvailable) { + this.queue = new LinkedBlockingQueue<>(capacity); + this.capacity = capacity; + this.onSpaceAvailable = onSpaceAvailable; } /** * Offer a ByteBuf to the queue. The caller must have already retained the ByteBuf if needed. * The ByteBuf will be released after it is fully read. If the stream is already closed, * the buffer is released immediately. + * + * @return true if the buffer was accepted, false if the queue is full. + */ + public boolean offer(final ByteBuf buf) { + if (eof) { + if (buf != END_OF_STREAM && buf.refCnt() > 0) { + buf.release(); + } + return true; + } + return queue.offer(buf); + } + + /** + * Blocking put for when the queue is full. The caller should pause reads before calling this + * to avoid blocking the event loop indefinitely. 
*/ - public void offer(final ByteBuf buf) { + public void putBlocking(final ByteBuf buf) throws InterruptedException { if (eof) { if (buf != END_OF_STREAM && buf.refCnt() > 0) { buf.release(); } return; } - queue.add(buf); + queue.put(buf); + } + + /** + * Mark that the producer has paused reads due to backpressure. + */ + public void markPaused() { + readsPaused = true; } /** * Signal that no more ByteBufs will be offered. */ public void signalEndOfStream() { - queue.offer(END_OF_STREAM); + try { + queue.put(END_OF_STREAM); + } catch (InterruptedException e) { + eof = true; + Thread.currentThread().interrupt(); + } } @Override @@ -78,12 +115,15 @@ public class ByteBufQueueInputStream extends InputStream { Thread.currentThread().interrupt(); throw new IOException("Interrupted while waiting for data", e); } - if (current == null) throw new IOException("Timed out waiting for streaming response data"); + if (current == null) { + throw new IOException("Timed out waiting for streaming response data"); + } if (current == END_OF_STREAM) { eof = true; current = null; return -1; } + checkBackpressure(); } return current.readByte() & 0xFF; } @@ -102,12 +142,15 @@ public class ByteBufQueueInputStream extends InputStream { Thread.currentThread().interrupt(); throw new IOException("Interrupted while waiting for data", e); } - if (current == null) throw new IOException("Timed out waiting for streaming response data"); + if (current == null) { + throw new IOException("Timed out waiting for streaming response data"); + } if (current == END_OF_STREAM) { eof = true; current = null; return -1; } + checkBackpressure(); } final int readable = Math.min(current.readableBytes(), len); current.readBytes(b, off, readable); @@ -127,6 +170,13 @@ public class ByteBufQueueInputStream extends InputStream { } } + private void checkBackpressure() { + if (readsPaused && queue.size() < Math.max(1, capacity / 2)) { + readsPaused = false; + onSpaceAvailable.run(); + } + } + private void 
releaseCurrent() { if (current != null && current != END_OF_STREAM && current.refCnt() > 0) { current.release(); diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/ByteBufQueueInputStreamTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/ByteBufQueueInputStreamTest.java index 91ba9cf28a..9c0378b6d5 100644 --- a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/ByteBufQueueInputStreamTest.java +++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/ByteBufQueueInputStreamTest.java @@ -24,6 +24,8 @@ import io.netty.buffer.Unpooled; import org.apache.tinkerpop.gremlin.driver.stream.ByteBufQueueInputStream; import org.junit.Test; +import java.util.concurrent.atomic.AtomicBoolean; + import static org.junit.Assert.*; public class ByteBufQueueInputStreamTest { @@ -77,6 +79,34 @@ public class ByteBufQueueInputStreamTest { assertEquals(0, buf.refCnt()); } + @Test + public void shouldReturnFalseWhenQueueFull() throws Exception { + final ByteBufQueueInputStream stream = new ByteBufQueueInputStream(2, () -> {}); + assertTrue(stream.offer(Unpooled.wrappedBuffer(new byte[]{1}))); + assertTrue(stream.offer(Unpooled.wrappedBuffer(new byte[]{2}))); + assertFalse(stream.offer(Unpooled.wrappedBuffer(new byte[]{3}))); + stream.close(); + } + + @Test + public void shouldInvokeCallbackWhenDrainedBelowThreshold() throws Exception { + final AtomicBoolean callbackInvoked = new AtomicBoolean(false); + final ByteBufQueueInputStream stream = new ByteBufQueueInputStream(2, () -> callbackInvoked.set(true)); + + stream.offer(Unpooled.wrappedBuffer(new byte[]{1})); + stream.offer(Unpooled.wrappedBuffer(new byte[]{2})); + stream.markPaused(); + + // First read consumes the first buf (queue size goes to 1, not below capacity/2=1 yet) + assertEquals(1, stream.read()); + assertFalse(callbackInvoked.get()); + + // Second read polls the second buf (queue size goes to 0, below threshold) 
triggering callback + assertEquals(2, stream.read()); + assertTrue(callbackInvoked.get()); + stream.close(); + } + @Test public void shouldCleanUpOnClose() throws Exception { final ByteBufQueueInputStream stream = new ByteBufQueueInputStream(); diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandlerTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandlerTest.java index fdd37818a4..a9b61ee007 100644 --- a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandlerTest.java +++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandlerTest.java @@ -44,6 +44,7 @@ import java.util.Collections; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import static org.apache.tinkerpop.gremlin.driver.Channelizer.HttpChannelizer.LAST_CONTENT_READ_RESPONSE; @@ -68,8 +69,12 @@ public class HttpStreamingResponseHandlerTest { } private EmbeddedChannel createChannel(final AtomicReference<ResultSet> pendingResultSet, final long maxResponseContentLength) { + return createChannel(pendingResultSet, maxResponseContentLength, 256); + } + + private EmbeddedChannel createChannel(final AtomicReference<ResultSet> pendingResultSet, final long maxResponseContentLength, final int streamBufferSize) { final HttpStreamingResponseHandler handler = new HttpStreamingResponseHandler( - reader, pendingResultSet, executor, maxResponseContentLength); + reader, pendingResultSet, executor, maxResponseContentLength, streamBufferSize); return new EmbeddedChannel(handler); } @@ -198,6 +203,7 @@ public class HttpStreamingResponseHandlerTest { channel.finishAndReleaseAll(); } + private byte[] toBytes(final io.netty.buffer.ByteBuf buf) { final 
byte[] bytes = new byte[buf.readableBytes()]; buf.readBytes(bytes);
