This is an automated email from the ASF dual-hosted git repository. xiazcy pushed a commit to branch go-http-fix in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
commit e8ac84d18445642f94b289628715c4a53206942b Author: Yang Xia <[email protected]> AuthorDate: Tue Mar 24 23:34:51 2026 -0700 added wg to ensure in-flight goroutines complete & added response body drain before close to prevent TCP RST errors --- gremlin-go/driver/connection.go | 13 ++++++++++++- gremlin-go/driver/connection_test.go | 24 ++++++++++++++++++++++++ 2 files changed, 36 insertions(+), 1 deletion(-) diff --git a/gremlin-go/driver/connection.go b/gremlin-go/driver/connection.go index 5396b6ab43..54def5821a 100644 --- a/gremlin-go/driver/connection.go +++ b/gremlin-go/driver/connection.go @@ -29,6 +29,7 @@ import ( "net" "net/http" "strings" + "sync" "time" ) @@ -53,6 +54,7 @@ type connection struct { logHandler *logHandler serializer *GraphBinarySerializer interceptors []RequestInterceptor + wg sync.WaitGroup } // Connection pool defaults aligned with Java driver @@ -121,7 +123,11 @@ func (c *connection) AddInterceptor(interceptor RequestInterceptor) { func (c *connection) submit(req *RequestMessage) (ResultSet, error) { rs := newChannelResultSet() - go c.executeAndStream(req, rs) + c.wg.Add(1) + go func() { + defer c.wg.Done() + c.executeAndStream(req, rs) + }() return rs, nil } @@ -206,6 +212,10 @@ func (c *connection) executeAndStream(req *RequestMessage, rs ResultSet) { return } defer func() { + // Drain any unread bytes so the connection can be reused gracefully. + // Without this, Go's HTTP client sends a TCP RST instead of FIN, + // causing "Connection reset by peer" errors on the server. + io.Copy(io.Discard, resp.Body) if err := resp.Body.Close(); err != nil { c.logHandler.logf(Debug, failedToCloseResponseBody, err.Error()) } @@ -328,5 +338,6 @@ func tryExtractJSONError(body string) string { } func (c *connection) close() { + c.wg.Wait() c.httpClient.CloseIdleConnections() } diff --git a/gremlin-go/driver/connection_test.go b/gremlin-go/driver/connection_test.go index 980040be5d..7d95192211 100644 --- a/gremlin-go/driver/connection_test.go +++ b/gremlin-go/driver/connection_test.go @@ -1093,6 +1093,30 @@ func TestConnectionWithMockServer(t *testing.T) { assert.Equal(t, graphBinaryMimeType, interceptorHeaders.Get("Content-Type")) assert.Equal(t, graphBinaryMimeType, interceptorHeaders.Get("Accept")) }) + + t.Run("close waits for in-flight requests", func(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + time.Sleep(200 * time.Millisecond) + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + + conn := newConnection(newTestLogHandler(), server.URL, &connectionSettings{}) + + rs, err := conn.submit(&RequestMessage{Gremlin: "g.V()", Fields: map[string]interface{}{}}) + require.NoError(t, err) + + start := time.Now() + conn.close() + elapsed := time.Since(start) + + // close() should have waited for the in-flight goroutine + assert.GreaterOrEqual(t, elapsed.Milliseconds(), int64(150), + "close() should wait for in-flight requests to complete") + + // ResultSet should be closed (goroutine finished) + _, _ = rs.All() + }) } // Tests for connection pool configuration settings
