This is an automated email from the ASF dual-hosted git repository.

kenhuuu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit ae07eb2d741262e76d33fb4270e17dadeaf36d07
Author: Ken Hu <[email protected]>
AuthorDate: Tue May 26 17:35:09 2026 -0700

    Update gremlin-go submit to block until response headers arrive
    
    Split executeAndStream() into sendRequest() (synchronous HTTP call) and
    streamResponse() (async body streaming). submit() now blocks until the
    server acknowledges the request (response headers received), then streams
    the body in the background.
    
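    Request-construction and transport failures (interceptor errors, missing
    serializer, unsupported body type, HTTP client errors), as well as
    non-GraphBinary HTTP error responses (status >= 400 with text/JSON
    bodies), are now returned directly from submit() instead of being
    embedded in the ResultSet. Tests updated accordingly.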
    
    Assisted-by: Kiro:claude-opus-4-6
---
 CHANGELOG.asciidoc                    |   1 +
 gremlin-go/driver/connection.go       | 113 +++++++++++++++++++---------------
 gremlin-go/driver/connection_test.go  |  55 ++++++++---------
 gremlin-go/driver/interceptor_test.go |  32 +++-------
 4 files changed, 97 insertions(+), 104 deletions(-)

diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index e418c5e4db..92c5776ab0 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -30,6 +30,7 @@ 
image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima
 * Added typed numeric wrappers and `preciseNumbers` connection option to 
`gremlin-javascript` for explicit control over numeric type serialization and 
deserialization.
 * Added `NextN(n)` to `Traversal` in `gremlin-go` for batched result 
iteration, providing API parity with `next(n)` in the Java, Python, and .NET 
GLVs, and updated the Go translators in `gremlin-core` and `gremlin-javascript` 
to emit `NextN(n)` for the batched form.
 * Added Gremlator, a single page web application, that translates Gremlin into 
various programming languages like Javascript and Python.
+* Refactored Go driver connection to block until response headers arrive, 
enabling synchronous error returns and proper transaction ordering.
 * Removed `uuid` dependency from `gremlin-javascript` in favor of the built-in 
`globalThis.crypto.randomUUID()`.
 * Added streaming HTTP response support to `gremlin-driver` for incremental 
result deserialization over GraphBinary.
 * Connected HTTP streaming response deserialization to the traversal API in 
`gremlin-javascript`, enabling `next()` to return the first result without 
waiting for the full response.
diff --git a/gremlin-go/driver/connection.go b/gremlin-go/driver/connection.go
index 882161e36c..8354ec0574 100644
--- a/gremlin-go/driver/connection.go
+++ b/gremlin-go/driver/connection.go
@@ -119,28 +119,71 @@ func (c *connection) AddInterceptor(interceptor 
RequestInterceptor) {
        c.interceptors = append(c.interceptors, interceptor)
 }
 
-// submit sends request and streams results directly to ResultSet
+// submit sends request and streams results directly to ResultSet.
+// Blocks until response headers arrive, ensuring the server has acknowledged
+// receipt of the request before returning.
 func (c *connection) submit(req *RequestMessage) (ResultSet, error) {
        rs := newChannelResultSet()
 
+       // Send the HTTP request synchronously — blocks until response headers 
arrive
+       resp, err := c.sendRequest(req)
+       if err != nil {
+               rs.Close()
+               return nil, err
+       }
+
+       // If the HTTP status indicates an error and the response is not 
GraphBinary,
+       // read the body as a text/JSON error message instead of attempting 
binary
+       // deserialization which would produce cryptic errors.
+       contentType := resp.Header.Get(HeaderContentType)
+       if resp.StatusCode >= 400 && !strings.Contains(contentType, 
graphBinaryMimeType) {
+               defer func() {
+                       io.Copy(io.Discard, resp.Body)
+                       resp.Body.Close()
+               }()
+               bodyBytes, readErr := io.ReadAll(resp.Body)
+               if readErr != nil {
+                       rs.Close()
+                       return nil, fmt.Errorf("Gremlin Server returned HTTP %d 
and failed to read body: %w",
+                               resp.StatusCode, readErr)
+               }
+               errorBody := string(bodyBytes)
+               errorMsg := tryExtractJSONError(errorBody)
+               if errorMsg == "" {
+                       errorMsg = fmt.Sprintf("Gremlin Server returned HTTP 
%d: %s", resp.StatusCode, errorBody)
+               }
+               c.logHandler.logf(Error, failedToReceiveResponse, errorMsg)
+               rs.Close()
+               return nil, fmt.Errorf("%s", errorMsg)
+       }
+
+       // Stream the response body into the ResultSet asynchronously
        c.wg.Add(1)
        go func() {
                defer c.wg.Done()
-               c.executeAndStream(req, rs)
+               // Drain any unread bytes so the connection can be reused 
gracefully.
+               // Without this, Go's HTTP client sends a TCP RST instead of 
FIN,
+               // causing "Connection reset by peer" errors on the server.
+               defer func() {
+                       io.Copy(io.Discard, resp.Body)
+                       if err := resp.Body.Close(); err != nil {
+                               c.logHandler.logf(Debug, 
failedToCloseResponseBody, err.Error())
+                       }
+               }()
+               defer rs.Close()
+               c.streamResponse(resp, rs)
        }()
 
        return rs, nil
 }
 
-func (c *connection) executeAndStream(req *RequestMessage, rs ResultSet) {
-       defer rs.Close()
-
+// sendRequest builds and sends the HTTP request, blocking until response 
headers arrive.
+func (c *connection) sendRequest(req *RequestMessage) (*http.Response, error) {
        // Create HttpRequest for interceptors
        httpReq, err := NewHttpRequest(http.MethodPost, c.url)
        if err != nil {
                c.logHandler.logf(Error, failedToSendRequest, err.Error())
-               rs.setError(err)
-               return
+               return nil, err
        }
 
        // Set default headers before interceptors
@@ -154,8 +197,7 @@ func (c *connection) executeAndStream(req *RequestMessage, 
rs ResultSet) {
        for _, interceptor := range c.interceptors {
                if err := interceptor(httpReq); err != nil {
                        c.logHandler.logf(Error, failedToSendRequest, 
err.Error())
-                       rs.setError(err)
-                       return
+                       return nil, err
                }
        }
 
@@ -165,15 +207,13 @@ func (c *connection) executeAndStream(req 
*RequestMessage, rs ResultSet) {
                        data, err := c.serializer.SerializeMessage(r)
                        if err != nil {
                                c.logHandler.logf(Error, failedToSendRequest, 
err.Error())
-                               rs.setError(err)
-                               return
+                               return nil, err
                        }
                        httpReq.Body = data
                } else {
                        errMsg := "request body was not serialized; either 
provide a serializer or add an interceptor that serializes the request"
                        c.logHandler.logf(Error, failedToSendRequest, errMsg)
-                       rs.setError(fmt.Errorf("%s", errMsg))
-                       return
+                       return nil, fmt.Errorf("%s", errMsg)
                }
        }
 
@@ -184,16 +224,14 @@ func (c *connection) executeAndStream(req 
*RequestMessage, rs ResultSet) {
                httpGoReq, err = http.NewRequest(httpReq.Method, 
httpReq.URL.String(), bytes.NewReader(body))
                if err != nil {
                        c.logHandler.logf(Error, failedToSendRequest, 
err.Error())
-                       rs.setError(err)
-                       return
+                       return nil, err
                }
                httpGoReq.Header = httpReq.Headers
        case io.Reader:
                httpGoReq, err = http.NewRequest(httpReq.Method, 
httpReq.URL.String(), body)
                if err != nil {
                        c.logHandler.logf(Error, failedToSendRequest, 
err.Error())
-                       rs.setError(err)
-                       return
+                       return nil, err
                }
                httpGoReq.Header = httpReq.Headers
        case *http.Request:
@@ -201,48 +239,21 @@ func (c *connection) executeAndStream(req 
*RequestMessage, rs ResultSet) {
        default:
                errMsg := fmt.Sprintf("unsupported body type after 
interceptors: %T", body)
                c.logHandler.logf(Error, failedToSendRequest, errMsg)
-               rs.setError(fmt.Errorf("%s", errMsg))
-               return
+               return nil, fmt.Errorf("%s", errMsg)
        }
 
+       // This blocks until response headers arrive
        resp, err := c.httpClient.Do(httpGoReq)
        if err != nil {
                c.logHandler.logf(Error, failedToSendRequest, err.Error())
-               rs.setError(err)
-               return
+               return nil, err
        }
-       defer func() {
-               // Drain any unread bytes so the connection can be reused 
gracefully.
-               // Without this, Go's HTTP client sends a TCP RST instead of 
FIN,
-               // causing "Connection reset by peer" errors on the server.
-               io.Copy(io.Discard, resp.Body)
-               if err := resp.Body.Close(); err != nil {
-                       c.logHandler.logf(Debug, failedToCloseResponseBody, 
err.Error())
-               }
-       }()
 
-       // If the HTTP status indicates an error and the response is not 
GraphBinary,
-       // read the body as a text/JSON error message instead of attempting 
binary
-       // deserialization which would produce cryptic errors.
-       contentType := resp.Header.Get(HeaderContentType)
-       if resp.StatusCode >= 400 && !strings.Contains(contentType, 
graphBinaryMimeType) {
-               bodyBytes, readErr := io.ReadAll(resp.Body)
-               if readErr != nil {
-                       c.logHandler.logf(Error, failedToReceiveResponse, 
readErr.Error())
-                       rs.setError(fmt.Errorf("Gremlin Server returned HTTP %d 
and failed to read body: %w",
-                               resp.StatusCode, readErr))
-                       return
-               }
-               errorBody := string(bodyBytes)
-               errorMsg := tryExtractJSONError(errorBody)
-               if errorMsg == "" {
-                       errorMsg = fmt.Sprintf("Gremlin Server returned HTTP 
%d: %s", resp.StatusCode, errorBody)
-               }
-               c.logHandler.logf(Error, failedToReceiveResponse, errorMsg)
-               rs.setError(fmt.Errorf("%s", errorMsg))
-               return
-       }
+       return resp, nil
+}
 
+// streamResponse reads the response body and pushes results into the 
ResultSet.
+func (c *connection) streamResponse(resp *http.Response, rs ResultSet) {
        reader, zlibReader, err := c.getReader(resp)
        if err != nil {
                c.logHandler.logf(Error, failedToReceiveResponse, err.Error())
diff --git a/gremlin-go/driver/connection_test.go 
b/gremlin-go/driver/connection_test.go
index aebdeb992f..1b4620a376 100644
--- a/gremlin-go/driver/connection_test.go
+++ b/gremlin-go/driver/connection_test.go
@@ -990,12 +990,8 @@ func TestConnectionWithMockServer(t *testing.T) {
                        connectionTimeout: 100 * time.Millisecond,
                })
 
-               rs, err := conn.submit(&RequestMessage{Gremlin: "g.V()", 
Fields: map[string]interface{}{}})
-               assert.NoError(t, err) // submit returns nil, error goes to 
ResultSet
-
-               // All() blocks until stream closes, then we can check error
-               _, _ = rs.All()
-               assert.Error(t, rs.GetError())
+               _, err := conn.submit(&RequestMessage{Gremlin: "g.V()", Fields: 
map[string]interface{}{}})
+               assert.Error(t, err) // connection errors are now returned 
directly
        })
 
        t.Run("receives headers from request", func(t *testing.T) {
@@ -1035,14 +1031,10 @@ func TestConnectionWithMockServer(t *testing.T) {
                defer server.Close()
 
                conn := newConnection(newTestLogHandler(), server.URL, 
&connectionSettings{})
-               rs, err := conn.submit(&RequestMessage{Gremlin: "g.V()", 
Fields: map[string]interface{}{}})
-               require.NoError(t, err)
-
-               _, _ = rs.All()
-               rsErr := rs.GetError()
-               require.Error(t, rsErr)
-               assert.Contains(t, rsErr.Error(), "HTTP 500")
-               assert.Contains(t, rsErr.Error(), "Internal Server Error")
+               _, err := conn.submit(&RequestMessage{Gremlin: "g.V()", Fields: 
map[string]interface{}{}})
+               require.Error(t, err)
+               assert.Contains(t, err.Error(), "HTTP 500")
+               assert.Contains(t, err.Error(), "Internal Server Error")
        })
 
        t.Run("extracts message from JSON error response", func(t *testing.T) {
@@ -1054,13 +1046,9 @@ func TestConnectionWithMockServer(t *testing.T) {
                defer server.Close()
 
                conn := newConnection(newTestLogHandler(), server.URL, 
&connectionSettings{})
-               rs, err := conn.submit(&RequestMessage{Gremlin: "g.V()", 
Fields: map[string]interface{}{}})
-               require.NoError(t, err)
-
-               _, _ = rs.All()
-               rsErr := rs.GetError()
-               require.Error(t, rsErr)
-               assert.Equal(t, "Authentication required", rsErr.Error())
+               _, err := conn.submit(&RequestMessage{Gremlin: "g.V()", Fields: 
map[string]interface{}{}})
+               require.Error(t, err)
+               assert.Equal(t, "Authentication required", err.Error())
        })
 
        t.Run("falls back to raw body for non-JSON error response", func(t 
*testing.T) {
@@ -1072,14 +1060,10 @@ func TestConnectionWithMockServer(t *testing.T) {
                defer server.Close()
 
                conn := newConnection(newTestLogHandler(), server.URL, 
&connectionSettings{})
-               rs, err := conn.submit(&RequestMessage{Gremlin: "g.V()", 
Fields: map[string]interface{}{}})
-               require.NoError(t, err)
-
-               _, _ = rs.All()
-               rsErr := rs.GetError()
-               require.Error(t, rsErr)
-               assert.Contains(t, rsErr.Error(), "HTTP 502")
-               assert.Contains(t, rsErr.Error(), "<html>Bad Gateway</html>")
+               _, err := conn.submit(&RequestMessage{Gremlin: "g.V()", Fields: 
map[string]interface{}{}})
+               require.Error(t, err)
+               assert.Contains(t, err.Error(), "HTTP 502")
+               assert.Contains(t, err.Error(), "<html>Bad Gateway</html>")
        })
 
        t.Run("falls through to GraphBinary deserialization for GraphBinary 
error responses", func(t *testing.T) {
@@ -1127,9 +1111,18 @@ func TestConnectionWithMockServer(t *testing.T) {
        })
 
        t.Run("close waits for in-flight requests", func(t *testing.T) {
+               // Server responds with headers immediately but streams body 
slowly.
+               // This tests that close() waits for the body-streaming 
goroutine.
                server := httptest.NewServer(http.HandlerFunc(func(w 
http.ResponseWriter, r *http.Request) {
-                       time.Sleep(200 * time.Millisecond)
+                       w.Header().Set("Content-Type", graphBinaryMimeType)
                        w.WriteHeader(http.StatusOK)
+                       if f, ok := w.(http.Flusher); ok {
+                               f.Flush()
+                       }
+                       // Delay before writing body so the streaming goroutine 
is still active
+                       time.Sleep(200 * time.Millisecond)
+                       // Write a minimal valid GraphBinary response (version 
+ no-bulking flag)
+                       w.Write([]byte{0x84, 0x00})
                }))
                defer server.Close()
 
@@ -1142,7 +1135,7 @@ func TestConnectionWithMockServer(t *testing.T) {
                conn.close()
                elapsed := time.Since(start)
 
-               // close() should have waited for the in-flight goroutine
+               // close() should have waited for the body-streaming goroutine 
to finish
                assert.GreaterOrEqual(t, elapsed.Milliseconds(), int64(150),
                        "close() should wait for in-flight requests to 
complete")
 
diff --git a/gremlin-go/driver/interceptor_test.go 
b/gremlin-go/driver/interceptor_test.go
index 40a947ac9d..adb51786fc 100644
--- a/gremlin-go/driver/interceptor_test.go
+++ b/gremlin-go/driver/interceptor_test.go
@@ -246,13 +246,9 @@ func TestInterceptor_NilSerializerNoSerialization(t 
*testing.T) {
        conn := newConnection(newTestLogHandler(), server.URL, 
&connectionSettings{})
        conn.serializer = nil // explicitly nil serializer
 
-       rs, err := conn.submit(&RequestMessage{Gremlin: "g.V()", Fields: 
map[string]interface{}{}})
-       require.NoError(t, err)
-
-       _, _ = rs.All() // drain — this triggers the async executeAndStream
-       rsErr := rs.GetError()
-       require.Error(t, rsErr, "should get an error when serializer is nil and 
no interceptor serializes")
-       assert.Contains(t, rsErr.Error(), "request body was not serialized",
+       _, err := conn.submit(&RequestMessage{Gremlin: "g.V()", Fields: 
map[string]interface{}{}})
+       require.Error(t, err, "should get an error when serializer is nil and 
no interceptor serializes")
+       assert.Contains(t, err.Error(), "request body was not serialized",
                "error message should indicate the body was not serialized")
 }
 
@@ -327,14 +323,10 @@ func TestInterceptor_ErrorPropagation(t *testing.T) {
                return fmt.Errorf("interceptor failed")
        })
 
-       rs, err := conn.submit(&RequestMessage{Gremlin: "g.V()", Fields: 
map[string]interface{}{}})
-       require.NoError(t, err)
-
-       _, _ = rs.All() // drain — triggers async executeAndStream
-       rsErr := rs.GetError()
-       require.Error(t, rsErr, "interceptor error should propagate to 
ResultSet")
-       assert.Contains(t, rsErr.Error(), "interceptor failed",
-               "ResultSet error should contain the interceptor's error 
message")
+       _, err := conn.submit(&RequestMessage{Gremlin: "g.V()", Fields: 
map[string]interface{}{}})
+       require.Error(t, err, "interceptor error should propagate")
+       assert.Contains(t, err.Error(), "interceptor failed",
+               "error should contain the interceptor's error message")
 }
 
 // TestInterceptor_UnsupportedBodyType verifies that setting Body to an 
unsupported type
@@ -353,13 +345,9 @@ func TestInterceptor_UnsupportedBodyType(t *testing.T) {
                return nil
        })
 
-       rs, err := conn.submit(&RequestMessage{Gremlin: "g.V()", Fields: 
map[string]interface{}{}})
-       require.NoError(t, err)
-
-       _, _ = rs.All() // drain
-       rsErr := rs.GetError()
-       require.Error(t, rsErr, "unsupported body type should produce an error")
-       assert.Contains(t, rsErr.Error(), "unsupported body type",
+       _, err := conn.submit(&RequestMessage{Gremlin: "g.V()", Fields: 
map[string]interface{}{}})
+       require.Error(t, err, "unsupported body type should produce an error")
+       assert.Contains(t, err.Error(), "unsupported body type",
                "error message should indicate unsupported body type")
 }
 

Reply via email to