This is an automated email from the ASF dual-hosted git repository.

xiazcy pushed a commit to branch go-http-streaming
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git


The following commit(s) were added to refs/heads/go-http-streaming by this push:
     new 4366dbefeb clean up TODOs
4366dbefeb is described below

commit 4366dbefebe9302e3c3d0bbbda2483b6fea75612
Author: Yang Xia <[email protected]>
AuthorDate: Fri Jan 23 08:49:51 2026 -0800

    clean up TODOs
---
 gremlin-go/driver/auth.go                          |   5 +-
 gremlin-go/driver/client.go                        |   9 +-
 gremlin-go/driver/driverRemoteConnection.go        |   1 -
 gremlin-go/driver/gremlinlang.go                   |  14 -
 gremlin-go/driver/httpConnection.go                |  30 ++-
 gremlin-go/driver/logger.go                        |   2 +
 .../driver/resources/logger-messages/en.json       |   4 +-
 gremlin-go/driver/serializer.go                    |   6 -
 gremlin-go/driver/streamingDeserializer.go         | 117 +++++---
 gremlin-go/driver/streamingDeserializer_test.go    | 297 ++++++++++++++++++++-
 gremlin-go/driver/traversal.go                     |  30 ---
 11 files changed, 398 insertions(+), 117 deletions(-)

diff --git a/gremlin-go/driver/auth.go b/gremlin-go/driver/auth.go
index ab4a88cd89..237c8c2fb3 100644
--- a/gremlin-go/driver/auth.go
+++ b/gremlin-go/driver/auth.go
@@ -56,7 +56,10 @@ func Sigv4AuthWithCredentials(region, service string, 
credentialsProvider aws.Cr
                }
 
                signer := v4.NewSigner()
-               stdReq := req.ToStdRequest()
+               stdReq, err := req.ToStdRequest()
+               if err != nil {
+                       return err
+               }
                stdReq.Body = nil // Body is handled separately via payload hash
 
                if err := signer.SignHTTP(ctx, creds, stdReq, 
req.PayloadHash(), service, region, time.Now()); err != nil {
diff --git a/gremlin-go/driver/client.go b/gremlin-go/driver/client.go
index 995a0259fc..e98f3321e1 100644
--- a/gremlin-go/driver/client.go
+++ b/gremlin-go/driver/client.go
@@ -112,8 +112,10 @@ func NewClient(url string, configurations ...func(settings 
*ClientSettings)) (*C
 // Close closes the client via connection.
 // This is idempotent due to the underlying close() methods being idempotent 
as well.
 func (client *Client) Close() {
-       // TODO check what needs to be closed
        client.logHandler.logf(Info, closeClient, client.url)
+       if client.protocol != nil {
+               client.protocol.close()
+       }
 }
 
 func (client *Client) errorCallback() {
@@ -124,9 +126,6 @@ func (client *Client) errorCallback() {
 func (client *Client) SubmitWithOptions(traversalString string, requestOptions 
RequestOptions) (ResultSet, error) {
        client.logHandler.logf(Debug, submitStartedString, traversalString)
        request := MakeStringRequest(traversalString, client.traversalSource, 
requestOptions)
-
-       // TODO interceptors (ie. auth)
-
        rs, err := client.protocol.send(&request)
        return rs, err
 }
@@ -143,10 +142,8 @@ func (client *Client) Submit(traversalString string, 
bindings ...map[string]inte
 }
 
 // submitGremlinLang submits GremlinLang to the server to execute and returns 
a ResultSet.
-// TODO test and update when connection is set up
 func (client *Client) submitGremlinLang(gremlinLang *GremlinLang) (ResultSet, 
error) {
        client.logHandler.logf(Debug, submitStartedString, *gremlinLang)
-       // TODO placeholder
        requestOptionsBuilder := new(RequestOptionsBuilder)
        if len(gremlinLang.GetParameters()) > 0 {
                requestOptionsBuilder.SetBindings(gremlinLang.GetParameters())
diff --git a/gremlin-go/driver/driverRemoteConnection.go 
b/gremlin-go/driver/driverRemoteConnection.go
index 0cf18bd729..93d0b42683 100644
--- a/gremlin-go/driver/driverRemoteConnection.go
+++ b/gremlin-go/driver/driverRemoteConnection.go
@@ -125,7 +125,6 @@ func (driver *DriverRemoteConnection) 
Submit(traversalString string) (ResultSet,
 }
 
 // submitGremlinLang sends a GremlinLang traversal to the server.
-// TODO test and update when connection is set up
 func (driver *DriverRemoteConnection) submitGremlinLang(gremlinLang 
*GremlinLang) (ResultSet, error) {
        if driver.isClosed {
                return nil, 
newError(err0203SubmitGremlinLangToClosedConnectionError)
diff --git a/gremlin-go/driver/gremlinlang.go b/gremlin-go/driver/gremlinlang.go
index 498cda8206..cd2b724487 100644
--- a/gremlin-go/driver/gremlinlang.go
+++ b/gremlin-go/driver/gremlinlang.go
@@ -567,17 +567,3 @@ func (gl *GremlinLang) convertArgument(arg interface{}) 
(interface{}, error) {
                }
        }
 }
-
-// TODO revisit and remove if necessary
-//var withOptionsMap map[any]string = map[any]string{
-//     WithOptions.Tokens:  "WithOptions.tokens",
-//     WithOptions.None:    "WithOptions.none",
-//     WithOptions.Ids:     "WithOptions.ids",
-//     WithOptions.Labels:  "WithOptions.labels",
-//     WithOptions.Keys:    "WithOptions.keys",
-//     WithOptions.Values:  "WithOptions.values",
-//     WithOptions.All:     "WithOptions.all",
-//     WithOptions.Indexer: "WithOptions.indexer",
-//     WithOptions.List:    "WithOptions.list",
-//     WithOptions.Map:     "WithOptions.map",
-//}
diff --git a/gremlin-go/driver/httpConnection.go 
b/gremlin-go/driver/httpConnection.go
index b96a6211ef..4f034e0e65 100644
--- a/gremlin-go/driver/httpConnection.go
+++ b/gremlin-go/driver/httpConnection.go
@@ -62,10 +62,14 @@ func NewHttpRequest(method, rawURL string) (*HttpRequest, 
error) {
 }
 
 // ToStdRequest converts HttpRequest to a standard http.Request for signing.
-func (r *HttpRequest) ToStdRequest() *http.Request {
-       req, _ := http.NewRequest(r.Method, r.URL.String(), 
bytes.NewReader(r.Body))
+// Returns nil if the request cannot be created (invalid method or URL).
+func (r *HttpRequest) ToStdRequest() (*http.Request, error) {
+       req, err := http.NewRequest(r.Method, r.URL.String(), 
bytes.NewReader(r.Body))
+       if err != nil {
+               return nil, err
+       }
        req.Header = r.Headers
-       return req
+       return req, nil
 }
 
 // PayloadHash returns the SHA256 hash of the request body for SigV4 signing.
@@ -184,7 +188,11 @@ func (c *httpConnection) executeAndStream(data []byte, rs 
ResultSet) {
                rs.setError(err)
                return
        }
-       defer resp.Body.Close()
+       defer func() {
+               if err := resp.Body.Close(); err != nil {
+                       c.logHandler.logf(Debug, failedToCloseResponseBody, 
err.Error())
+               }
+       }()
 
        reader, zlibReader, err := c.getReader(resp)
        if err != nil {
@@ -193,7 +201,11 @@ func (c *httpConnection) executeAndStream(data []byte, rs 
ResultSet) {
                return
        }
        if zlibReader != nil {
-               defer zlibReader.Close()
+               defer func() {
+                       if err := zlibReader.Close(); err != nil {
+                               c.logHandler.logf(Debug, 
failedToCloseDecompReader, err.Error())
+                       }
+               }()
        }
 
        c.streamToResultSet(reader, rs)
@@ -224,8 +236,8 @@ func (c *httpConnection) getReader(resp *http.Response) 
(io.Reader, io.Closer, e
 }
 
 func (c *httpConnection) streamToResultSet(reader io.Reader, rs ResultSet) {
-       d := newStreamingDeserializer(reader)
-       if err := d.readHeader(); err != nil {
+       d := NewStreamingDeserializer(reader)
+       if err := d.ReadHeader(); err != nil {
                if err != io.EOF {
                        c.logHandler.logf(Error, failedToReceiveResponse, 
err.Error())
                        rs.setError(err)
@@ -234,7 +246,7 @@ func (c *httpConnection) streamToResultSet(reader 
io.Reader, rs ResultSet) {
        }
 
        for {
-               obj, err := d.readFullyQualified()
+               obj, err := d.ReadFullyQualified()
                if err != nil {
                        if err != io.EOF {
                                c.logHandler.logf(Error, 
failedToReceiveResponse, err.Error())
@@ -244,7 +256,7 @@ func (c *httpConnection) streamToResultSet(reader 
io.Reader, rs ResultSet) {
                }
 
                if marker, ok := obj.(Marker); ok && marker == EndOfStream() {
-                       code, msg, _, err := d.readStatus()
+                       code, msg, _, err := d.ReadStatus()
                        if err != nil {
                                c.logHandler.logf(Error, 
failedToReceiveResponse, err.Error())
                                rs.setError(err)
diff --git a/gremlin-go/driver/logger.go b/gremlin-go/driver/logger.go
index d198a224a9..388ce44a2c 100644
--- a/gremlin-go/driver/logger.go
+++ b/gremlin-go/driver/logger.go
@@ -114,4 +114,6 @@ const (
        logErrorGeneric              errorKey = "LOG_ERROR_GENERIC"
        closeDriverRemoteConnection  errorKey = "CLOSE_DRIVER_REMOTE_CONNECTION"
        closeClient                  errorKey = "CLOSE_CLIENT"
+       failedToCloseResponseBody    errorKey = "FAILED_TO_CLOSE_RESPONSE_BODY"
+       failedToCloseDecompReader    errorKey = 
"FAILED_TO_CLOSE_DECOMPRESSION_READER"
 )
diff --git a/gremlin-go/driver/resources/logger-messages/en.json 
b/gremlin-go/driver/resources/logger-messages/en.json
index 82bdce46a3..f7025e837a 100644
--- a/gremlin-go/driver/resources/logger-messages/en.json
+++ b/gremlin-go/driver/resources/logger-messages/en.json
@@ -20,5 +20,7 @@
   "POOL_NEW_CONNECTION_ERROR": "Falling back to least-used connection. 
Creating new connection due to least-used connection exceeding concurrent usage 
threshold failed: %s",
   "POOL_INITIAL_EXCEEDS_MAXIMUM": "InitialConcurrentConnections setting %d 
exceeded MaximumConcurrentConnections setting %d - limiting 
InitialConcurrentConnections to %d.",
   "FAILED_TO_RECEIVE_RESPONSE": "Failed to receive response: %s",
-  "FAILED_TO_SEND_REQUEST": "Failed to send request: %s"
+  "FAILED_TO_SEND_REQUEST": "Failed to send request: %s",
+  "FAILED_TO_CLOSE_RESPONSE_BODY": "Error closing response body: %s",
+  "FAILED_TO_CLOSE_DECOMPRESSION_READER": "Error closing decompression reader: 
%s"
 }
diff --git a/gremlin-go/driver/serializer.go b/gremlin-go/driver/serializer.go
index c2d68c6500..98a5f3142c 100644
--- a/gremlin-go/driver/serializer.go
+++ b/gremlin-go/driver/serializer.go
@@ -149,26 +149,20 @@ func (gs *GraphBinarySerializer) 
DeserializeMessage(message []byte) (Response, e
 
        //Skip version and nullable byte.
        i := 2
-       // TODO temp serialization before fully streaming set-up
        for len(message) > 0 {
                n, err := readFullyQualifiedNullable(&message, &i, true)
                if err != nil {
                        return msg, err
                }
-               // TODO for debug, remove later
-               //_, _ = fmt.Fprintf(os.Stdout, "Deserializing data : %v\n", n)
                if n == EndOfStream() {
                        break
                }
                results = append(results, n)
        }
 
-       // TODO for debug, remove later
-       //_, _ = fmt.Fprintf(os.Stdout, "Deserialized results : %s\n", results)
        msg.ResponseResult.Data = results
        code := readUint32Safe(&message, &i)
        msg.ResponseStatus.code = code
-       // TODO read status message
        msg.ResponseStatus.message = "OK"
        statusMsg, err := readUnqualified(&message, &i, stringType, true)
        if err != nil {
diff --git a/gremlin-go/driver/streamingDeserializer.go 
b/gremlin-go/driver/streamingDeserializer.go
index e5541625f5..0c69460a1c 100644
--- a/gremlin-go/driver/streamingDeserializer.go
+++ b/gremlin-go/driver/streamingDeserializer.go
@@ -32,9 +32,31 @@ import (
        "github.com/google/uuid"
 )
 
-// streamingDeserializer reads GraphBinary directly from bufio.Reader
-// without interface overhead for maximum performance
-type streamingDeserializer struct {
+// StreamingDeserializer reads GraphBinary data directly from an io.Reader,
+// enabling streaming deserialization of server responses.
+//
+// Streaming Behavior:
+// The deserializer is designed to work with HTTP chunked transfer encoding 
where
+// the server streams results as they become available. Key behaviors:
+//
+//  1. Blocking on partial data: When reading an object, if the underlying 
reader
+//     doesn't have enough bytes available, the deserializer blocks until the 
data
+//     arrives. This is handled by io.ReadFull which waits for the exact 
number of
+//     bytes needed based on GraphBinary's self-describing format (type codes 
and
+//     length prefixes).
+//
+//  2. Chunk boundary independence: Go's HTTP client may receive data in 
chunks that
+//     don't align with server-sent GraphBinary object boundaries. The 
deserializer
+//     handles this transparently - it reads exactly the bytes needed for each 
object,
+//     blocking if necessary, regardless of how the data was chunked by the 
network.
+//
+//  3. Immediate object delivery: Each complete object is returned as soon as 
it's
+//     fully read, allowing the caller to process results incrementally rather 
than
+//     waiting for the entire response.
+//
+// The bufio.Reader wrapper provides efficient buffering without affecting the
+// streaming semantics - it simply reduces the number of underlying read 
syscalls.
+type StreamingDeserializer struct {
        r   *bufio.Reader
        buf [8]byte
        err error // sticky error
@@ -43,11 +65,13 @@ type streamingDeserializer struct {
 // GraphBinary flag for bulked list/set
 const flagBulked = 0x02
 
-func newStreamingDeserializer(r io.Reader) *streamingDeserializer {
-       return &streamingDeserializer{r: bufio.NewReaderSize(r, 8192)}
+// NewStreamingDeserializer creates a new StreamingDeserializer that reads 
from the given io.Reader.
+// The reader is wrapped in a buffered reader for efficient reading.
+func NewStreamingDeserializer(r io.Reader) *StreamingDeserializer {
+       return &StreamingDeserializer{r: bufio.NewReaderSize(r, 8192)}
 }
 
-func (d *streamingDeserializer) readByte() (byte, error) {
+func (d *StreamingDeserializer) readByte() (byte, error) {
        if d.err != nil {
                return 0, d.err
        }
@@ -59,7 +83,7 @@ func (d *streamingDeserializer) readByte() (byte, error) {
        return b, nil
 }
 
-func (d *streamingDeserializer) readBytes(n int) ([]byte, error) {
+func (d *StreamingDeserializer) readBytes(n int) ([]byte, error) {
        if d.err != nil {
                return nil, d.err
        }
@@ -72,7 +96,7 @@ func (d *streamingDeserializer) readBytes(n int) ([]byte, 
error) {
        return buf, nil
 }
 
-func (d *streamingDeserializer) readInt32() (int32, error) {
+func (d *StreamingDeserializer) readInt32() (int32, error) {
        if d.err != nil {
                return 0, d.err
        }
@@ -84,7 +108,7 @@ func (d *streamingDeserializer) readInt32() (int32, error) {
        return int32(binary.BigEndian.Uint32(d.buf[:4])), nil
 }
 
-func (d *streamingDeserializer) readUint32() (uint32, error) {
+func (d *StreamingDeserializer) readUint32() (uint32, error) {
        if d.err != nil {
                return 0, d.err
        }
@@ -96,7 +120,7 @@ func (d *streamingDeserializer) readUint32() (uint32, error) 
{
        return binary.BigEndian.Uint32(d.buf[:4]), nil
 }
 
-func (d *streamingDeserializer) readInt64() (int64, error) {
+func (d *StreamingDeserializer) readInt64() (int64, error) {
        if d.err != nil {
                return 0, d.err
        }
@@ -108,7 +132,9 @@ func (d *streamingDeserializer) readInt64() (int64, error) {
        return int64(binary.BigEndian.Uint64(d.buf[:8])), nil
 }
 
-func (d *streamingDeserializer) readHeader() error {
+// ReadHeader reads and validates the GraphBinary response header.
+// This must be called before reading any objects from the stream.
+func (d *StreamingDeserializer) ReadHeader() error {
        if _, err := d.readByte(); err != nil {
                return err
        }
@@ -116,7 +142,10 @@ func (d *streamingDeserializer) readHeader() error {
        return err
 }
 
-func (d *streamingDeserializer) readFullyQualified() (interface{}, error) {
+// ReadFullyQualified reads the next fully-qualified GraphBinary value from 
the stream.
+// Returns the deserialized object, or an error if reading fails.
+// When the end of the result stream is reached, this returns a Marker equal 
to EndOfStream().
+func (d *StreamingDeserializer) ReadFullyQualified() (interface{}, error) {
        dtByte, err := d.readByte()
        if err != nil {
                return nil, err
@@ -138,7 +167,7 @@ func (d *streamingDeserializer) readFullyQualified() 
(interface{}, error) {
        return d.readValue(dt, flag)
 }
 
-func (d *streamingDeserializer) readValue(dt dataType, flag byte) 
(interface{}, error) {
+func (d *StreamingDeserializer) readValue(dt dataType, flag byte) 
(interface{}, error) {
        switch dt {
        case intType:
                return d.readInt32()
@@ -231,7 +260,7 @@ func (d *streamingDeserializer) readValue(dt dataType, flag 
byte) (interface{},
        }
 }
 
-func (d *streamingDeserializer) readString() (string, error) {
+func (d *StreamingDeserializer) readString() (string, error) {
        length, err := d.readInt32()
        if err != nil {
                return "", err
@@ -246,14 +275,14 @@ func (d *streamingDeserializer) readString() (string, 
error) {
        return string(buf), nil
 }
 
-func (d *streamingDeserializer) readList(bulked bool) (interface{}, error) {
+func (d *StreamingDeserializer) readList(bulked bool) (interface{}, error) {
        length, err := d.readInt32()
        if err != nil {
                return nil, err
        }
        list := make([]interface{}, 0, length)
        for i := int32(0); i < length; i++ {
-               val, err := d.readFullyQualified()
+               val, err := d.ReadFullyQualified()
                if err != nil {
                        return nil, err
                }
@@ -272,18 +301,18 @@ func (d *streamingDeserializer) readList(bulked bool) 
(interface{}, error) {
        return list, nil
 }
 
-func (d *streamingDeserializer) readMap() (interface{}, error) {
+func (d *StreamingDeserializer) readMap() (interface{}, error) {
        length, err := d.readUint32()
        if err != nil {
                return nil, err
        }
        m := make(map[interface{}]interface{}, length)
        for i := uint32(0); i < length; i++ {
-               key, err := d.readFullyQualified()
+               key, err := d.ReadFullyQualified()
                if err != nil {
                        return nil, err
                }
-               val, err := d.readFullyQualified()
+               val, err := d.ReadFullyQualified()
                if err != nil {
                        return nil, err
                }
@@ -300,8 +329,8 @@ func (d *streamingDeserializer) readMap() (interface{}, 
error) {
        return m, nil
 }
 
-func (d *streamingDeserializer) readVertex(withProps bool) (*Vertex, error) {
-       id, err := d.readFullyQualified()
+func (d *StreamingDeserializer) readVertex(withProps bool) (*Vertex, error) {
+       id, err := d.ReadFullyQualified()
        if err != nil {
                return nil, err
        }
@@ -319,7 +348,7 @@ func (d *streamingDeserializer) readVertex(withProps bool) 
(*Vertex, error) {
        }
        v := &Vertex{Element: Element{Id: id, Label: label}}
        if withProps {
-               props, err := d.readFullyQualified()
+               props, err := d.ReadFullyQualified()
                if err != nil {
                        return nil, err
                }
@@ -331,8 +360,8 @@ func (d *streamingDeserializer) readVertex(withProps bool) 
(*Vertex, error) {
        return v, nil
 }
 
-func (d *streamingDeserializer) readEdge() (*Edge, error) {
-       id, err := d.readFullyQualified()
+func (d *StreamingDeserializer) readEdge() (*Edge, error) {
+       id, err := d.ReadFullyQualified()
        if err != nil {
                return nil, err
        }
@@ -359,7 +388,7 @@ func (d *streamingDeserializer) readEdge() (*Edge, error) {
        if _, err := d.readBytes(2); err != nil {
                return nil, err
        }
-       props, err := d.readFullyQualified()
+       props, err := d.ReadFullyQualified()
        if err != nil {
                return nil, err
        }
@@ -375,12 +404,12 @@ func (d *streamingDeserializer) readEdge() (*Edge, error) 
{
        return e, nil
 }
 
-func (d *streamingDeserializer) readPath() (*Path, error) {
-       labels, err := d.readFullyQualified()
+func (d *StreamingDeserializer) readPath() (*Path, error) {
+       labels, err := d.ReadFullyQualified()
        if err != nil {
                return nil, err
        }
-       objects, err := d.readFullyQualified()
+       objects, err := d.ReadFullyQualified()
        if err != nil {
                return nil, err
        }
@@ -405,12 +434,12 @@ func (d *streamingDeserializer) readPath() (*Path, error) 
{
        return path, nil
 }
 
-func (d *streamingDeserializer) readProperty() (*Property, error) {
+func (d *StreamingDeserializer) readProperty() (*Property, error) {
        key, err := d.readString()
        if err != nil {
                return nil, err
        }
-       value, err := d.readFullyQualified()
+       value, err := d.ReadFullyQualified()
        if err != nil {
                return nil, err
        }
@@ -420,8 +449,8 @@ func (d *streamingDeserializer) readProperty() (*Property, 
error) {
        return &Property{Key: key, Value: value}, nil
 }
 
-func (d *streamingDeserializer) readVertexProperty() (*VertexProperty, error) {
-       id, err := d.readFullyQualified()
+func (d *StreamingDeserializer) readVertexProperty() (*VertexProperty, error) {
+       id, err := d.ReadFullyQualified()
        if err != nil {
                return nil, err
        }
@@ -437,14 +466,14 @@ func (d *streamingDeserializer) readVertexProperty() 
(*VertexProperty, error) {
        if !ok {
                return nil, newError(err0404ReadNullTypeError)
        }
-       value, err := d.readFullyQualified()
+       value, err := d.ReadFullyQualified()
        if err != nil {
                return nil, err
        }
        if _, err := d.readBytes(2); err != nil {
                return nil, err
        }
-       props, err := d.readFullyQualified()
+       props, err := d.ReadFullyQualified()
        if err != nil {
                return nil, err
        }
@@ -459,7 +488,7 @@ func (d *streamingDeserializer) readVertexProperty() 
(*VertexProperty, error) {
        return vp, nil
 }
 
-func (d *streamingDeserializer) readBigInt() (*big.Int, error) {
+func (d *StreamingDeserializer) readBigInt() (*big.Int, error) {
        length, err := d.readInt32()
        if err != nil {
                return nil, err
@@ -480,7 +509,7 @@ func (d *streamingDeserializer) readBigInt() (*big.Int, 
error) {
        return bi, nil
 }
 
-func (d *streamingDeserializer) readBigDecimal() (*BigDecimal, error) {
+func (d *StreamingDeserializer) readBigDecimal() (*BigDecimal, error) {
        scale, err := d.readInt32()
        if err != nil {
                return nil, err
@@ -492,7 +521,7 @@ func (d *streamingDeserializer) readBigDecimal() 
(*BigDecimal, error) {
        return &BigDecimal{Scale: scale, UnscaledValue: unscaled}, nil
 }
 
-func (d *streamingDeserializer) readDateTime() (time.Time, error) {
+func (d *StreamingDeserializer) readDateTime() (time.Time, error) {
        year, err := d.readInt32()
        if err != nil {
                return time.Time{}, err
@@ -522,7 +551,7 @@ func (d *streamingDeserializer) readDateTime() (time.Time, 
error) {
        return time.Date(int(year), time.Month(month), int(day), int(h), 
int(m), int(s), int(ns), GetTimezoneFromOffset(int(offset))), nil
 }
 
-func (d *streamingDeserializer) readDuration() (time.Duration, error) {
+func (d *StreamingDeserializer) readDuration() (time.Duration, error) {
        seconds, err := d.readInt64()
        if err != nil {
                return 0, err
@@ -534,7 +563,7 @@ func (d *streamingDeserializer) readDuration() 
(time.Duration, error) {
        return time.Duration(seconds*int64(time.Second) + int64(nanos)), nil
 }
 
-func (d *streamingDeserializer) readByteBuffer() (*ByteBuffer, error) {
+func (d *StreamingDeserializer) readByteBuffer() (*ByteBuffer, error) {
        length, err := d.readInt32()
        if err != nil {
                return nil, err
@@ -546,7 +575,7 @@ func (d *streamingDeserializer) readByteBuffer() 
(*ByteBuffer, error) {
        return &ByteBuffer{Data: data}, nil
 }
 
-func (d *streamingDeserializer) readEnum() (string, error) {
+func (d *StreamingDeserializer) readEnum() (string, error) {
        if _, err := d.readByte(); err != nil { // type code (string)
                return "", err
        }
@@ -556,12 +585,14 @@ func (d *streamingDeserializer) readEnum() (string, 
error) {
        return d.readString()
 }
 
-func (d *streamingDeserializer) readStatus() (uint32, string, string, error) {
-       code, err := d.readUint32()
+// ReadStatus reads the response status after the EndOfStream marker.
+// Returns the status code, message, exception string, and any error 
encountered.
+// This should be called after ReadFullyQualified() returns an EndOfStream 
marker.
+func (d *StreamingDeserializer) ReadStatus() (code uint32, message string, 
exception string, err error) {
+       code, err = d.readUint32()
        if err != nil {
                return 0, "", "", err
        }
-       var message, exception string
        flag, err := d.readByte()
        if err != nil {
                return code, "", "", err
diff --git a/gremlin-go/driver/streamingDeserializer_test.go 
b/gremlin-go/driver/streamingDeserializer_test.go
index 91ad173444..96cc7cdbe0 100644
--- a/gremlin-go/driver/streamingDeserializer_test.go
+++ b/gremlin-go/driver/streamingDeserializer_test.go
@@ -22,16 +22,60 @@ package gremlingo
 import (
        "bytes"
        "io"
+       "sync"
        "testing"
        "time"
 
        "github.com/stretchr/testify/assert"
 )
 
+// slowReader simulates a network stream that delivers data in chunks with 
delays.
+// This mimics how Go's HTTP client receives chunked transfer-encoded responses
+// where chunk boundaries don't align with GraphBinary object boundaries.
+type slowReader struct {
+       chunks [][]byte
+       delay  time.Duration
+       index  int
+       offset int
+       mu     sync.Mutex
+}
+
+func newSlowReader(chunks [][]byte, delay time.Duration) *slowReader {
+       return &slowReader{chunks: chunks, delay: delay}
+}
+
+func (r *slowReader) Read(p []byte) (n int, err error) {
+       r.mu.Lock()
+       defer r.mu.Unlock()
+
+       if r.index >= len(r.chunks) {
+               return 0, io.EOF
+       }
+
+       // Simulate network delay between chunks
+       if r.offset == 0 && r.index > 0 {
+               r.mu.Unlock()
+               time.Sleep(r.delay)
+               r.mu.Lock()
+       }
+
+       chunk := r.chunks[r.index]
+       remaining := chunk[r.offset:]
+       n = copy(p, remaining)
+       r.offset += n
+
+       if r.offset >= len(chunk) {
+               r.index++
+               r.offset = 0
+       }
+
+       return n, nil
+}
+
 func TestStreamingDeserializer(t *testing.T) {
        t.Run("readInt32", func(t *testing.T) {
                data := []byte{0x00, 0x00, 0x00, 0x2A} // 42
-               d := newStreamingDeserializer(bytes.NewReader(data))
+               d := NewStreamingDeserializer(bytes.NewReader(data))
                val, err := d.readInt32()
                assert.Nil(t, err)
                assert.Equal(t, int32(42), val)
@@ -39,7 +83,7 @@ func TestStreamingDeserializer(t *testing.T) {
 
        t.Run("readInt64", func(t *testing.T) {
                data := []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x64} 
// 100
-               d := newStreamingDeserializer(bytes.NewReader(data))
+               d := NewStreamingDeserializer(bytes.NewReader(data))
                val, err := d.readInt64()
                assert.Nil(t, err)
                assert.Equal(t, int64(100), val)
@@ -47,7 +91,7 @@ func TestStreamingDeserializer(t *testing.T) {
 
        t.Run("readString", func(t *testing.T) {
                data := []byte{0x00, 0x00, 0x00, 0x05, 'h', 'e', 'l', 'l', 'o'}
-               d := newStreamingDeserializer(bytes.NewReader(data))
+               d := NewStreamingDeserializer(bytes.NewReader(data))
                val, err := d.readString()
                assert.Nil(t, err)
                assert.Equal(t, "hello", val)
@@ -55,7 +99,7 @@ func TestStreamingDeserializer(t *testing.T) {
 
        t.Run("readString empty", func(t *testing.T) {
                data := []byte{0x00, 0x00, 0x00, 0x00}
-               d := newStreamingDeserializer(bytes.NewReader(data))
+               d := NewStreamingDeserializer(bytes.NewReader(data))
                val, err := d.readString()
                assert.Nil(t, err)
                assert.Equal(t, "", val)
@@ -63,14 +107,14 @@ func TestStreamingDeserializer(t *testing.T) {
 
        t.Run("error on incomplete data", func(t *testing.T) {
                data := []byte{0x00, 0x00} // incomplete int32
-               d := newStreamingDeserializer(bytes.NewReader(data))
+               d := NewStreamingDeserializer(bytes.NewReader(data))
                _, err := d.readInt32()
                assert.ErrorIs(t, err, io.ErrUnexpectedEOF)
        })
 
        t.Run("sticky error", func(t *testing.T) {
                data := []byte{0x00} // too short
-               d := newStreamingDeserializer(bytes.NewReader(data))
+               d := NewStreamingDeserializer(bytes.NewReader(data))
 
                _, err1 := d.readInt32()
                assert.Error(t, err1)
@@ -115,3 +159,244 @@ func TestStreamingChannelDelivery(t *testing.T) {
                }
        })
 }
+
+// TestStreamingBlocksOnPartialData verifies that the deserializer correctly 
blocks
+// when it receives partial data, waiting for the rest of the object to arrive.
+// This simulates the real-world scenario where Go's HTTP client receives 
chunks
+// that don't align with server-sent GraphBinary object boundaries.
+func TestStreamingBlocksOnPartialData(t *testing.T) {
+       t.Run("blocks until complete int32 is available", func(t *testing.T) {
+               // Split a 4-byte int32 across two chunks
+               chunk1 := []byte{0x00, 0x00} // First 2 bytes
+               chunk2 := []byte{0x00, 0x2A} // Last 2 bytes (total = 42)
+
+               reader := newSlowReader([][]byte{chunk1, chunk2}, 
20*time.Millisecond)
+               d := NewStreamingDeserializer(reader)
+
+               start := time.Now()
+               val, err := d.readInt32()
+               elapsed := time.Since(start)
+
+               assert.Nil(t, err)
+               assert.Equal(t, int32(42), val)
+               // Should have blocked waiting for second chunk
+               assert.GreaterOrEqual(t, elapsed, 15*time.Millisecond,
+                       "Should have blocked waiting for remaining bytes")
+       })
+
+       t.Run("blocks until complete string is available", func(t *testing.T) {
+               // String "hello" split across chunks:
+               // Chunk 1: length (4 bytes) + partial content
+               // Chunk 2: remaining content
+               chunk1 := []byte{0x00, 0x00, 0x00, 0x05, 'h', 'e'} // length=5, 
"he"
+               chunk2 := []byte{'l', 'l', 'o'}                    // "llo"
+
+               reader := newSlowReader([][]byte{chunk1, chunk2}, 
20*time.Millisecond)
+               d := NewStreamingDeserializer(reader)
+
+               start := time.Now()
+               val, err := d.readString()
+               elapsed := time.Since(start)
+
+               assert.Nil(t, err)
+               assert.Equal(t, "hello", val)
+               assert.GreaterOrEqual(t, elapsed, 15*time.Millisecond,
+                       "Should have blocked waiting for remaining string 
bytes")
+       })
+}
+
+// TestStreamingMultipleObjects verifies that multiple GraphBinary objects
+// can be read from a stream, with each object returned as soon as it's 
complete.
+func TestStreamingMultipleObjects(t *testing.T) {
+       t.Run("reads multiple objects as they arrive", func(t *testing.T) {
+               // Build a stream with 3 fully-qualified int32 values
+               // Each int32: type(1) + flag(1) + value(4) = 6 bytes
+               obj1 := []byte{0x01, 0x00, 0x00, 0x00, 0x00, 0x01} // int32 = 1
+               obj2 := []byte{0x01, 0x00, 0x00, 0x00, 0x00, 0x02} // int32 = 2
+               obj3 := []byte{0x01, 0x00, 0x00, 0x00, 0x00, 0x03} // int32 = 3
+
+               // Deliver each object as a separate chunk with delay
+               reader := newSlowReader([][]byte{obj1, obj2, obj3}, 
15*time.Millisecond)
+               d := NewStreamingDeserializer(reader)
+
+               var results []int32
+               var times []time.Duration
+               start := time.Now()
+
+               for i := 0; i < 3; i++ {
+                       val, err := d.ReadFullyQualified()
+                       assert.Nil(t, err)
+                       results = append(results, val.(int32))
+                       times = append(times, time.Since(start))
+               }
+
+               assert.Equal(t, []int32{1, 2, 3}, results)
+
+               // Objects should arrive with delays between them
+               for i := 1; i < len(times); i++ {
+                       gap := times[i] - times[i-1]
+                       assert.GreaterOrEqual(t, gap, 10*time.Millisecond,
+                               "Object %d should have arrived after a delay", 
i)
+               }
+       })
+
+       t.Run("handles object split across chunk boundary", func(t *testing.T) {
+               // First chunk: complete object + partial second object
+               // Second chunk: rest of second object + complete third object
+               chunk1 := []byte{
+                       0x01, 0x00, 0x00, 0x00, 0x00, 0x01, // int32 = 1 
(complete)
+                       0x01, 0x00, 0x00, // partial int32 (type + flag + 2 
bytes of value)
+               }
+               chunk2 := []byte{
+                       0x00, 0x00, 0x02, // rest of int32 = 2
+                       0x01, 0x00, 0x00, 0x00, 0x00, 0x03, // int32 = 3 
(complete)
+               }
+
+               reader := newSlowReader([][]byte{chunk1, chunk2}, 
20*time.Millisecond)
+               d := NewStreamingDeserializer(reader)
+
+               // First object should return immediately
+               val1, err := d.ReadFullyQualified()
+               assert.Nil(t, err)
+               assert.Equal(t, int32(1), val1)
+
+               // Second object should block waiting for chunk2
+               start := time.Now()
+               val2, err := d.ReadFullyQualified()
+               elapsed := time.Since(start)
+               assert.Nil(t, err)
+               assert.Equal(t, int32(2), val2)
+               assert.GreaterOrEqual(t, elapsed, 15*time.Millisecond,
+                       "Should have blocked waiting for rest of object")
+
+               // Third object should return immediately (already in buffer)
+               val3, err := d.ReadFullyQualified()
+               assert.Nil(t, err)
+               assert.Equal(t, int32(3), val3)
+       })
+}
+
+// TestStreamingWithEndOfStreamMarker verifies that the deserializer correctly
+// handles the EndOfStream marker and subsequent status reading.
+func TestStreamingWithEndOfStreamMarker(t *testing.T) {
+       t.Run("reads EndOfStream marker and status", func(t *testing.T) {
+               // Build a complete response:
+               // - Header (2 bytes): version + flags
+               // - One int32 result
+               // - EndOfStream marker
+               // - Status: code(4) + message(nullable) + exception(nullable)
+               data := []byte{
+                       0x81, 0x00, // Header: version byte + no bulking
+                       0x01, 0x00, 0x00, 0x00, 0x00, 0x2A, // int32 = 42
+                       0xfd, 0x00, 0x00, // Marker type + flag + value=0 
(EndOfStream)
+                       0x00, 0x00, 0x00, 0xC8, // Status code = 200
+                       0x01, // Message is null
+                       0x01, // Exception is null
+               }
+
+               d := NewStreamingDeserializer(bytes.NewReader(data))
+
+               // Read header
+               err := d.ReadHeader()
+               assert.Nil(t, err)
+
+               // Read the result
+               val, err := d.ReadFullyQualified()
+               assert.Nil(t, err)
+               assert.Equal(t, int32(42), val)
+
+               // Read EndOfStream marker
+               marker, err := d.ReadFullyQualified()
+               assert.Nil(t, err)
+               assert.Equal(t, EndOfStream(), marker)
+
+               // Read status
+               code, msg, exc, err := d.ReadStatus()
+               assert.Nil(t, err)
+               assert.Equal(t, uint32(200), code)
+               assert.Equal(t, "", msg)
+               assert.Equal(t, "", exc)
+       })
+
+       t.Run("reads status with message", func(t *testing.T) {
+               // Status with a message
+               data := []byte{
+                       0xfd, 0x00, 0x00, // EndOfStream marker
+                       0x00, 0x00, 0x01, 0x90, // Status code = 400
+                       0x00,                                            // 
Message is not null
+                       0x00, 0x00, 0x00, 0x05, 'e', 'r', 'r', 'o', 'r', // 
Message = "error"
+                       0x01, // Exception is null
+               }
+
+               d := NewStreamingDeserializer(bytes.NewReader(data))
+
+               marker, err := d.ReadFullyQualified()
+               assert.Nil(t, err)
+               assert.Equal(t, EndOfStream(), marker)
+
+               code, msg, exc, err := d.ReadStatus()
+               assert.Nil(t, err)
+               assert.Equal(t, uint32(400), code)
+               assert.Equal(t, "error", msg)
+               assert.Equal(t, "", exc)
+       })
+}
+
+// TestStreamingComplexTypes verifies streaming deserialization of complex 
types
+// like vertices, edges, and paths.
+func TestStreamingComplexTypes(t *testing.T) {
+       t.Run("reads vertex from stream", func(t *testing.T) {
+               // Vertex: id(int32) + labels(list of string) + 
properties(nullable)
+               data := []byte{
+                       0x11, 0x00, // Vertex type + flag
+                       // ID: int32 = 1
+                       0x01, 0x00, 0x00, 0x00, 0x00, 0x01,
+                       // Labels: list with one string "person"
+                       0x00, 0x00, 0x00, 0x01, // list length = 1
+                       0x03, 0x00, // string type + flag
+                       0x00, 0x00, 0x00, 0x06, 'p', 'e', 'r', 's', 'o', 'n',
+                       // Properties: null
+                       0xfe, 0x01,
+               }
+
+               d := NewStreamingDeserializer(bytes.NewReader(data))
+               val, err := d.ReadFullyQualified()
+               assert.Nil(t, err)
+
+               v, ok := val.(*Vertex)
+               assert.True(t, ok)
+               assert.Equal(t, int32(1), v.Id)
+               assert.Equal(t, "person", v.Label)
+       })
+
+       t.Run("reads list of integers from chunked stream", func(t *testing.T) {
+               // List type split across chunks
+               chunk1 := []byte{
+                       0x09, 0x00, // List type + flag
+                       0x00, 0x00, 0x00, 0x03, // length = 3
+                       0x01, 0x00, 0x00, 0x00, 0x00, 0x0A, // int32 = 10
+               }
+               chunk2 := []byte{
+                       0x01, 0x00, 0x00, 0x00, 0x00, 0x14, // int32 = 20
+                       0x01, 0x00, 0x00, 0x00, 0x00, 0x1E, // int32 = 30
+               }
+
+               reader := newSlowReader([][]byte{chunk1, chunk2}, 
15*time.Millisecond)
+               d := NewStreamingDeserializer(reader)
+
+               start := time.Now()
+               val, err := d.ReadFullyQualified()
+               elapsed := time.Since(start)
+
+               assert.Nil(t, err)
+               list, ok := val.([]interface{})
+               assert.True(t, ok)
+               assert.Equal(t, 3, len(list))
+               assert.Equal(t, int32(10), list[0])
+               assert.Equal(t, int32(20), list[1])
+               assert.Equal(t, int32(30), list[2])
+
+               // Should have blocked for second chunk
+               assert.GreaterOrEqual(t, elapsed, 10*time.Millisecond)
+       })
+}
diff --git a/gremlin-go/driver/traversal.go b/gremlin-go/driver/traversal.go
index 3ac5e934f3..40e8d79785 100644
--- a/gremlin-go/driver/traversal.go
+++ b/gremlin-go/driver/traversal.go
@@ -44,7 +44,6 @@ func (t *Traversal) ToList() ([]*Result, error) {
                return nil, newError(err0901ToListAnonTraversalError)
        }
 
-       // TODO update and test when connection is set up
        results, err := t.remote.submitGremlinLang(t.GremlinLang)
        if err != nil {
                return nil, err
@@ -80,7 +79,6 @@ func (t *Traversal) Iterate() <-chan error {
 
                t.GremlinLang.AddStep("discard")
 
-               // TODO update and test when connection is set up
                res, err := t.remote.submitGremlinLang(t.GremlinLang)
                if err != nil {
                        r <- err
@@ -124,7 +122,6 @@ func (t *Traversal) Next() (*Result, error) {
 // GetResultSet submits the traversal and returns the ResultSet.
 func (t *Traversal) GetResultSet() (ResultSet, error) {
        if t.results == nil {
-               // TODO update and test when connection is set up
                results, err := t.remote.submitGremlinLang(t.GremlinLang)
                if err != nil {
                        return nil, err
@@ -749,33 +746,6 @@ var IO = ioconfig{
        Registry: "~tinkerpop.ioconfig.registry",
 }
 
-// TODO pending update/removal
-// Metrics holds metrics data; typically for .profile()-step analysis. Metrics 
may be nested. Nesting enables
-// the ability to capture explicit metrics for multiple distinct operations. 
Annotations are used to store
-// miscellaneous notes that might be useful to a developer when examining 
results, such as index coverage
-// for Steps in a Traversal.
-//type Metrics struct {
-//     Id   string
-//     Name string
-//     // the duration in nanoseconds.
-//     Duration      int64
-//     Counts        map[string]int64
-//     Annotations   map[string]interface{}
-//     NestedMetrics []Metrics
-//}
-
-// TraversalMetrics contains the Metrics gathered for a Traversal as the 
result of the .profile()-step.
-//type TraversalMetrics struct {
-//     // the duration in nanoseconds.
-//     Duration int64
-//     Metrics  []Metrics
-//}
-
-// GremlinType represents the GraphBinary type Class which can be used to 
serialize a class.
-//type GremlinType struct {
-//     Fqcn string
-//}
-
 // BigDecimal represents an arbitrary-precision signed decimal number, 
consisting of an arbitrary precision integer
 // unscaled value and a 32-bit integer scale.
 type BigDecimal struct {


Reply via email to