This is an automated email from the ASF dual-hosted git repository. xiazcy pushed a commit to branch go-http-streaming in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
commit 19c0c8b05bd2b0cb0af18fa9a3dd3902c981901e Author: Yang Xia <[email protected]> AuthorDate: Thu Jan 15 21:07:59 2026 -0800 added interceptors, go http pooling & removed unused settings --- gremlin-go/driver/client.go | 34 ++---- gremlin-go/driver/connection.go | 4 - gremlin-go/driver/connection_test.go | 42 +++++++- gremlin-go/driver/driverRemoteConnection.go | 24 ++--- gremlin-go/driver/httpConnection.go | 119 ++++++++++++++++++--- gremlin-go/driver/logger.go | 1 - gremlin-go/driver/performance/performanceSuite.go | 3 - gremlin-go/driver/resources/error-messages/en.json | 1 + 8 files changed, 161 insertions(+), 67 deletions(-) diff --git a/gremlin-go/driver/client.go b/gremlin-go/driver/client.go index afc8924ce0..68277fdf7f 100644 --- a/gremlin-go/driver/client.go +++ b/gremlin-go/driver/client.go @@ -21,7 +21,6 @@ package gremlingo import ( "crypto/tls" - "fmt" "reflect" "runtime" "time" @@ -29,17 +28,8 @@ import ( "golang.org/x/text/language" ) -const keepAliveIntervalDefault = 5 * time.Second -const writeDeadlineDefault = 3 * time.Second const connectionTimeoutDefault = 5 * time.Second -// ReadBufferSize and WriteBufferSize specify I/O buffer sizes in bytes. The default is 64KB. -// If a buffer size is set zero, then the transporter default size is used. The I/O buffer -// sizes do not limit the size of the messages that can be sent or received. -const readBufferSizeDefault = 65536 // 64KB -const maxReadBufferSize = 1073741824 // 1GB - Go's maximum per Read() call -const writeBufferSizeDefault = 65536 // 64KB - // ClientSettings is used to modify a Client's settings on initialization. type ClientSettings struct { TraversalSource string @@ -48,16 +38,15 @@ type ClientSettings struct { Language language.Tag AuthInfo AuthInfoProvider TlsConfig *tls.Config - KeepAliveInterval time.Duration - WriteDeadline time.Duration ConnectionTimeout time.Duration EnableCompression bool - ReadBufferSize int - WriteBufferSize int // Maximum number of concurrent connections. Default: number of runtime processors MaximumConcurrentConnections int EnableUserAgentOnConnect bool + + // RequestInterceptors are functions that modify HTTP requests before sending. + RequestInterceptors []RequestInterceptor } // protocol defines the interface for HTTP communication with Gremlin server @@ -85,13 +74,9 @@ func NewClient(url string, configurations ...func(settings *ClientSettings)) (*C Language: language.English, AuthInfo: &AuthInfo{}, TlsConfig: &tls.Config{}, - KeepAliveInterval: keepAliveIntervalDefault, - WriteDeadline: writeDeadlineDefault, ConnectionTimeout: connectionTimeoutDefault, EnableCompression: false, EnableUserAgentOnConnect: true, - ReadBufferSize: readBufferSizeDefault, - WriteBufferSize: writeBufferSizeDefault, MaximumConcurrentConnections: runtime.NumCPU(), } @@ -99,19 +84,11 @@ func NewClient(url string, configurations ...func(settings *ClientSettings)) (*C configuration(settings) } - if settings.ReadBufferSize > maxReadBufferSize { - return nil, fmt.Errorf("readBufferSize %d exceeds maximum of %d bytes", settings.ReadBufferSize, maxReadBufferSize) - } - connSettings := &connectionSettings{ authInfo: settings.AuthInfo, tlsConfig: settings.TlsConfig, - keepAliveInterval: settings.KeepAliveInterval, - writeDeadline: settings.WriteDeadline, connectionTimeout: settings.ConnectionTimeout, enableCompression: settings.EnableCompression, - readBufferSize: settings.ReadBufferSize, - writeBufferSize: settings.WriteBufferSize, enableUserAgentOnConnect: settings.EnableUserAgentOnConnect, } @@ -119,6 +96,11 @@ func NewClient(url string, configurations ...func(settings *ClientSettings)) (*C conn := newHttpConnection(logHandler, url, connSettings) + // Add user-provided interceptors + for _, interceptor := range settings.RequestInterceptors { + conn.AddInterceptor(interceptor) + } + client := &Client{ url: url, traversalSource: settings.TraversalSource, diff --git a/gremlin-go/driver/connection.go b/gremlin-go/driver/connection.go index 9dbb1ec8ea..8960ff43c4 100644 --- a/gremlin-go/driver/connection.go +++ b/gremlin-go/driver/connection.go @@ -27,11 +27,7 @@ import ( type connectionSettings struct { authInfo AuthInfoProvider tlsConfig *tls.Config - keepAliveInterval time.Duration - writeDeadline time.Duration connectionTimeout time.Duration enableCompression bool - readBufferSize int - writeBufferSize int enableUserAgentOnConnect bool } diff --git a/gremlin-go/driver/connection_test.go b/gremlin-go/driver/connection_test.go index b6ee029432..dcd56f8e40 100644 --- a/gremlin-go/driver/connection_test.go +++ b/gremlin-go/driver/connection_test.go @@ -58,13 +58,9 @@ func newDefaultConnectionSettings() *connectionSettings { return &connectionSettings{ authInfo: &AuthInfo{}, tlsConfig: &tls.Config{}, - keepAliveInterval: keepAliveIntervalDefault, - writeDeadline: writeDeadlineDefault, connectionTimeout: connectionTimeoutDefault, enableCompression: false, enableUserAgentOnConnect: true, - readBufferSize: readBufferSizeDefault, - writeBufferSize: writeBufferSizeDefault, } } @@ -275,6 +271,43 @@ func TestConnection(t *testing.T) { testBasicAuthAuthInfo := getBasicAuthInfo() testBasicAuthTlsConfig := &tls.Config{InsecureSkipVerify: true} + // this test is used to test the ws->http POC changes via manual execution with a local TP 4.0 gremlin server running on 8182 + t.Run("Test client.submit()", func(t *testing.T) { + skipTestsIfNotEnabled(t, integrationTestSuiteName, testNoAuthEnable) + + tlsConf := tls.Config{ + InsecureSkipVerify: true, + } + + client, err := NewClient(testNoAuthUrl, + //client, err := NewClient(noAuthSslUrl, + func(settings *ClientSettings) { + settings.TlsConfig = &tlsConf + settings.AuthInfo = testNoAuthAuthInfo + settings.EnableCompression = true + settings.TraversalSource = testServerModernGraphAlias + }) + assert.Nil(t, err) + assert.NotNil(t, client) + defer client.Close() + + // synchronous + for i := 0; i < 5; i++ { + submitCount(i, client, t) + } + + // async + var wg sync.WaitGroup + for i := 0; i < 5; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + submitCount(i, client, t) + }(i) + } + wg.Wait() + }) + t.Run("Test client.submit() with concurrency", func(t *testing.T) { skipTestsIfNotEnabled(t, integrationTestSuiteName, testNoAuthEnable) @@ -282,7 +315,6 @@ func TestConnection(t *testing.T) { func(settings *ClientSettings) { settings.TlsConfig = testNoAuthTlsConfig settings.AuthInfo = testNoAuthAuthInfo - settings.WriteBufferSize = 1024 settings.EnableCompression = true settings.TraversalSource = testServerModernGraphAlias }) diff --git a/gremlin-go/driver/driverRemoteConnection.go b/gremlin-go/driver/driverRemoteConnection.go index 2d0c09a099..ccf726ff29 100644 --- a/gremlin-go/driver/driverRemoteConnection.go +++ b/gremlin-go/driver/driverRemoteConnection.go @@ -35,20 +35,15 @@ type DriverRemoteConnectionSettings struct { Language language.Tag AuthInfo AuthInfoProvider TlsConfig *tls.Config - KeepAliveInterval time.Duration - WriteDeadline time.Duration ConnectionTimeout time.Duration EnableCompression bool EnableUserAgentOnConnect bool - ReadBufferSize int - WriteBufferSize int - // Minimum amount of concurrent active traversals on a connection to trigger creation of a new connection - NewConnectionThreshold int // Maximum number of concurrent connections. Default: number of runtime processors MaximumConcurrentConnections int - // Initial amount of instantiated connections. Default: 1 - InitialConcurrentConnections int + + // RequestInterceptors are functions that modify HTTP requests before sending. + RequestInterceptors []RequestInterceptor } // DriverRemoteConnection is a remote connection. @@ -72,13 +67,9 @@ func NewDriverRemoteConnection( Language: language.English, AuthInfo: &AuthInfo{}, TlsConfig: &tls.Config{}, - KeepAliveInterval: keepAliveIntervalDefault, - WriteDeadline: writeDeadlineDefault, ConnectionTimeout: connectionTimeoutDefault, EnableCompression: false, EnableUserAgentOnConnect: true, - ReadBufferSize: readBufferSizeDefault, - WriteBufferSize: writeBufferSizeDefault, MaximumConcurrentConnections: runtime.NumCPU(), } @@ -89,12 +80,8 @@ func NewDriverRemoteConnection( connSettings := &connectionSettings{ authInfo: settings.AuthInfo, tlsConfig: settings.TlsConfig, - keepAliveInterval: settings.KeepAliveInterval, - writeDeadline: settings.WriteDeadline, connectionTimeout: settings.ConnectionTimeout, enableCompression: settings.EnableCompression, - readBufferSize: settings.ReadBufferSize, - writeBufferSize: settings.WriteBufferSize, enableUserAgentOnConnect: settings.EnableUserAgentOnConnect, } @@ -102,6 +89,11 @@ func NewDriverRemoteConnection( conn := newHttpConnection(logHandler, url, connSettings) + // Add user-provided interceptors + for _, interceptor := range settings.RequestInterceptors { + conn.AddInterceptor(interceptor) + } + client := &Client{ url: url, traversalSource: settings.TraversalSource, diff --git a/gremlin-go/driver/httpConnection.go b/gremlin-go/driver/httpConnection.go index 2a0d203744..5f4d74166b 100644 --- a/gremlin-go/driver/httpConnection.go +++ b/gremlin-go/driver/httpConnection.go @@ -22,11 +22,47 @@ package gremlingo import ( "bytes" "compress/zlib" + "encoding/base64" "io" + "net" "net/http" + "net/url" "time" ) +// Common HTTP header keys +const ( + HeaderContentType = "Content-Type" + HeaderAccept = "Accept" + HeaderUserAgent = "User-Agent" + HeaderAcceptEncoding = "Accept-Encoding" + HeaderAuthorization = "Authorization" +) + +// HttpRequest represents an HTTP request that can be modified by interceptors. +type HttpRequest struct { + Method string + URL *url.URL + Headers http.Header + Body []byte +} + +// NewHttpRequest creates a new HttpRequest with the given method and URL. +func NewHttpRequest(method, rawURL string) (*HttpRequest, error) { + u, err := url.Parse(rawURL) + if err != nil { + return nil, err + } + return &HttpRequest{ + Method: method, + URL: u, + Headers: make(http.Header), + }, nil +} + +// RequestInterceptor is a function that modifies an HTTP request before it is sent. +type RequestInterceptor func(*HttpRequest) error + // httpConnection handles HTTP request/response for Gremlin queries type httpConnection struct { url string @@ -34,6 +70,7 @@ type httpConnection struct { connSettings *connectionSettings logHandler *logHandler serializer *GraphBinarySerializer + interceptors []RequestInterceptor } // Connection pool defaults aligned with Java driver @@ -45,12 +82,16 @@ const ( ) func newHttpConnection(handler *logHandler, url string, connSettings *connectionSettings) *httpConnection { - timeout := connSettings.connectionTimeout - if timeout == 0 { - timeout = defaultConnectionTimeout + connectionTimeout := connSettings.connectionTimeout + if connectionTimeout == 0 { + connectionTimeout = defaultConnectionTimeout } transport := &http.Transport{ + DialContext: (&net.Dialer{ + Timeout: connectionTimeout, // Connection setup timeout only + KeepAlive: 30 * time.Second, + }).DialContext, TLSClientConfig: connSettings.tlsConfig, MaxConnsPerHost: defaultMaxConnsPerHost, MaxIdleConnsPerHost: defaultMaxIdleConnsPerHost, @@ -60,13 +101,18 @@ func newHttpConnection(handler *logHandler, url string, connSettings *connection return &httpConnection{ url: url, - httpClient: &http.Client{Transport: transport, Timeout: timeout}, + httpClient: &http.Client{Transport: transport}, // No Timeout - allows streaming connSettings: connSettings, logHandler: handler, serializer: newGraphBinarySerializer(handler), } } +// AddInterceptor adds a request interceptor to the chain. +func (c *httpConnection) AddInterceptor(interceptor RequestInterceptor) { + c.interceptors = append(c.interceptors, interceptor) +} + // send sends request and streams results directly to ResultSet func (c *httpConnection) send(req *request) (ResultSet, error) { rs := newChannelResultSet() @@ -85,14 +131,35 @@ func (c *httpConnection) send(req *request) (ResultSet, error) { func (c *httpConnection) executeAndStream(data []byte, rs ResultSet) { defer rs.Close() - req, err := http.NewRequest(http.MethodPost, c.url, bytes.NewReader(data)) + // Create HttpRequest for interceptors + httpReq, err := NewHttpRequest(http.MethodPost, c.url) if err != nil { c.logHandler.logf(Error, failedToSendRequest, err.Error()) rs.setError(err) return } + httpReq.Body = data + + // Set default headers before interceptors + c.setHttpRequestHeaders(httpReq) - c.setHeaders(req) + // Apply interceptors + for _, interceptor := range c.interceptors { + if err := interceptor(httpReq); err != nil { + c.logHandler.logf(Error, failedToSendRequest, err.Error()) + rs.setError(err) + return + } + } + + // Create actual http.Request from HttpRequest + req, err := http.NewRequest(httpReq.Method, httpReq.URL.String(), bytes.NewReader(httpReq.Body)) + if err != nil { + c.logHandler.logf(Error, failedToSendRequest, err.Error()) + rs.setError(err) + return + } + req.Header = httpReq.Headers resp, err := c.httpClient.Do(req) if err != nil { @@ -115,6 +182,38 @@ func (c *httpConnection) executeAndStream(data []byte, rs ResultSet) { c.streamToResultSet(reader, rs) } +// setHttpRequestHeaders sets default headers on HttpRequest (for interceptors) +func (c *httpConnection) setHttpRequestHeaders(req *HttpRequest) { + req.Headers.Set(HeaderContentType, graphBinaryMimeType) + req.Headers.Set(HeaderAccept, graphBinaryMimeType) + + if c.connSettings.enableUserAgentOnConnect { + req.Headers.Set(HeaderUserAgent, userAgent) + } + if c.connSettings.enableCompression { + req.Headers.Set(HeaderAcceptEncoding, "deflate") + } + if c.connSettings.authInfo != nil { + if headers := c.connSettings.authInfo.GetHeader(); headers != nil { + for k, vals := range headers { + for _, v := range vals { + req.Headers.Add(k, v) + } + } + } + if ok, user, pass := c.connSettings.authInfo.GetBasicAuth(); ok { + req.Headers.Set(HeaderAuthorization, "Basic "+basicAuth(user, pass)) + } + } +} + +// basicAuth encodes username and password for Basic auth header +func basicAuth(username, password string) string { + auth := username + ":" + password + return base64.StdEncoding.EncodeToString([]byte(auth)) +} + +// setHeaders sets headers on http.Request (legacy, kept for compatibility) func (c *httpConnection) setHeaders(req *http.Request) { req.Header.Set("Content-Type", graphBinaryMimeType) req.Header.Set("Accept", graphBinaryMimeType) @@ -171,18 +270,14 @@ func (c *httpConnection) streamToResultSet(reader io.Reader, rs ResultSet) { } if marker, ok := obj.(Marker); ok && marker == EndOfStream() { - code, msg, exc, err := d.readStatus() + code, msg, _, err := d.readStatus() if err != nil { c.logHandler.logf(Error, failedToReceiveResponse, err.Error()) rs.setError(err) return } if code != 200 && code != 0 { - if exc != "" { - rs.setError(newError(err0502ResponseHandlerReadLoopError, exc, code)) - } else { - rs.setError(newError(err0502ResponseHandlerReadLoopError, msg, code)) - } + rs.setError(newError(err0502ResponseHandlerReadLoopError, msg, code)) } return } diff --git a/gremlin-go/driver/logger.go b/gremlin-go/driver/logger.go index f1f8c6986a..d198a224a9 100644 --- a/gremlin-go/driver/logger.go +++ b/gremlin-go/driver/logger.go @@ -109,7 +109,6 @@ const ( submitStartedString errorKey = "SUBMIT_STARTED_STRING" failedToCloseInErrorCallback errorKey = "FAILED_TO_CLOSE_IN_ERROR_CALLBACK" failedToWriteMessage errorKey = "FAILED_TO_WRITE_MESSAGE" - failedToSetWriteDeadline errorKey = "FAILED_TO_SET_WRITE_DEADLINE" failedToReceiveResponse errorKey = "FAILED_TO_RECEIVE_RESPONSE" failedToSendRequest errorKey = "FAILED_TO_SEND_REQUEST" logErrorGeneric errorKey = "LOG_ERROR_GENERIC" diff --git a/gremlin-go/driver/performance/performanceSuite.go b/gremlin-go/driver/performance/performanceSuite.go index ba2b493b91..bcad844f67 100644 --- a/gremlin-go/driver/performance/performanceSuite.go +++ b/gremlin-go/driver/performance/performanceSuite.go @@ -378,10 +378,7 @@ func createConnection(host string, port, poolSize, buffersSize int) (*GraphTrave drc, err = gremlingo.NewDriverRemoteConnection(endpoint, func(settings *DriverRemoteConnectionSettings) { settings.LogVerbosity = GremlinWarning settings.TraversalSource = gratefulGraphAlias - settings.NewConnectionThreshold = threshold settings.MaximumConcurrentConnections = poolSize - settings.WriteBufferSize = buffersSize - settings.ReadBufferSize = buffersSize }) if err != nil { diff --git a/gremlin-go/driver/resources/error-messages/en.json b/gremlin-go/driver/resources/error-messages/en.json index cfd070af47..0120b5b77f 100644 --- a/gremlin-go/driver/resources/error-messages/en.json +++ b/gremlin-go/driver/resources/error-messages/en.json @@ -23,6 +23,7 @@ "E0501_PROTOCOL_RESPONSEHANDLER_NO_RESULTSET_ON_DATA_RECEIVE":"E0501: resultSet was not created before data was received", "E0502_PROTOCOL_RESPONSEHANDLER_ERROR": "E0502: error handling response, error message '%+v'. statusCode: %d", + "E0502_PROTOCOL_RESPONSEHANDLER_READ_LOOP_ERROR": "E0502: error in read loop, error message '%v'. statusCode: %d", "E0503_PROTOCOL_RESPONSEHANDLER_AUTH_ERROR":"E0503: failed to authenticate %v : %v", "E0601_RESULT_NOT_VERTEX_ERROR":"E0601: result is not a Vertex",
