This is an automated email from the ASF dual-hosted git repository.

xiazcy pushed a commit to branch go-http-streaming
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit efda6ee1bbaa92dcb6f7f279979c193ff1d62aca
Author: Yang Xia <[email protected]>
AuthorDate: Thu Jan 15 21:07:59 2026 -0800

    added interceptors, go http pooling & removed unused settings
---
 gremlin-go/driver/client.go                        |  34 ++----
 gremlin-go/driver/connection.go                    |   4 -
 gremlin-go/driver/connection_test.go               |   6 --
 gremlin-go/driver/driverRemoteConnection.go        |  24 ++---
 gremlin-go/driver/httpConnection.go                | 119 ++++++++++++++++++---
 gremlin-go/driver/logger.go                        |   1 -
 gremlin-go/driver/performance/performanceSuite.go  |   3 -
 gremlin-go/driver/resources/error-messages/en.json |   1 +
 8 files changed, 124 insertions(+), 68 deletions(-)

diff --git a/gremlin-go/driver/client.go b/gremlin-go/driver/client.go
index dbe31bb8b5..902761112c 100644
--- a/gremlin-go/driver/client.go
+++ b/gremlin-go/driver/client.go
@@ -21,7 +21,6 @@ package gremlingo
 
 import (
        "crypto/tls"
-       "fmt"
        "reflect"
        "runtime"
        "time"
@@ -29,17 +28,8 @@ import (
        "golang.org/x/text/language"
 )
 
-const keepAliveIntervalDefault = 5 * time.Second
-const writeDeadlineDefault = 3 * time.Second
 const connectionTimeoutDefault = 5 * time.Second
 
-// ReadBufferSize and WriteBufferSize specify I/O buffer sizes in bytes. The default is 64KB.
-// If a buffer size is set zero, then the transporter default size is used. The I/O buffer
-// sizes do not limit the size of the messages that can be sent or received.
-const readBufferSizeDefault = 65536  // 64KB
-const maxReadBufferSize = 1073741824 // 1GB - Go's maximum per Read() call
-const writeBufferSizeDefault = 65536 // 64KB
-
 // ClientSettings is used to modify a Client's settings on initialization.
 type ClientSettings struct {
        TraversalSource   string
@@ -48,16 +38,15 @@ type ClientSettings struct {
        Language          language.Tag
        AuthInfo          AuthInfoProvider
        TlsConfig         *tls.Config
-       KeepAliveInterval time.Duration
-       WriteDeadline     time.Duration
        ConnectionTimeout time.Duration
        EnableCompression bool
-       ReadBufferSize    int
-       WriteBufferSize   int
 
        // Maximum number of concurrent connections. Default: number of runtime processors
        MaximumConcurrentConnections int
        EnableUserAgentOnConnect     bool
+
+       // RequestInterceptors are functions that modify HTTP requests before sending.
+       RequestInterceptors []RequestInterceptor
 }
 
 // protocol defines the interface for HTTP communication with Gremlin server
@@ -85,13 +74,9 @@ func NewClient(url string, configurations ...func(settings *ClientSettings)) (*C
                Language:                 language.English,
                AuthInfo:                 &AuthInfo{},
                TlsConfig:                &tls.Config{},
-               KeepAliveInterval:        keepAliveIntervalDefault,
-               WriteDeadline:            writeDeadlineDefault,
                ConnectionTimeout:        connectionTimeoutDefault,
                EnableCompression:        false,
                EnableUserAgentOnConnect: true,
-               ReadBufferSize:           readBufferSizeDefault,
-               WriteBufferSize:          writeBufferSizeDefault,
 
                MaximumConcurrentConnections: runtime.NumCPU(),
        }
@@ -99,19 +84,11 @@ func NewClient(url string, configurations ...func(settings *ClientSettings)) (*C
                configuration(settings)
        }
 
-       if settings.ReadBufferSize > maxReadBufferSize {
-               return nil, fmt.Errorf("readBufferSize %d exceeds maximum of %d bytes", settings.ReadBufferSize, maxReadBufferSize)
-       }
-
        connSettings := &connectionSettings{
                authInfo:                 settings.AuthInfo,
                tlsConfig:                settings.TlsConfig,
-               keepAliveInterval:        settings.KeepAliveInterval,
-               writeDeadline:            settings.WriteDeadline,
                connectionTimeout:        settings.ConnectionTimeout,
                enableCompression:        settings.EnableCompression,
-               readBufferSize:           settings.ReadBufferSize,
-               writeBufferSize:          settings.WriteBufferSize,
                enableUserAgentOnConnect: settings.EnableUserAgentOnConnect,
        }
 
@@ -119,6 +96,11 @@ func NewClient(url string, configurations ...func(settings *ClientSettings)) (*C
 
        conn := newHttpConnection(logHandler, url, connSettings)
 
+       // Add user-provided interceptors
+       for _, interceptor := range settings.RequestInterceptors {
+               conn.AddInterceptor(interceptor)
+       }
+
        client := &Client{
                url:                url,
                traversalSource:    settings.TraversalSource,
diff --git a/gremlin-go/driver/connection.go b/gremlin-go/driver/connection.go
index 9dbb1ec8ea..8960ff43c4 100644
--- a/gremlin-go/driver/connection.go
+++ b/gremlin-go/driver/connection.go
@@ -27,11 +27,7 @@ import (
 type connectionSettings struct {
        authInfo                 AuthInfoProvider
        tlsConfig                *tls.Config
-       keepAliveInterval        time.Duration
-       writeDeadline            time.Duration
        connectionTimeout        time.Duration
        enableCompression        bool
-       readBufferSize           int
-       writeBufferSize          int
        enableUserAgentOnConnect bool
 }
diff --git a/gremlin-go/driver/connection_test.go b/gremlin-go/driver/connection_test.go
index 14d4073c33..c329d0dd61 100644
--- a/gremlin-go/driver/connection_test.go
+++ b/gremlin-go/driver/connection_test.go
@@ -58,13 +58,9 @@ func newDefaultConnectionSettings() *connectionSettings {
        return &connectionSettings{
                authInfo:                 &AuthInfo{},
                tlsConfig:                &tls.Config{},
-               keepAliveInterval:        keepAliveIntervalDefault,
-               writeDeadline:            writeDeadlineDefault,
                connectionTimeout:        connectionTimeoutDefault,
                enableCompression:        false,
                enableUserAgentOnConnect: true,
-               readBufferSize:           readBufferSizeDefault,
-               writeBufferSize:          writeBufferSizeDefault,
        }
 }
 
@@ -288,7 +284,6 @@ func TestConnection(t *testing.T) {
                        func(settings *ClientSettings) {
                                settings.TlsConfig = &tlsConf
                                settings.AuthInfo = testNoAuthAuthInfo
-                               settings.WriteBufferSize = 1024
                                settings.EnableCompression = true
                                settings.TraversalSource = testServerModernGraphAlias
                        })
@@ -320,7 +315,6 @@ func TestConnection(t *testing.T) {
                        func(settings *ClientSettings) {
                                settings.TlsConfig = testNoAuthTlsConfig
                                settings.AuthInfo = testNoAuthAuthInfo
-                               settings.WriteBufferSize = 1024
                                settings.EnableCompression = true
                                settings.TraversalSource = testServerModernGraphAlias
                        })
diff --git a/gremlin-go/driver/driverRemoteConnection.go b/gremlin-go/driver/driverRemoteConnection.go
index 22160abfdc..78015e187b 100644
--- a/gremlin-go/driver/driverRemoteConnection.go
+++ b/gremlin-go/driver/driverRemoteConnection.go
@@ -35,20 +35,15 @@ type DriverRemoteConnectionSettings struct {
        Language                 language.Tag
        AuthInfo                 AuthInfoProvider
        TlsConfig                *tls.Config
-       KeepAliveInterval        time.Duration
-       WriteDeadline            time.Duration
        ConnectionTimeout        time.Duration
        EnableCompression        bool
        EnableUserAgentOnConnect bool
-       ReadBufferSize           int
-       WriteBufferSize          int
 
-       // Minimum amount of concurrent active traversals on a connection to trigger creation of a new connection
-       NewConnectionThreshold int
        // Maximum number of concurrent connections. Default: number of runtime processors
        MaximumConcurrentConnections int
-       // Initial amount of instantiated connections. Default: 1
-       InitialConcurrentConnections int
+
+       // RequestInterceptors are functions that modify HTTP requests before sending.
+       RequestInterceptors []RequestInterceptor
 }
 
 // DriverRemoteConnection is a remote connection.
@@ -72,13 +67,9 @@ func NewDriverRemoteConnection(
                Language:                 language.English,
                AuthInfo:                 &AuthInfo{},
                TlsConfig:                &tls.Config{},
-               KeepAliveInterval:        keepAliveIntervalDefault,
-               WriteDeadline:            writeDeadlineDefault,
                ConnectionTimeout:        connectionTimeoutDefault,
                EnableCompression:        false,
                EnableUserAgentOnConnect: true,
-               ReadBufferSize:           readBufferSizeDefault,
-               WriteBufferSize:          writeBufferSizeDefault,
 
                MaximumConcurrentConnections: runtime.NumCPU(),
        }
@@ -89,12 +80,8 @@ func NewDriverRemoteConnection(
        connSettings := &connectionSettings{
                authInfo:                 settings.AuthInfo,
                tlsConfig:                settings.TlsConfig,
-               keepAliveInterval:        settings.KeepAliveInterval,
-               writeDeadline:            settings.WriteDeadline,
                connectionTimeout:        settings.ConnectionTimeout,
                enableCompression:        settings.EnableCompression,
-               readBufferSize:           settings.ReadBufferSize,
-               writeBufferSize:          settings.WriteBufferSize,
                enableUserAgentOnConnect: settings.EnableUserAgentOnConnect,
        }
 
@@ -102,6 +89,11 @@ func NewDriverRemoteConnection(
 
        conn := newHttpConnection(logHandler, url, connSettings)
 
+       // Add user-provided interceptors
+       for _, interceptor := range settings.RequestInterceptors {
+               conn.AddInterceptor(interceptor)
+       }
+
        client := &Client{
                url:                url,
                traversalSource:    settings.TraversalSource,
diff --git a/gremlin-go/driver/httpConnection.go b/gremlin-go/driver/httpConnection.go
index f4a417bc44..32106f769e 100644
--- a/gremlin-go/driver/httpConnection.go
+++ b/gremlin-go/driver/httpConnection.go
@@ -22,11 +22,47 @@ package gremlingo
 import (
        "bytes"
        "compress/zlib"
+       "encoding/base64"
        "io"
+       "net"
        "net/http"
+       "net/url"
        "time"
 )
 
+// Common HTTP header keys
+const (
+       HeaderContentType    = "Content-Type"
+       HeaderAccept         = "Accept"
+       HeaderUserAgent      = "User-Agent"
+       HeaderAcceptEncoding = "Accept-Encoding"
+       HeaderAuthorization  = "Authorization"
+)
+
+// HttpRequest represents an HTTP request that can be modified by interceptors.
+type HttpRequest struct {
+       Method  string
+       URL     *url.URL
+       Headers http.Header
+       Body    []byte
+}
+
+// NewHttpRequest creates a new HttpRequest with the given method and URL.
+func NewHttpRequest(method, rawURL string) (*HttpRequest, error) {
+       u, err := url.Parse(rawURL)
+       if err != nil {
+               return nil, err
+       }
+       return &HttpRequest{
+               Method:  method,
+               URL:     u,
+               Headers: make(http.Header),
+       }, nil
+}
+
+// RequestInterceptor is a function that modifies an HTTP request before it is sent.
+type RequestInterceptor func(*HttpRequest) error
+
 // httpConnection handles HTTP request/response for Gremlin queries
 type httpConnection struct {
        url          string
@@ -34,6 +70,7 @@ type httpConnection struct {
        connSettings *connectionSettings
        logHandler   *logHandler
        serializer   *graphBinarySerializer
+       interceptors []RequestInterceptor
 }
 
 // Connection pool defaults aligned with Java driver
@@ -45,12 +82,16 @@ const (
 )
 
 func newHttpConnection(handler *logHandler, url string, connSettings *connectionSettings) *httpConnection {
-       timeout := connSettings.connectionTimeout
-       if timeout == 0 {
-               timeout = defaultConnectionTimeout
+       connectionTimeout := connSettings.connectionTimeout
+       if connectionTimeout == 0 {
+               connectionTimeout = defaultConnectionTimeout
        }
 
        transport := &http.Transport{
+               DialContext: (&net.Dialer{
+                       Timeout:   connectionTimeout, // Connection setup timeout only
+                       KeepAlive: 30 * time.Second,
+               }).DialContext,
                TLSClientConfig:     connSettings.tlsConfig,
                MaxConnsPerHost:     defaultMaxConnsPerHost,
                MaxIdleConnsPerHost: defaultMaxIdleConnsPerHost,
@@ -60,13 +101,18 @@ func newHttpConnection(handler *logHandler, url string, connSettings *connection
 
        return &httpConnection{
                url:          url,
-               httpClient:   &http.Client{Transport: transport, Timeout: timeout},
+               httpClient:   &http.Client{Transport: transport}, // No Timeout - allows streaming
                connSettings: connSettings,
                logHandler:   handler,
                serializer:   newGraphBinarySerializer(handler),
        }
 }
 
+// AddInterceptor adds a request interceptor to the chain.
+func (c *httpConnection) AddInterceptor(interceptor RequestInterceptor) {
+       c.interceptors = append(c.interceptors, interceptor)
+}
+
 // send sends request and streams results directly to ResultSet
 func (c *httpConnection) send(req *request) (ResultSet, error) {
        rs := newChannelResultSet()
@@ -85,14 +131,35 @@ func (c *httpConnection) send(req *request) (ResultSet, error) {
 func (c *httpConnection) executeAndStream(data []byte, rs ResultSet) {
        defer rs.Close()
 
-       req, err := http.NewRequest(http.MethodPost, c.url, bytes.NewReader(data))
+       // Create HttpRequest for interceptors
+       httpReq, err := NewHttpRequest(http.MethodPost, c.url)
        if err != nil {
                c.logHandler.logf(Error, failedToSendRequest, err.Error())
                rs.setError(err)
                return
        }
+       httpReq.Body = data
+
+       // Set default headers before interceptors
+       c.setHttpRequestHeaders(httpReq)
 
-       c.setHeaders(req)
+       // Apply interceptors
+       for _, interceptor := range c.interceptors {
+               if err := interceptor(httpReq); err != nil {
+                       c.logHandler.logf(Error, failedToSendRequest, err.Error())
+                       rs.setError(err)
+                       return
+               }
+       }
+
+       // Create actual http.Request from HttpRequest
+       req, err := http.NewRequest(httpReq.Method, httpReq.URL.String(), bytes.NewReader(httpReq.Body))
+       if err != nil {
+               c.logHandler.logf(Error, failedToSendRequest, err.Error())
+               rs.setError(err)
+               return
+       }
+       req.Header = httpReq.Headers
 
        resp, err := c.httpClient.Do(req)
        if err != nil {
@@ -115,6 +182,38 @@ func (c *httpConnection) executeAndStream(data []byte, rs ResultSet) {
        c.streamToResultSet(reader, rs)
 }
 
+// setHttpRequestHeaders sets default headers on HttpRequest (for interceptors)
+func (c *httpConnection) setHttpRequestHeaders(req *HttpRequest) {
+       req.Headers.Set(HeaderContentType, graphBinaryMimeType)
+       req.Headers.Set(HeaderAccept, graphBinaryMimeType)
+
+       if c.connSettings.enableUserAgentOnConnect {
+               req.Headers.Set(HeaderUserAgent, userAgent)
+       }
+       if c.connSettings.enableCompression {
+               req.Headers.Set(HeaderAcceptEncoding, "deflate")
+       }
+       if c.connSettings.authInfo != nil {
+               if headers := c.connSettings.authInfo.GetHeader(); headers != nil {
+                       for k, vals := range headers {
+                               for _, v := range vals {
+                                       req.Headers.Add(k, v)
+                               }
+                       }
+               }
+               if ok, user, pass := c.connSettings.authInfo.GetBasicAuth(); ok {
+                       req.Headers.Set(HeaderAuthorization, "Basic "+basicAuth(user, pass))
+               }
+       }
+}
+
+// basicAuth encodes username and password for Basic auth header
+func basicAuth(username, password string) string {
+       auth := username + ":" + password
+       return base64.StdEncoding.EncodeToString([]byte(auth))
+}
+
+// setHeaders sets headers on http.Request (legacy, kept for compatibility)
 func (c *httpConnection) setHeaders(req *http.Request) {
        req.Header.Set("Content-Type", graphBinaryMimeType)
        req.Header.Set("Accept", graphBinaryMimeType)
@@ -171,18 +270,14 @@ func (c *httpConnection) streamToResultSet(reader io.Reader, rs ResultSet) {
                }
 
                if marker, ok := obj.(Marker); ok && marker == EndOfStream() {
-                       code, msg, exc, err := d.readStatus()
+                       code, msg, _, err := d.readStatus()
                        if err != nil {
                                c.logHandler.logf(Error, failedToReceiveResponse, err.Error())
                                rs.setError(err)
                                return
                        }
                        if code != 200 && code != 0 {
-                               if exc != "" {
-                                       rs.setError(newError(err0502ResponseHandlerReadLoopError, exc, code))
-                               } else {
-                                       rs.setError(newError(err0502ResponseHandlerReadLoopError, msg, code))
-                               }
+                               rs.setError(newError(err0502ResponseHandlerReadLoopError, msg, code))
                        }
                        return
                }
diff --git a/gremlin-go/driver/logger.go b/gremlin-go/driver/logger.go
index f1f8c6986a..d198a224a9 100644
--- a/gremlin-go/driver/logger.go
+++ b/gremlin-go/driver/logger.go
@@ -109,7 +109,6 @@ const (
        submitStartedString          errorKey = "SUBMIT_STARTED_STRING"
        failedToCloseInErrorCallback errorKey = "FAILED_TO_CLOSE_IN_ERROR_CALLBACK"
        failedToWriteMessage         errorKey = "FAILED_TO_WRITE_MESSAGE"
-       failedToSetWriteDeadline     errorKey = "FAILED_TO_SET_WRITE_DEADLINE"
        failedToReceiveResponse      errorKey = "FAILED_TO_RECEIVE_RESPONSE"
        failedToSendRequest          errorKey = "FAILED_TO_SEND_REQUEST"
        logErrorGeneric              errorKey = "LOG_ERROR_GENERIC"
diff --git a/gremlin-go/driver/performance/performanceSuite.go b/gremlin-go/driver/performance/performanceSuite.go
index ba2b493b91..bcad844f67 100644
--- a/gremlin-go/driver/performance/performanceSuite.go
+++ b/gremlin-go/driver/performance/performanceSuite.go
@@ -378,10 +378,7 @@ func createConnection(host string, port, poolSize, buffersSize int) (*GraphTrave
        drc, err = gremlingo.NewDriverRemoteConnection(endpoint, func(settings *DriverRemoteConnectionSettings) {
                settings.LogVerbosity = GremlinWarning
                settings.TraversalSource = gratefulGraphAlias
-               settings.NewConnectionThreshold = threshold
                settings.MaximumConcurrentConnections = poolSize
-               settings.WriteBufferSize = buffersSize
-               settings.ReadBufferSize = buffersSize
        })
 
        if err != nil {
diff --git a/gremlin-go/driver/resources/error-messages/en.json b/gremlin-go/driver/resources/error-messages/en.json
index cfd070af47..0120b5b77f 100644
--- a/gremlin-go/driver/resources/error-messages/en.json
+++ b/gremlin-go/driver/resources/error-messages/en.json
@@ -23,6 +23,7 @@
 
   "E0501_PROTOCOL_RESPONSEHANDLER_NO_RESULTSET_ON_DATA_RECEIVE":"E0501: resultSet was not created before data was received",
   "E0502_PROTOCOL_RESPONSEHANDLER_ERROR": "E0502: error handling response, error message '%+v'. statusCode: %d",
+  "E0502_PROTOCOL_RESPONSEHANDLER_READ_LOOP_ERROR": "E0502: error in read loop, error message '%v'. statusCode: %d",
   "E0503_PROTOCOL_RESPONSEHANDLER_AUTH_ERROR":"E0503: failed to authenticate %v : %v",
 
   "E0601_RESULT_NOT_VERTEX_ERROR":"E0601: result is not a Vertex",

Reply via email to