This is an automated email from the ASF dual-hosted git repository.
xiazcy pushed a commit to branch go-http-streaming
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
The following commit(s) were added to refs/heads/go-http-streaming by this push:
new 4366dbefeb clean up TODOs
4366dbefeb is described below
commit 4366dbefebe9302e3c3d0bbbda2483b6fea75612
Author: Yang Xia <[email protected]>
AuthorDate: Fri Jan 23 08:49:51 2026 -0800
clean up TODOs
---
gremlin-go/driver/auth.go | 5 +-
gremlin-go/driver/client.go | 9 +-
gremlin-go/driver/driverRemoteConnection.go | 1 -
gremlin-go/driver/gremlinlang.go | 14 -
gremlin-go/driver/httpConnection.go | 30 ++-
gremlin-go/driver/logger.go | 2 +
.../driver/resources/logger-messages/en.json | 4 +-
gremlin-go/driver/serializer.go | 6 -
gremlin-go/driver/streamingDeserializer.go | 117 +++++---
gremlin-go/driver/streamingDeserializer_test.go | 297 ++++++++++++++++++++-
gremlin-go/driver/traversal.go | 30 ---
11 files changed, 398 insertions(+), 117 deletions(-)
diff --git a/gremlin-go/driver/auth.go b/gremlin-go/driver/auth.go
index ab4a88cd89..237c8c2fb3 100644
--- a/gremlin-go/driver/auth.go
+++ b/gremlin-go/driver/auth.go
@@ -56,7 +56,10 @@ func Sigv4AuthWithCredentials(region, service string, credentialsProvider aws.Cr
}
signer := v4.NewSigner()
- stdReq := req.ToStdRequest()
+ stdReq, err := req.ToStdRequest()
+ if err != nil {
+ return err
+ }
stdReq.Body = nil // Body is handled separately via payload hash
if err := signer.SignHTTP(ctx, creds, stdReq,
req.PayloadHash(), service, region, time.Now()); err != nil {
diff --git a/gremlin-go/driver/client.go b/gremlin-go/driver/client.go
index 995a0259fc..e98f3321e1 100644
--- a/gremlin-go/driver/client.go
+++ b/gremlin-go/driver/client.go
@@ -112,8 +112,10 @@ func NewClient(url string, configurations ...func(settings *ClientSettings)) (*C
// Close closes the client via connection.
// This is idempotent due to the underlying close() methods being idempotent as well.
func (client *Client) Close() {
- // TODO check what needs to be closed
client.logHandler.logf(Info, closeClient, client.url)
+ if client.protocol != nil {
+ client.protocol.close()
+ }
}
func (client *Client) errorCallback() {
@@ -124,9 +126,6 @@ func (client *Client) errorCallback() {
func (client *Client) SubmitWithOptions(traversalString string, requestOptions RequestOptions) (ResultSet, error) {
client.logHandler.logf(Debug, submitStartedString, traversalString)
request := MakeStringRequest(traversalString, client.traversalSource, requestOptions)
-
- // TODO interceptors (ie. auth)
-
rs, err := client.protocol.send(&request)
return rs, err
}
@@ -143,10 +142,8 @@ func (client *Client) Submit(traversalString string, bindings ...map[string]inte
}
// submitGremlinLang submits GremlinLang to the server to execute and returns a ResultSet.
-// TODO test and update when connection is set up
func (client *Client) submitGremlinLang(gremlinLang *GremlinLang) (ResultSet, error) {
client.logHandler.logf(Debug, submitStartedString, *gremlinLang)
- // TODO placeholder
requestOptionsBuilder := new(RequestOptionsBuilder)
if len(gremlinLang.GetParameters()) > 0 {
requestOptionsBuilder.SetBindings(gremlinLang.GetParameters())
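[Editor's note, not part of the patch: a minimal caller-side sketch of the Client lifecycle touched above. The function name exampleClientLifecycle and the endpoint URL are placeholders; package name gremlingo matches the driver package shown later in this diff.]

    package gremlingo

    func exampleClientLifecycle() error {
        client, err := NewClient("http://localhost:8182/gremlin") // hypothetical endpoint
        if err != nil {
            return err
        }
        // Close is idempotent and, with this change, also closes the underlying protocol.
        defer client.Close()

        rs, err := client.Submit("g.V().count()")
        if err != nil {
            return err
        }
        _ = rs // consume the ResultSet as usual
        return nil
    }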
diff --git a/gremlin-go/driver/driverRemoteConnection.go b/gremlin-go/driver/driverRemoteConnection.go
index 0cf18bd729..93d0b42683 100644
--- a/gremlin-go/driver/driverRemoteConnection.go
+++ b/gremlin-go/driver/driverRemoteConnection.go
@@ -125,7 +125,6 @@ func (driver *DriverRemoteConnection) Submit(traversalString string) (ResultSet,
}
// submitGremlinLang sends a GremlinLang traversal to the server.
-// TODO test and update when connection is set up
func (driver *DriverRemoteConnection) submitGremlinLang(gremlinLang *GremlinLang) (ResultSet, error) {
if driver.isClosed {
return nil, newError(err0203SubmitGremlinLangToClosedConnectionError)
diff --git a/gremlin-go/driver/gremlinlang.go b/gremlin-go/driver/gremlinlang.go
index 498cda8206..cd2b724487 100644
--- a/gremlin-go/driver/gremlinlang.go
+++ b/gremlin-go/driver/gremlinlang.go
@@ -567,17 +567,3 @@ func (gl *GremlinLang) convertArgument(arg interface{}) (interface{}, error) {
}
}
}
-
-// TODO revisit and remove if necessary
-//var withOptionsMap map[any]string = map[any]string{
-// WithOptions.Tokens: "WithOptions.tokens",
-// WithOptions.None: "WithOptions.none",
-// WithOptions.Ids: "WithOptions.ids",
-// WithOptions.Labels: "WithOptions.labels",
-// WithOptions.Keys: "WithOptions.keys",
-// WithOptions.Values: "WithOptions.values",
-// WithOptions.All: "WithOptions.all",
-// WithOptions.Indexer: "WithOptions.indexer",
-// WithOptions.List: "WithOptions.list",
-// WithOptions.Map: "WithOptions.map",
-//}
diff --git a/gremlin-go/driver/httpConnection.go b/gremlin-go/driver/httpConnection.go
index b96a6211ef..4f034e0e65 100644
--- a/gremlin-go/driver/httpConnection.go
+++ b/gremlin-go/driver/httpConnection.go
@@ -62,10 +62,14 @@ func NewHttpRequest(method, rawURL string) (*HttpRequest, error) {
}
// ToStdRequest converts HttpRequest to a standard http.Request for signing.
-func (r *HttpRequest) ToStdRequest() *http.Request {
- req, _ := http.NewRequest(r.Method, r.URL.String(), bytes.NewReader(r.Body))
+// Returns a nil request and an error if the request cannot be created (invalid method or URL).
+func (r *HttpRequest) ToStdRequest() (*http.Request, error) {
+ req, err := http.NewRequest(r.Method, r.URL.String(), bytes.NewReader(r.Body))
+ if err != nil {
+ return nil, err
+ }
req.Header = r.Headers
- return req
+ return req, nil
}
// PayloadHash returns the SHA256 hash of the request body for SigV4 signing.
@@ -184,7 +188,11 @@ func (c *httpConnection) executeAndStream(data []byte, rs ResultSet) {
rs.setError(err)
return
}
- defer resp.Body.Close()
+ defer func() {
+ if err := resp.Body.Close(); err != nil {
+ c.logHandler.logf(Debug, failedToCloseResponseBody, err.Error())
+ }
+ }()
reader, zlibReader, err := c.getReader(resp)
if err != nil {
@@ -193,7 +201,11 @@ func (c *httpConnection) executeAndStream(data []byte, rs ResultSet) {
return
}
if zlibReader != nil {
- defer zlibReader.Close()
+ defer func() {
+ if err := zlibReader.Close(); err != nil {
+ c.logHandler.logf(Debug, failedToCloseDecompReader, err.Error())
+ }
+ }()
}
c.streamToResultSet(reader, rs)
@@ -224,8 +236,8 @@ func (c *httpConnection) getReader(resp *http.Response) (io.Reader, io.Closer, e
}
func (c *httpConnection) streamToResultSet(reader io.Reader, rs ResultSet) {
- d := newStreamingDeserializer(reader)
- if err := d.readHeader(); err != nil {
+ d := NewStreamingDeserializer(reader)
+ if err := d.ReadHeader(); err != nil {
if err != io.EOF {
c.logHandler.logf(Error, failedToReceiveResponse, err.Error())
rs.setError(err)
@@ -234,7 +246,7 @@ func (c *httpConnection) streamToResultSet(reader io.Reader, rs ResultSet) {
}
for {
- obj, err := d.readFullyQualified()
+ obj, err := d.ReadFullyQualified()
if err != nil {
if err != io.EOF {
c.logHandler.logf(Error, failedToReceiveResponse, err.Error())
@@ -244,7 +256,7 @@ func (c *httpConnection) streamToResultSet(reader io.Reader, rs ResultSet) {
}
if marker, ok := obj.(Marker); ok && marker == EndOfStream() {
- code, msg, _, err := d.readStatus()
+ code, msg, _, err := d.ReadStatus()
if err != nil {
c.logHandler.logf(Error, failedToReceiveResponse, err.Error())
rs.setError(err)
diff --git a/gremlin-go/driver/logger.go b/gremlin-go/driver/logger.go
index d198a224a9..388ce44a2c 100644
--- a/gremlin-go/driver/logger.go
+++ b/gremlin-go/driver/logger.go
@@ -114,4 +114,6 @@ const (
logErrorGeneric errorKey = "LOG_ERROR_GENERIC"
closeDriverRemoteConnection errorKey = "CLOSE_DRIVER_REMOTE_CONNECTION"
closeClient errorKey = "CLOSE_CLIENT"
+ failedToCloseResponseBody errorKey = "FAILED_TO_CLOSE_RESPONSE_BODY"
+ failedToCloseDecompReader errorKey = "FAILED_TO_CLOSE_DECOMPRESSION_READER"
)
diff --git a/gremlin-go/driver/resources/logger-messages/en.json b/gremlin-go/driver/resources/logger-messages/en.json
index 82bdce46a3..f7025e837a 100644
--- a/gremlin-go/driver/resources/logger-messages/en.json
+++ b/gremlin-go/driver/resources/logger-messages/en.json
@@ -20,5 +20,7 @@
"POOL_NEW_CONNECTION_ERROR": "Falling back to least-used connection.
Creating new connection due to least-used connection exceeding concurrent usage
threshold failed: %s",
"POOL_INITIAL_EXCEEDS_MAXIMUM": "InitialConcurrentConnections setting %d
exceeded MaximumConcurrentConnections setting %d - limiting
InitialConcurrentConnections to %d.",
"FAILED_TO_RECEIVE_RESPONSE": "Failed to receive response: %s",
- "FAILED_TO_SEND_REQUEST": "Failed to send request: %s"
+ "FAILED_TO_SEND_REQUEST": "Failed to send request: %s",
+ "FAILED_TO_CLOSE_RESPONSE_BODY": "Error closing response body: %s",
+ "FAILED_TO_CLOSE_DECOMPRESSION_READER": "Error closing decompression reader:
%s"
}
diff --git a/gremlin-go/driver/serializer.go b/gremlin-go/driver/serializer.go
index c2d68c6500..98a5f3142c 100644
--- a/gremlin-go/driver/serializer.go
+++ b/gremlin-go/driver/serializer.go
@@ -149,26 +149,20 @@ func (gs *GraphBinarySerializer) DeserializeMessage(message []byte) (Response, e
//Skip version and nullable byte.
i := 2
- // TODO temp serialization before fully streaming set-up
for len(message) > 0 {
n, err := readFullyQualifiedNullable(&message, &i, true)
if err != nil {
return msg, err
}
- // TODO for debug, remove later
- //_, _ = fmt.Fprintf(os.Stdout, "Deserializing data : %v\n", n)
if n == EndOfStream() {
break
}
results = append(results, n)
}
- // TODO for debug, remove later
- //_, _ = fmt.Fprintf(os.Stdout, "Deserialized results : %s\n", results)
msg.ResponseResult.Data = results
code := readUint32Safe(&message, &i)
msg.ResponseStatus.code = code
- // TODO read status message
msg.ResponseStatus.message = "OK"
statusMsg, err := readUnqualified(&message, &i, stringType, true)
if err != nil {
diff --git a/gremlin-go/driver/streamingDeserializer.go b/gremlin-go/driver/streamingDeserializer.go
index e5541625f5..0c69460a1c 100644
--- a/gremlin-go/driver/streamingDeserializer.go
+++ b/gremlin-go/driver/streamingDeserializer.go
@@ -32,9 +32,31 @@ import (
"github.com/google/uuid"
)
-// streamingDeserializer reads GraphBinary directly from bufio.Reader
-// without interface overhead for maximum performance
-type streamingDeserializer struct {
+// StreamingDeserializer reads GraphBinary data directly from an io.Reader,
+// enabling streaming deserialization of server responses.
+//
+// Streaming Behavior:
+// The deserializer is designed to work with HTTP chunked transfer encoding where
+// the server streams results as they become available. Key behaviors:
+//
+// 1. Blocking on partial data: When reading an object, if the underlying reader
+// doesn't have enough bytes available, the deserializer blocks until the data
+// arrives. This is handled by io.ReadFull which waits for the exact number of
+// bytes needed based on GraphBinary's self-describing format (type codes and
+// length prefixes).
+//
+// 2. Chunk boundary independence: Go's HTTP client may receive data in chunks that
+// don't align with server-sent GraphBinary object boundaries. The deserializer
+// handles this transparently - it reads exactly the bytes needed for each object,
+// blocking if necessary, regardless of how the data was chunked by the network.
+//
+// 3. Immediate object delivery: Each complete object is returned as soon as it's
+// fully read, allowing the caller to process results incrementally rather than
+// waiting for the entire response.
+//
+// The bufio.Reader wrapper provides efficient buffering without affecting the
+// streaming semantics - it simply reduces the number of underlying read syscalls.
+type StreamingDeserializer struct {
r *bufio.Reader
buf [8]byte
err error // sticky error
@@ -43,11 +65,13 @@ type streamingDeserializer struct {
// GraphBinary flag for bulked list/set
const flagBulked = 0x02
-func newStreamingDeserializer(r io.Reader) *streamingDeserializer {
- return &streamingDeserializer{r: bufio.NewReaderSize(r, 8192)}
+// NewStreamingDeserializer creates a new StreamingDeserializer that reads from the given io.Reader.
+// The reader is wrapped in a buffered reader for efficient reading.
+func NewStreamingDeserializer(r io.Reader) *StreamingDeserializer {
+ return &StreamingDeserializer{r: bufio.NewReaderSize(r, 8192)}
}
-func (d *streamingDeserializer) readByte() (byte, error) {
+func (d *StreamingDeserializer) readByte() (byte, error) {
if d.err != nil {
return 0, d.err
}
@@ -59,7 +83,7 @@ func (d *streamingDeserializer) readByte() (byte, error) {
return b, nil
}
-func (d *streamingDeserializer) readBytes(n int) ([]byte, error) {
+func (d *StreamingDeserializer) readBytes(n int) ([]byte, error) {
if d.err != nil {
return nil, d.err
}
@@ -72,7 +96,7 @@ func (d *streamingDeserializer) readBytes(n int) ([]byte, error) {
return buf, nil
}
-func (d *streamingDeserializer) readInt32() (int32, error) {
+func (d *StreamingDeserializer) readInt32() (int32, error) {
if d.err != nil {
return 0, d.err
}
@@ -84,7 +108,7 @@ func (d *streamingDeserializer) readInt32() (int32, error) {
return int32(binary.BigEndian.Uint32(d.buf[:4])), nil
}
-func (d *streamingDeserializer) readUint32() (uint32, error) {
+func (d *StreamingDeserializer) readUint32() (uint32, error) {
if d.err != nil {
return 0, d.err
}
@@ -96,7 +120,7 @@ func (d *streamingDeserializer) readUint32() (uint32, error) {
return binary.BigEndian.Uint32(d.buf[:4]), nil
}
-func (d *streamingDeserializer) readInt64() (int64, error) {
+func (d *StreamingDeserializer) readInt64() (int64, error) {
if d.err != nil {
return 0, d.err
}
@@ -108,7 +132,9 @@ func (d *streamingDeserializer) readInt64() (int64, error) {
return int64(binary.BigEndian.Uint64(d.buf[:8])), nil
}
-func (d *streamingDeserializer) readHeader() error {
+// ReadHeader reads and validates the GraphBinary response header.
+// This must be called before reading any objects from the stream.
+func (d *StreamingDeserializer) ReadHeader() error {
if _, err := d.readByte(); err != nil {
return err
}
@@ -116,7 +142,10 @@ func (d *streamingDeserializer) readHeader() error {
return err
}
-func (d *streamingDeserializer) readFullyQualified() (interface{}, error) {
+// ReadFullyQualified reads the next fully-qualified GraphBinary value from the stream.
+// Returns the deserialized object, or an error if reading fails.
+// When the end of the result stream is reached, this returns a Marker equal to EndOfStream().
+func (d *StreamingDeserializer) ReadFullyQualified() (interface{}, error) {
dtByte, err := d.readByte()
if err != nil {
return nil, err
@@ -138,7 +167,7 @@ func (d *streamingDeserializer) readFullyQualified() (interface{}, error) {
return d.readValue(dt, flag)
}
-func (d *streamingDeserializer) readValue(dt dataType, flag byte) (interface{}, error) {
+func (d *StreamingDeserializer) readValue(dt dataType, flag byte) (interface{}, error) {
switch dt {
case intType:
return d.readInt32()
@@ -231,7 +260,7 @@ func (d *streamingDeserializer) readValue(dt dataType, flag byte) (interface{},
}
}
-func (d *streamingDeserializer) readString() (string, error) {
+func (d *StreamingDeserializer) readString() (string, error) {
length, err := d.readInt32()
if err != nil {
return "", err
@@ -246,14 +275,14 @@ func (d *streamingDeserializer) readString() (string, error) {
return string(buf), nil
}
-func (d *streamingDeserializer) readList(bulked bool) (interface{}, error) {
+func (d *StreamingDeserializer) readList(bulked bool) (interface{}, error) {
length, err := d.readInt32()
if err != nil {
return nil, err
}
list := make([]interface{}, 0, length)
for i := int32(0); i < length; i++ {
- val, err := d.readFullyQualified()
+ val, err := d.ReadFullyQualified()
if err != nil {
return nil, err
}
@@ -272,18 +301,18 @@ func (d *streamingDeserializer) readList(bulked bool) (interface{}, error) {
return list, nil
}
-func (d *streamingDeserializer) readMap() (interface{}, error) {
+func (d *StreamingDeserializer) readMap() (interface{}, error) {
length, err := d.readUint32()
if err != nil {
return nil, err
}
m := make(map[interface{}]interface{}, length)
for i := uint32(0); i < length; i++ {
- key, err := d.readFullyQualified()
+ key, err := d.ReadFullyQualified()
if err != nil {
return nil, err
}
- val, err := d.readFullyQualified()
+ val, err := d.ReadFullyQualified()
if err != nil {
return nil, err
}
@@ -300,8 +329,8 @@ func (d *streamingDeserializer) readMap() (interface{}, error) {
return m, nil
}
-func (d *streamingDeserializer) readVertex(withProps bool) (*Vertex, error) {
- id, err := d.readFullyQualified()
+func (d *StreamingDeserializer) readVertex(withProps bool) (*Vertex, error) {
+ id, err := d.ReadFullyQualified()
if err != nil {
return nil, err
}
@@ -319,7 +348,7 @@ func (d *streamingDeserializer) readVertex(withProps bool) (*Vertex, error) {
}
v := &Vertex{Element: Element{Id: id, Label: label}}
if withProps {
- props, err := d.readFullyQualified()
+ props, err := d.ReadFullyQualified()
if err != nil {
return nil, err
}
@@ -331,8 +360,8 @@ func (d *streamingDeserializer) readVertex(withProps bool) (*Vertex, error) {
return v, nil
}
-func (d *streamingDeserializer) readEdge() (*Edge, error) {
- id, err := d.readFullyQualified()
+func (d *StreamingDeserializer) readEdge() (*Edge, error) {
+ id, err := d.ReadFullyQualified()
if err != nil {
return nil, err
}
@@ -359,7 +388,7 @@ func (d *streamingDeserializer) readEdge() (*Edge, error) {
if _, err := d.readBytes(2); err != nil {
return nil, err
}
- props, err := d.readFullyQualified()
+ props, err := d.ReadFullyQualified()
if err != nil {
return nil, err
}
@@ -375,12 +404,12 @@ func (d *streamingDeserializer) readEdge() (*Edge, error) {
return e, nil
}
-func (d *streamingDeserializer) readPath() (*Path, error) {
- labels, err := d.readFullyQualified()
+func (d *StreamingDeserializer) readPath() (*Path, error) {
+ labels, err := d.ReadFullyQualified()
if err != nil {
return nil, err
}
- objects, err := d.readFullyQualified()
+ objects, err := d.ReadFullyQualified()
if err != nil {
return nil, err
}
@@ -405,12 +434,12 @@ func (d *streamingDeserializer) readPath() (*Path, error) {
return path, nil
}
-func (d *streamingDeserializer) readProperty() (*Property, error) {
+func (d *StreamingDeserializer) readProperty() (*Property, error) {
key, err := d.readString()
if err != nil {
return nil, err
}
- value, err := d.readFullyQualified()
+ value, err := d.ReadFullyQualified()
if err != nil {
return nil, err
}
@@ -420,8 +449,8 @@ func (d *streamingDeserializer) readProperty() (*Property, error) {
return &Property{Key: key, Value: value}, nil
}
-func (d *streamingDeserializer) readVertexProperty() (*VertexProperty, error) {
- id, err := d.readFullyQualified()
+func (d *StreamingDeserializer) readVertexProperty() (*VertexProperty, error) {
+ id, err := d.ReadFullyQualified()
if err != nil {
return nil, err
}
@@ -437,14 +466,14 @@ func (d *streamingDeserializer) readVertexProperty() (*VertexProperty, error) {
if !ok {
return nil, newError(err0404ReadNullTypeError)
}
- value, err := d.readFullyQualified()
+ value, err := d.ReadFullyQualified()
if err != nil {
return nil, err
}
if _, err := d.readBytes(2); err != nil {
return nil, err
}
- props, err := d.readFullyQualified()
+ props, err := d.ReadFullyQualified()
if err != nil {
return nil, err
}
@@ -459,7 +488,7 @@ func (d *streamingDeserializer) readVertexProperty() (*VertexProperty, error) {
return vp, nil
}
-func (d *streamingDeserializer) readBigInt() (*big.Int, error) {
+func (d *StreamingDeserializer) readBigInt() (*big.Int, error) {
length, err := d.readInt32()
if err != nil {
return nil, err
@@ -480,7 +509,7 @@ func (d *streamingDeserializer) readBigInt() (*big.Int, error) {
return bi, nil
}
-func (d *streamingDeserializer) readBigDecimal() (*BigDecimal, error) {
+func (d *StreamingDeserializer) readBigDecimal() (*BigDecimal, error) {
scale, err := d.readInt32()
if err != nil {
return nil, err
@@ -492,7 +521,7 @@ func (d *streamingDeserializer) readBigDecimal() (*BigDecimal, error) {
return &BigDecimal{Scale: scale, UnscaledValue: unscaled}, nil
}
-func (d *streamingDeserializer) readDateTime() (time.Time, error) {
+func (d *StreamingDeserializer) readDateTime() (time.Time, error) {
year, err := d.readInt32()
if err != nil {
return time.Time{}, err
@@ -522,7 +551,7 @@ func (d *streamingDeserializer) readDateTime() (time.Time, error) {
return time.Date(int(year), time.Month(month), int(day), int(h), int(m), int(s), int(ns), GetTimezoneFromOffset(int(offset))), nil
}
-func (d *streamingDeserializer) readDuration() (time.Duration, error) {
+func (d *StreamingDeserializer) readDuration() (time.Duration, error) {
seconds, err := d.readInt64()
if err != nil {
return 0, err
@@ -534,7 +563,7 @@ func (d *streamingDeserializer) readDuration() (time.Duration, error) {
return time.Duration(seconds*int64(time.Second) + int64(nanos)), nil
}
-func (d *streamingDeserializer) readByteBuffer() (*ByteBuffer, error) {
+func (d *StreamingDeserializer) readByteBuffer() (*ByteBuffer, error) {
length, err := d.readInt32()
if err != nil {
return nil, err
@@ -546,7 +575,7 @@ func (d *streamingDeserializer) readByteBuffer() (*ByteBuffer, error) {
return &ByteBuffer{Data: data}, nil
}
-func (d *streamingDeserializer) readEnum() (string, error) {
+func (d *StreamingDeserializer) readEnum() (string, error) {
if _, err := d.readByte(); err != nil { // type code (string)
return "", err
}
@@ -556,12 +585,14 @@ func (d *streamingDeserializer) readEnum() (string, error) {
return d.readString()
}
-func (d *streamingDeserializer) readStatus() (uint32, string, string, error) {
- code, err := d.readUint32()
+// ReadStatus reads the response status after the EndOfStream marker.
+// Returns the status code, message, exception string, and any error encountered.
+// This should be called after ReadFullyQualified() returns an EndOfStream marker.
+func (d *StreamingDeserializer) ReadStatus() (code uint32, message string, exception string, err error) {
+ code, err = d.readUint32()
if err != nil {
return 0, "", "", err
}
- var message, exception string
flag, err := d.readByte()
if err != nil {
return code, "", "", err
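[Editor's note, not part of the patch: a minimal sketch of driving the newly exported API against an in-memory response, mirroring streamToResultSet above. The function name exampleStreamingRead is a placeholder; the byte values follow the test fixtures below and are illustrative only.]

    package gremlingo

    import (
        "bytes"
        "fmt"
    )

    func exampleStreamingRead() error {
        data := []byte{
            0x81, 0x00, // header: version byte + flags
            0x01, 0x00, 0x00, 0x00, 0x00, 0x2A, // fully-qualified int32 = 42
            0xfd, 0x00, 0x00, // EndOfStream marker
            0x00, 0x00, 0x00, 0xC8, // status code = 200
            0x01, // null message
            0x01, // null exception
        }
        d := NewStreamingDeserializer(bytes.NewReader(data))
        if err := d.ReadHeader(); err != nil {
            return err
        }
        for {
            obj, err := d.ReadFullyQualified()
            if err != nil {
                return err
            }
            if marker, ok := obj.(Marker); ok && marker == EndOfStream() {
                code, msg, _, err := d.ReadStatus()
                if err != nil {
                    return err
                }
                fmt.Println("status:", code, msg)
                return nil
            }
            fmt.Println("result:", obj) // int32(42) in this example
        }
    }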
diff --git a/gremlin-go/driver/streamingDeserializer_test.go b/gremlin-go/driver/streamingDeserializer_test.go
index 91ad173444..96cc7cdbe0 100644
--- a/gremlin-go/driver/streamingDeserializer_test.go
+++ b/gremlin-go/driver/streamingDeserializer_test.go
@@ -22,16 +22,60 @@ package gremlingo
import (
"bytes"
"io"
+ "sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
+// slowReader simulates a network stream that delivers data in chunks with delays.
+// This mimics how Go's HTTP client receives chunked transfer-encoded responses
+// where chunk boundaries don't align with GraphBinary object boundaries.
+type slowReader struct {
+ chunks [][]byte
+ delay time.Duration
+ index int
+ offset int
+ mu sync.Mutex
+}
+
+func newSlowReader(chunks [][]byte, delay time.Duration) *slowReader {
+ return &slowReader{chunks: chunks, delay: delay}
+}
+
+func (r *slowReader) Read(p []byte) (n int, err error) {
+ r.mu.Lock()
+ defer r.mu.Unlock()
+
+ if r.index >= len(r.chunks) {
+ return 0, io.EOF
+ }
+
+ // Simulate network delay between chunks
+ if r.offset == 0 && r.index > 0 {
+ r.mu.Unlock()
+ time.Sleep(r.delay)
+ r.mu.Lock()
+ }
+
+ chunk := r.chunks[r.index]
+ remaining := chunk[r.offset:]
+ n = copy(p, remaining)
+ r.offset += n
+
+ if r.offset >= len(chunk) {
+ r.index++
+ r.offset = 0
+ }
+
+ return n, nil
+}
+
func TestStreamingDeserializer(t *testing.T) {
t.Run("readInt32", func(t *testing.T) {
data := []byte{0x00, 0x00, 0x00, 0x2A} // 42
- d := newStreamingDeserializer(bytes.NewReader(data))
+ d := NewStreamingDeserializer(bytes.NewReader(data))
val, err := d.readInt32()
assert.Nil(t, err)
assert.Equal(t, int32(42), val)
@@ -39,7 +83,7 @@ func TestStreamingDeserializer(t *testing.T) {
t.Run("readInt64", func(t *testing.T) {
data := []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x64} // 100
- d := newStreamingDeserializer(bytes.NewReader(data))
+ d := NewStreamingDeserializer(bytes.NewReader(data))
val, err := d.readInt64()
assert.Nil(t, err)
assert.Equal(t, int64(100), val)
@@ -47,7 +91,7 @@ func TestStreamingDeserializer(t *testing.T) {
t.Run("readString", func(t *testing.T) {
data := []byte{0x00, 0x00, 0x00, 0x05, 'h', 'e', 'l', 'l', 'o'}
- d := newStreamingDeserializer(bytes.NewReader(data))
+ d := NewStreamingDeserializer(bytes.NewReader(data))
val, err := d.readString()
assert.Nil(t, err)
assert.Equal(t, "hello", val)
@@ -55,7 +99,7 @@ func TestStreamingDeserializer(t *testing.T) {
t.Run("readString empty", func(t *testing.T) {
data := []byte{0x00, 0x00, 0x00, 0x00}
- d := newStreamingDeserializer(bytes.NewReader(data))
+ d := NewStreamingDeserializer(bytes.NewReader(data))
val, err := d.readString()
assert.Nil(t, err)
assert.Equal(t, "", val)
@@ -63,14 +107,14 @@ func TestStreamingDeserializer(t *testing.T) {
t.Run("error on incomplete data", func(t *testing.T) {
data := []byte{0x00, 0x00} // incomplete int32
- d := newStreamingDeserializer(bytes.NewReader(data))
+ d := NewStreamingDeserializer(bytes.NewReader(data))
_, err := d.readInt32()
assert.ErrorIs(t, err, io.ErrUnexpectedEOF)
})
t.Run("sticky error", func(t *testing.T) {
data := []byte{0x00} // too short
- d := newStreamingDeserializer(bytes.NewReader(data))
+ d := NewStreamingDeserializer(bytes.NewReader(data))
_, err1 := d.readInt32()
assert.Error(t, err1)
@@ -115,3 +159,244 @@ func TestStreamingChannelDelivery(t *testing.T) {
}
})
}
+
+// TestStreamingBlocksOnPartialData verifies that the deserializer correctly blocks
+// when it receives partial data, waiting for the rest of the object to arrive.
+// This simulates the real-world scenario where Go's HTTP client receives chunks
+// that don't align with server-sent GraphBinary object boundaries.
+func TestStreamingBlocksOnPartialData(t *testing.T) {
+ t.Run("blocks until complete int32 is available", func(t *testing.T) {
+ // Split a 4-byte int32 across two chunks
+ chunk1 := []byte{0x00, 0x00} // First 2 bytes
+ chunk2 := []byte{0x00, 0x2A} // Last 2 bytes (total = 42)
+
+ reader := newSlowReader([][]byte{chunk1, chunk2}, 20*time.Millisecond)
+ d := NewStreamingDeserializer(reader)
+
+ start := time.Now()
+ val, err := d.readInt32()
+ elapsed := time.Since(start)
+
+ assert.Nil(t, err)
+ assert.Equal(t, int32(42), val)
+ // Should have blocked waiting for second chunk
+ assert.GreaterOrEqual(t, elapsed, 15*time.Millisecond,
+ "Should have blocked waiting for remaining bytes")
+ })
+
+ t.Run("blocks until complete string is available", func(t *testing.T) {
+ // String "hello" split across chunks:
+ // Chunk 1: length (4 bytes) + partial content
+ // Chunk 2: remaining content
+ chunk1 := []byte{0x00, 0x00, 0x00, 0x05, 'h', 'e'} // length=5, "he"
+ chunk2 := []byte{'l', 'l', 'o'} // "llo"
+
+ reader := newSlowReader([][]byte{chunk1, chunk2}, 20*time.Millisecond)
+ d := NewStreamingDeserializer(reader)
+
+ start := time.Now()
+ val, err := d.readString()
+ elapsed := time.Since(start)
+
+ assert.Nil(t, err)
+ assert.Equal(t, "hello", val)
+ assert.GreaterOrEqual(t, elapsed, 15*time.Millisecond,
+ "Should have blocked waiting for remaining string
bytes")
+ })
+}
+
+// TestStreamingMultipleObjects verifies that multiple GraphBinary objects
+// can be read from a stream, with each object returned as soon as it's complete.
+func TestStreamingMultipleObjects(t *testing.T) {
+ t.Run("reads multiple objects as they arrive", func(t *testing.T) {
+ // Build a stream with 3 fully-qualified int32 values
+ // Each int32: type(1) + flag(1) + value(4) = 6 bytes
+ obj1 := []byte{0x01, 0x00, 0x00, 0x00, 0x00, 0x01} // int32 = 1
+ obj2 := []byte{0x01, 0x00, 0x00, 0x00, 0x00, 0x02} // int32 = 2
+ obj3 := []byte{0x01, 0x00, 0x00, 0x00, 0x00, 0x03} // int32 = 3
+
+ // Deliver each object as a separate chunk with delay
+ reader := newSlowReader([][]byte{obj1, obj2, obj3}, 15*time.Millisecond)
+ d := NewStreamingDeserializer(reader)
+
+ var results []int32
+ var times []time.Duration
+ start := time.Now()
+
+ for i := 0; i < 3; i++ {
+ val, err := d.ReadFullyQualified()
+ assert.Nil(t, err)
+ results = append(results, val.(int32))
+ times = append(times, time.Since(start))
+ }
+
+ assert.Equal(t, []int32{1, 2, 3}, results)
+
+ // Objects should arrive with delays between them
+ for i := 1; i < len(times); i++ {
+ gap := times[i] - times[i-1]
+ assert.GreaterOrEqual(t, gap, 10*time.Millisecond,
+ "Object %d should have arrived after a delay",
i)
+ }
+ })
+
+ t.Run("handles object split across chunk boundary", func(t *testing.T) {
+ // First chunk: complete object + partial second object
+ // Second chunk: rest of second object + complete third object
+ chunk1 := []byte{
+ 0x01, 0x00, 0x00, 0x00, 0x00, 0x01, // int32 = 1 (complete)
+ 0x01, 0x00, 0x00, // partial int32 (type + flag + 2 bytes of value)
+ }
+ chunk2 := []byte{
+ 0x00, 0x00, 0x02, // rest of int32 = 2
+ 0x01, 0x00, 0x00, 0x00, 0x00, 0x03, // int32 = 3 (complete)
+ }
+
+ reader := newSlowReader([][]byte{chunk1, chunk2}, 20*time.Millisecond)
+ d := NewStreamingDeserializer(reader)
+
+ // First object should return immediately
+ val1, err := d.ReadFullyQualified()
+ assert.Nil(t, err)
+ assert.Equal(t, int32(1), val1)
+
+ // Second object should block waiting for chunk2
+ start := time.Now()
+ val2, err := d.ReadFullyQualified()
+ elapsed := time.Since(start)
+ assert.Nil(t, err)
+ assert.Equal(t, int32(2), val2)
+ assert.GreaterOrEqual(t, elapsed, 15*time.Millisecond,
+ "Should have blocked waiting for rest of object")
+
+ // Third object should return immediately (already in buffer)
+ val3, err := d.ReadFullyQualified()
+ assert.Nil(t, err)
+ assert.Equal(t, int32(3), val3)
+ })
+}
+
+// TestStreamingWithEndOfStreamMarker verifies that the deserializer correctly
+// handles the EndOfStream marker and subsequent status reading.
+func TestStreamingWithEndOfStreamMarker(t *testing.T) {
+ t.Run("reads EndOfStream marker and status", func(t *testing.T) {
+ // Build a complete response:
+ // - Header (2 bytes): version + flags
+ // - One int32 result
+ // - EndOfStream marker
+ // - Status: code(4) + message(nullable) + exception(nullable)
+ data := []byte{
+ 0x81, 0x00, // Header: version byte + no bulking
+ 0x01, 0x00, 0x00, 0x00, 0x00, 0x2A, // int32 = 42
+ 0xfd, 0x00, 0x00, // Marker type + flag + value=0 (EndOfStream)
+ 0x00, 0x00, 0x00, 0xC8, // Status code = 200
+ 0x01, // Message is null
+ 0x01, // Exception is null
+ }
+
+ d := NewStreamingDeserializer(bytes.NewReader(data))
+
+ // Read header
+ err := d.ReadHeader()
+ assert.Nil(t, err)
+
+ // Read the result
+ val, err := d.ReadFullyQualified()
+ assert.Nil(t, err)
+ assert.Equal(t, int32(42), val)
+
+ // Read EndOfStream marker
+ marker, err := d.ReadFullyQualified()
+ assert.Nil(t, err)
+ assert.Equal(t, EndOfStream(), marker)
+
+ // Read status
+ code, msg, exc, err := d.ReadStatus()
+ assert.Nil(t, err)
+ assert.Equal(t, uint32(200), code)
+ assert.Equal(t, "", msg)
+ assert.Equal(t, "", exc)
+ })
+
+ t.Run("reads status with message", func(t *testing.T) {
+ // Status with a message
+ data := []byte{
+ 0xfd, 0x00, 0x00, // EndOfStream marker
+ 0x00, 0x00, 0x01, 0x90, // Status code = 400
+ 0x00, // Message is not null
+ 0x00, 0x00, 0x00, 0x05, 'e', 'r', 'r', 'o', 'r', // Message = "error"
+ 0x01, // Exception is null
+ }
+
+ d := NewStreamingDeserializer(bytes.NewReader(data))
+
+ marker, err := d.ReadFullyQualified()
+ assert.Nil(t, err)
+ assert.Equal(t, EndOfStream(), marker)
+
+ code, msg, exc, err := d.ReadStatus()
+ assert.Nil(t, err)
+ assert.Equal(t, uint32(400), code)
+ assert.Equal(t, "error", msg)
+ assert.Equal(t, "", exc)
+ })
+}
+
+// TestStreamingComplexTypes verifies streaming deserialization of complex types
+// like vertices, edges, and paths.
+func TestStreamingComplexTypes(t *testing.T) {
+ t.Run("reads vertex from stream", func(t *testing.T) {
+ // Vertex: id(int32) + labels(list of string) + properties(nullable)
+ data := []byte{
+ 0x11, 0x00, // Vertex type + flag
+ // ID: int32 = 1
+ 0x01, 0x00, 0x00, 0x00, 0x00, 0x01,
+ // Labels: list with one string "person"
+ 0x00, 0x00, 0x00, 0x01, // list length = 1
+ 0x03, 0x00, // string type + flag
+ 0x00, 0x00, 0x00, 0x06, 'p', 'e', 'r', 's', 'o', 'n',
+ // Properties: null
+ 0xfe, 0x01,
+ }
+
+ d := NewStreamingDeserializer(bytes.NewReader(data))
+ val, err := d.ReadFullyQualified()
+ assert.Nil(t, err)
+
+ v, ok := val.(*Vertex)
+ assert.True(t, ok)
+ assert.Equal(t, int32(1), v.Id)
+ assert.Equal(t, "person", v.Label)
+ })
+
+ t.Run("reads list of integers from chunked stream", func(t *testing.T) {
+ // List type split across chunks
+ chunk1 := []byte{
+ 0x09, 0x00, // List type + flag
+ 0x00, 0x00, 0x00, 0x03, // length = 3
+ 0x01, 0x00, 0x00, 0x00, 0x00, 0x0A, // int32 = 10
+ }
+ chunk2 := []byte{
+ 0x01, 0x00, 0x00, 0x00, 0x00, 0x14, // int32 = 20
+ 0x01, 0x00, 0x00, 0x00, 0x00, 0x1E, // int32 = 30
+ }
+
+ reader := newSlowReader([][]byte{chunk1, chunk2}, 15*time.Millisecond)
+ d := NewStreamingDeserializer(reader)
+
+ start := time.Now()
+ val, err := d.ReadFullyQualified()
+ elapsed := time.Since(start)
+
+ assert.Nil(t, err)
+ list, ok := val.([]interface{})
+ assert.True(t, ok)
+ assert.Equal(t, 3, len(list))
+ assert.Equal(t, int32(10), list[0])
+ assert.Equal(t, int32(20), list[1])
+ assert.Equal(t, int32(30), list[2])
+
+ // Should have blocked for second chunk
+ assert.GreaterOrEqual(t, elapsed, 10*time.Millisecond)
+ })
+}
diff --git a/gremlin-go/driver/traversal.go b/gremlin-go/driver/traversal.go
index 3ac5e934f3..40e8d79785 100644
--- a/gremlin-go/driver/traversal.go
+++ b/gremlin-go/driver/traversal.go
@@ -44,7 +44,6 @@ func (t *Traversal) ToList() ([]*Result, error) {
return nil, newError(err0901ToListAnonTraversalError)
}
- // TODO update and test when connection is set up
results, err := t.remote.submitGremlinLang(t.GremlinLang)
if err != nil {
return nil, err
@@ -80,7 +79,6 @@ func (t *Traversal) Iterate() <-chan error {
t.GremlinLang.AddStep("discard")
- // TODO update and test when connection is set up
res, err := t.remote.submitGremlinLang(t.GremlinLang)
if err != nil {
r <- err
@@ -124,7 +122,6 @@ func (t *Traversal) Next() (*Result, error) {
// GetResultSet submits the traversal and returns the ResultSet.
func (t *Traversal) GetResultSet() (ResultSet, error) {
if t.results == nil {
- // TODO update and test when connection is set up
results, err := t.remote.submitGremlinLang(t.GremlinLang)
if err != nil {
return nil, err
@@ -749,33 +746,6 @@ var IO = ioconfig{
Registry: "~tinkerpop.ioconfig.registry",
}
-// TODO pending update/removal
-// Metrics holds metrics data; typically for .profile()-step analysis. Metrics may be nested. Nesting enables
-// the ability to capture explicit metrics for multiple distinct operations. Annotations are used to store
-// miscellaneous notes that might be useful to a developer when examining results, such as index coverage
-// for Steps in a Traversal.
-//type Metrics struct {
-// Id string
-// Name string
-// // the duration in nanoseconds.
-// Duration int64
-// Counts map[string]int64
-// Annotations map[string]interface{}
-// NestedMetrics []Metrics
-//}
-
-// TraversalMetrics contains the Metrics gathered for a Traversal as the result of the .profile()-step.
-//type TraversalMetrics struct {
-// // the duration in nanoseconds.
-// Duration int64
-// Metrics []Metrics
-//}
-
-// GremlinType represents the GraphBinary type Class which can be used to serialize a class.
-//type GremlinType struct {
-// Fqcn string
-//}
-
// BigDecimal represents an arbitrary-precision signed decimal number, consisting of an arbitrary precision integer
// unscaled value and a 32-bit integer scale.
type BigDecimal struct {