This is an automated email from the ASF dual-hosted git repository. kenhuuu pushed a commit to branch java-tx-sync in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
commit e35801e9d1bbe6d76f5277ca5b1395c0bc897d72 Author: Ken Hu <[email protected]> AuthorDate: Mon Jun 29 12:13:51 2026 -0700 Make Java RemoteTransaction submit() wait for server response headers This is a case where the implementation didn't match the documented and expected behavior. Transaction requests need to be semi-blocking - submit() waits for the server's response headers, though not for the full body - so that they reach the server in the same order in which they were submitted. It was incorrectly assumed that the Java GLV already did this; this change makes it actually do so. Assisted-by: Claude Code:claude-opus-4-8 --- .../apache/tinkerpop/gremlin/driver/ResultSet.java | 32 ++++++++ .../handler/HttpStreamingResponseHandler.java | 10 +++ .../driver/remote/HttpRemoteTransaction.java | 21 +++++- .../tinkerpop/gremlin/driver/ResultSetTest.java | 85 ++++++++++++++++++++++ .../handler/HttpStreamingResponseHandlerTest.java | 62 ++++++++++++++++ 5 files changed, 209 insertions(+), 1 deletion(-) diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java index b5d506bc1f..d67f16adc0 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java @@ -58,6 +58,10 @@ public final class ResultSet implements Iterable<Result> { private final Queue<Pair<CompletableFuture<List<Result>>, Integer>> waiting = new ConcurrentLinkedQueue<>(); private final AtomicReference<Throwable> error = new AtomicReference<>(); private final CompletableFuture<Void> readCompleted = new CompletableFuture<>(); + // Completes when the server's response headers have been received (i.e. the request made a full round trip and the + // server has begun responding). This is distinct from readCompleted, which only completes once the entire body has + // been read. It lets a caller (e.g. a remote transaction) block until the server has begun handling this request.
+ private final CompletableFuture<Void> headersReceived = new CompletableFuture<>(); private final ExecutorService executor; private final RequestMessage originalRequestMessage; @@ -94,6 +98,18 @@ public final class ResultSet implements Iterable<Result> { return readCompleted.whenCompleteAsync((s,t) -> {}, executor); } + /** + * Returns a future that completes once the server's response headers have been received - that is, once the + * request has made a full round trip to the server and the server has begun responding - without waiting for the + * entire body. Used by remote transactions to block on submission just long enough to guarantee server-side + * ordering relative to a subsequent request, while still streaming the body lazily. + */ + public CompletableFuture<Void> headersReceivedAsync() { + // completed by Netty's event loop thread; hop to the executor pool so application code attached downstream + // does not run on (and block) the event loop. Mirrors allItemsAvailableAsync(). + return headersReceived.whenCompleteAsync((s,t) -> {}, executor); + } + /** * Gets the number of items available on the client. */ @@ -196,10 +212,23 @@ public final class ResultSet implements Iterable<Result> { tryDrainNextWaiting(false); } + /** + * Marks the response headers as received. Called by the streaming decoder the moment the server's response header + * is decoded, before any body chunks are processed. Idempotent and safe to call more than once. On the + * non-streaming path there is no separate header event - the aggregator coalesces the whole response - so this is + * instead tripped as a backstop by {@link #markComplete()}/{@link #markError(Throwable)}. + */ + public void markHeadersReceived() { + this.headersReceived.complete(null); + } + /** * Marks the result stream as complete. */ public void markComplete() { + // backstop: ensure the headers future is tripped even on a path that never signaled it explicitly (e.g. 
the + // non-streaming aggregator path, or a NO_CONTENT response with no body). + this.headersReceived.complete(null); this.readCompleted.complete(null); this.drainAllWaiting(); } @@ -211,6 +240,9 @@ public final class ResultSet implements Iterable<Result> { */ public void markError(final Throwable throwable) { error.set(throwable); + // an error may arrive before headers were ever signaled (e.g. a write failure or a non-streaming error + // response); fail the headers future too so anyone blocked on it observes the error rather than hanging. + this.headersReceived.completeExceptionally(throwable); this.readCompleted.completeExceptionally(throwable); this.drainAllWaiting(); } diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java index 7aca5a2e2c..b7bce47f64 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java @@ -95,6 +95,16 @@ public class HttpStreamingResponseHandler extends MessageToMessageDecoder<HttpOb contentType = resp.headers().get(HttpHeaderNames.CONTENT_TYPE); queueInputStream = new ByteBufQueueInputStream(); + // Signal that the server's response headers have arrived, before any body chunk is processed. The full + // round trip to the server has completed, so a caller blocked on headersReceivedAsync() (e.g. a remote + // transaction's submit) can now proceed knowing the server has ordered this request ahead of any later one + // on the same transaction, without waiting for the body to stream back. Error responses trip this too, since + // their status line still arrives as a header first; the error itself is reported downstream.
+ { + final ResultSet rsForHeaders = pendingResultSet.get(); + if (rsForHeaders != null) rsForHeaders.markHeadersReceived(); + } + // Spawn reader thread for GraphBinary responses if (isGraphBinaryResponse()) { final ResultSet rs = pendingResultSet.get(); diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/HttpRemoteTransaction.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/HttpRemoteTransaction.java index a245e36b03..5b12b84e1f 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/HttpRemoteTransaction.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/HttpRemoteTransaction.java @@ -320,7 +320,26 @@ public class HttpRemoteTransaction implements RemoteTransaction { } try { - return pinnedClient.submit(gremlin, builder.create()); + final ResultSet rs = pinnedClient.submit(gremlin, builder.create()); + + // pinnedClient.submit() returns as soon as the request is written to the wire - it does NOT wait for the + // server. That is unsafe for a transaction: a request is fire-and-forget at this point, so a later request + // on this transaction (e.g. a commit following an addV) could reach the server first and the earlier + // request's work would be lost. Block until the server's response headers come back, which means the + // request has made a full round trip and the server has ordered it on the transaction's single-threaded + // executor ahead of anything we send next. We deliberately wait on headers, not the full body, so large + // result sets still stream lazily to the caller via the returned ResultSet. + // + // This wait is intentionally unbounded: we must NOT abandon it on a client-side stopwatch. A slow server + // may still be going to run the request, so giving up early and letting the caller retry the same traversal + // could execute it twice. 
The wait ends only on a definitive signal - headers arriving (success) or the + // future completing exceptionally because the request can no longer succeed: a connection close + // (channelInactive -> markError), a transport read timeout (ReadTimeoutHandler -> exceptionCaught -> + // markError), or an error response on the non-streaming path (a streaming error response completes this + // normally). The connection's own read timeout, not this method, bounds how long we wait on a silent server. + rs.headersReceivedAsync().get(); + + return rs; } catch (Exception e) { throw new RuntimeException("Transaction request failed: " + e.getMessage(), e); } diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultSetTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultSetTest.java index 4bf3b8c6b7..707167a29b 100644 --- a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultSetTest.java +++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultSetTest.java @@ -24,6 +24,7 @@ import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -69,6 +70,90 @@ public class ResultSetTest extends AbstractResultSetTest { assertThat(resultSet.allItemsAvailable(), is(true)); } + @Test + public void shouldCompleteHeadersReceivedOnMarkHeadersReceived() throws Exception { + final CompletableFuture<Void> headers = resultSet.headersReceivedAsync(); + assertThat(headers.isDone(), is(false)); + + resultSet.markHeadersReceived(); + + headers.get(2, TimeUnit.SECONDS); + assertThat(headers.isDone(), is(true)); + assertThat(headers.isCompletedExceptionally(), is(false)); + } + + @Test + public void shouldCompleteHeadersReceivedBeforeReadCompleted() throws Exception { + // models the streaming path: headers
decode first, then body chunks stream in, then read completes. A caller + // blocked on headersReceivedAsync() must be released strictly before the full body is available. + final CompletableFuture<Void> headers = resultSet.headersReceivedAsync(); + final CompletableFuture<Void> all = resultSet.allItemsAvailableAsync(); + assertThat(headers.isDone(), is(false)); + assertThat(all.isDone(), is(false)); + + // headers arrive first + resultSet.markHeadersReceived(); + headers.get(2, TimeUnit.SECONDS); + assertThat(headers.isDone(), is(true)); + // the body is not finished yet, so the "all" future is still outstanding + assertThat(all.isDone(), is(false)); + + // body finishes streaming + resultSet.add(new Result("test1")); + resultSet.markComplete(); + all.get(2, TimeUnit.SECONDS); + assertThat(all.isDone(), is(true)); + } + + @Test + public void shouldCompleteHeadersReceivedAsBackstopOnMarkComplete() throws Exception { + // models the non-streaming (aggregator) path which never signals headers explicitly: markComplete() must trip + // the headers future too, otherwise a caller blocked on it would hang on that path. + final CompletableFuture<Void> headers = resultSet.headersReceivedAsync(); + assertThat(headers.isDone(), is(false)); + + resultSet.markComplete(); + + headers.get(2, TimeUnit.SECONDS); + assertThat(headers.isDone(), is(true)); + assertThat(headers.isCompletedExceptionally(), is(false)); + } + + @Test + public void shouldCompleteHeadersReceivedExceptionallyOnErrorBeforeHeaders() throws Exception { + // an error can arrive before any header was signaled (e.g. a write failure or a non-streaming error response); + // the headers future must fail rather than hang so a caller blocked on it observes the error. 
+ final CompletableFuture<Void> headers = resultSet.headersReceivedAsync(); + assertThat(headers.isDone(), is(false)); + + resultSet.markError(new RuntimeException("boom")); + + try { + headers.get(2, TimeUnit.SECONDS); + fail("headersReceivedAsync() should have completed exceptionally"); + } catch (ExecutionException ex) { + assertThat(ex.getCause().getMessage(), is("boom")); + } + assertThat(headers.isDone(), is(true)); + assertThat(headers.isCompletedExceptionally(), is(true)); + } + + @Test + public void shouldKeepHeadersReceivedSuccessfulWhenErrorArrivesAfterHeaders() throws Exception { + // headers come back OK and only then does the body fail (e.g. a server-side error in the body footer). The + // headers future already completed successfully and must not be flipped to a failure by the later error. + final CompletableFuture<Void> headers = resultSet.headersReceivedAsync(); + resultSet.markHeadersReceived(); + headers.get(2, TimeUnit.SECONDS); + assertThat(headers.isCompletedExceptionally(), is(false)); + + resultSet.markError(new RuntimeException("late boom")); + + // still successfully completed - the late error does not retroactively fail the headers future + assertThat(headers.isDone(), is(true)); + assertThat(headers.isCompletedExceptionally(), is(false)); + } + @Test public void shouldHaveAllItemsAvailableAsynchronouslyOnReadCompleteWhileLoading() throws Exception { final CompletableFuture<Void> all = resultSet.allItemsAvailableAsync(); diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandlerTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandlerTest.java index 79d26dd75c..3c06e24f83 100644 --- a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandlerTest.java +++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandlerTest.java @@ -41,9 +41,11 @@ 
import org.junit.Before; import org.junit.Test; import java.util.Collections; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import static org.apache.tinkerpop.gremlin.driver.Channelizer.HttpChannelizer.LAST_CONTENT_READ_RESPONSE; @@ -103,6 +105,66 @@ public class HttpStreamingResponseHandlerTest { channel.finishAndReleaseAll(); } + @Test + public void shouldMarkHeadersReceivedWhenResponseHeaderDecodes() throws Exception { + // The ResultSet completes its async stages on its own executor, separate from the handler's readerPool (which + // mirrors production: cluster.executor() vs cluster.streamingReaderPool()). This matters here because an OK + // response spawns a reader thread that blocks the readerPool until end-of-stream; sharing one executor would + // deadlock the headers future's completion hop. 
+ final ExecutorService rsExecutor = Executors.newSingleThreadExecutor(); + try { + final ResultSet rs = new ResultSet(rsExecutor, RequestMessage.build("g.V()").create(), null); + final AtomicReference<ResultSet> pending = new AtomicReference<>(rs); + final EmbeddedChannel channel = createChannel(pending); + + // capture the future once - each call to headersReceivedAsync() returns a fresh async stage + final CompletableFuture<Void> headers = rs.headersReceivedAsync(); + assertFalse(headers.isDone()); + + // the response header alone (no body yet) should trip the headers-received future, since the full round + // trip to the server has completed and the server has begun responding + final HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); + response.headers().set(HttpHeaderNames.CONTENT_TYPE, SerTokens.MIME_GRAPHBINARY_V4); + channel.writeInbound(response); + + headers.get(2, TimeUnit.SECONDS); + assertTrue(headers.isDone()); + assertFalse(headers.isCompletedExceptionally()); + // the body has not been read yet, so read-completion must still be outstanding + assertFalse(rs.allItemsAvailable()); + + channel.finishAndReleaseAll(); + } finally { + rsExecutor.shutdownNow(); + } + } + + @Test + public void shouldMarkHeadersReceivedOnErrorResponseHeader() throws Exception { + final ExecutorService rsExecutor = Executors.newSingleThreadExecutor(); + try { + final ResultSet rs = new ResultSet(rsExecutor, RequestMessage.build("g.V()").create(), null); + final AtomicReference<ResultSet> pending = new AtomicReference<>(rs); + final EmbeddedChannel channel = createChannel(pending); + + final CompletableFuture<Void> headers = rs.headersReceivedAsync(); + assertFalse(headers.isDone()); + + // an error status still arrives as a response header first; a caller blocked on headers must be released + final HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR); + 
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/json"); + channel.writeInbound(response); + + headers.get(2, TimeUnit.SECONDS); + assertTrue(headers.isDone()); + assertFalse(headers.isCompletedExceptionally()); + + channel.finishAndReleaseAll(); + } finally { + rsExecutor.shutdownNow(); + } + } + @Test public void shouldHandleDoubleLastHttpContentWithoutError() throws Exception { final ResultSet rs = new ResultSet(executor, RequestMessage.build("g.V()").create(), null);
