This is an automated email from the ASF dual-hosted git repository.

xiazcy pushed a commit to branch go-http-streaming
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit 6bbbb7b5bdee088377ff5e5891f3d1e5d0df5586
Author: Yang Xia <[email protected]>
AuthorDate: Thu Jan 15 09:42:31 2026 -0800

    trying streaming deser
---
 gremlin-go/driver/connection_test.go            |  61 +++
 gremlin-go/driver/httpProtocol.go               |  22 +-
 gremlin-go/driver/httpTransporter.go            |  83 ++--
 gremlin-go/driver/streamingDeserializer.go      | 583 ++++++++++++++++++++++++
 gremlin-go/driver/streamingDeserializer_test.go | 117 +++++
 5 files changed, 803 insertions(+), 63 deletions(-)

diff --git a/gremlin-go/driver/connection_test.go 
b/gremlin-go/driver/connection_test.go
index 153f0b0c74..14d4073c33 100644
--- a/gremlin-go/driver/connection_test.go
+++ b/gremlin-go/driver/connection_test.go
@@ -29,6 +29,7 @@ import (
        "strconv"
        "sync"
        "testing"
+       "time"
 
        "github.com/stretchr/testify/assert"
 )
@@ -834,3 +835,63 @@ func submitCount(i int, client *Client, t *testing.T) {
        assert.Equal(t, 6+i, c)
        _, _ = fmt.Fprintf(os.Stdout, "Received result : %s\n", result)
 }
+
+func TestStreamingResultDelivery(t *testing.T) {
+       testNoAuthWithAliasEnable := 
getEnvOrDefaultBool("RUN_INTEGRATION_WITH_ALIAS_TESTS", true)
+       skipTestsIfNotEnabled(t, integrationTestSuiteName, 
testNoAuthWithAliasEnable)
+       remote, err := 
NewDriverRemoteConnection(getEnvOrDefaultString("GREMLIN_SERVER_URL", 
noAuthUrl),
+               func(settings *DriverRemoteConnectionSettings) {
+                       settings.TlsConfig = &tls.Config{}
+                       settings.AuthInfo = &AuthInfo{}
+                       settings.TraversalSource = "ggrateful"
+               })
+       assert.Nil(t, err)
+       assert.NotNil(t, remote)
+       g := Traversal_().With(remote)
+       defer g.remoteConnection.Close()
+
+       t.Run("first result arrives before all results", func(t *testing.T) {
+               start := time.Now()
+               rs, err := g.V().Properties().GetResultSet()
+               assert.Nil(t, err)
+
+               // First result should arrive quickly
+               _, ok, err := rs.One()
+               firstResultTime := time.Since(start)
+               assert.Nil(t, err)
+               assert.True(t, ok)
+
+               // Drain remaining
+               _, err = rs.All()
+               assert.Nil(t, err)
+               totalTime := time.Since(start)
+
+               t.Logf("First result: %v, Total: %v, Ratio: %.2f%%",
+                       firstResultTime, totalTime, 
float64(firstResultTime)/float64(totalTime)*100)
+       })
+
+       t.Run("results arrive incrementally", func(t *testing.T) {
+               rs, err := g.V().Properties().GetResultSet()
+               assert.Nil(t, err)
+
+               var timestamps []time.Time
+               start := time.Now()
+
+               for {
+                       _, ok, err := rs.One()
+                       assert.Nil(t, err)
+                       if !ok {
+                               break
+                       }
+                       timestamps = append(timestamps, time.Now())
+               }
+
+               if len(timestamps) < 2 {
+                       t.Skip("need more results to test incremental delivery")
+               }
+
+               firstHalf := timestamps[len(timestamps)/2].Sub(start)
+               total := timestamps[len(timestamps)-1].Sub(start)
+               t.Logf("Half results at: %v, All results at: %v", firstHalf, 
total)
+       })
+}
diff --git a/gremlin-go/driver/httpProtocol.go 
b/gremlin-go/driver/httpProtocol.go
index cbf54f9dcf..9ef2041344 100644
--- a/gremlin-go/driver/httpProtocol.go
+++ b/gremlin-go/driver/httpProtocol.go
@@ -20,6 +20,7 @@ under the License.
 package gremlingo
 
 import (
+       "errors"
        "net/http"
 )
 
@@ -68,33 +69,24 @@ func (protocol *httpProtocol) send(request *request) 
(ResultSet, error) {
        // one transport per request
        transport := newHttpTransporter(protocol.url, protocol.connSettings, 
protocol.httpClient, protocol.logHandler)
 
-       // async send request
-       transport.wg.Add(1)
+       // async send request and receive response
        go func() {
-               defer transport.wg.Done()
                err := transport.Write(bytes)
                if err != nil {
-                       transport.Close() // Close transport to unblock receiver
+                       transport.Close()
                        rs.setError(err)
                        rs.Close()
+                       return
                }
-       }()
 
-       // async receive response
-       transport.wg.Add(1)
-       go func() {
-               defer transport.wg.Done()
-               err := protocol.receiveChunkedResponse(rs, transport)
+               err = protocol.receiveChunkedResponse(rs, transport)
                if err != nil {
                        rs.setError(err)
                }
                transport.Close()
        }()
 
-       // Wait for both async operations to complete
-       transport.wg.Wait()
-
-       return rs, rs.GetError()
+       return rs, nil
 }
 
 // receiveChunkedResponse processes individual chunk responses
@@ -102,7 +94,7 @@ func (protocol *httpProtocol) receiveChunkedResponse(rs 
ResultSet, transport *ht
        for {
                resp, err := transport.Read()
                if err != nil {
-                       if err.Error() == "response stream closed" {
+                       if errors.Is(err, ErrResponseStreamClosed) {
                                rs.Close()
                                return nil
                        }
diff --git a/gremlin-go/driver/httpTransporter.go 
b/gremlin-go/driver/httpTransporter.go
index c5c42ea15e..ac487978e6 100644
--- a/gremlin-go/driver/httpTransporter.go
+++ b/gremlin-go/driver/httpTransporter.go
@@ -26,9 +26,11 @@ import (
        "io"
        "net/http"
        "sync"
-       "time"
 )
 
+// ErrResponseStreamClosed is returned when reading from a closed response 
stream
+var ErrResponseStreamClosed = errors.New("response stream closed")
+
 // httpTransporter responsible for sending and receiving bytes to/from the 
server
 type httpTransporter struct {
        url             string
@@ -36,20 +38,16 @@ type httpTransporter struct {
        connSettings    *connectionSettings
        responseChannel chan response // receives response messages
        httpClient      *http.Client
-       wg              *sync.WaitGroup
        logHandler      *logHandler
        closeOnce       sync.Once
 }
 
 func newHttpTransporter(url string, connSettings *connectionSettings, 
httpClient *http.Client, logHandler *logHandler) *httpTransporter {
-       wg := &sync.WaitGroup{}
-
        return &httpTransporter{
                url:             url,
                connSettings:    connSettings,
                responseChannel: make(chan response, 10),
                httpClient:      httpClient,
-               wg:              wg,
                logHandler:      logHandler,
                closeOnce:       sync.Once{},
        }
@@ -94,15 +92,13 @@ func (transporter *httpTransporter) Write(data []byte) 
error {
 
        reader := resp.Body
        if resp.Header.Get("content-encoding") == "deflate" {
-               reader, err = zlib.NewReader(resp.Body)
+               zlibReader, err := zlib.NewReader(resp.Body)
                if err != nil {
                        transporter.logHandler.logf(Error, 
failedToReceiveResponse, err.Error())
-                       err := resp.Body.Close()
-                       if err != nil {
-                               return err
-                       }
+                       _ = resp.Body.Close()
                        return err
                }
+               reader = zlibReader
        }
 
        // Start streaming processing in background
@@ -110,54 +106,45 @@ func (transporter *httpTransporter) Write(data []byte) 
error {
        return nil
 }
 
-// streamResponse processes HTTP chunks independently
+// streamResponse processes HTTP response using direct streaming deserializer
 func (transporter *httpTransporter) streamResponse(reader io.Reader, body 
io.Closer) {
-       defer func(body io.Closer) {
-               err := body.Close()
-               if err != nil {
-               }
-       }(body)
+       defer func() {
+               _ = body.Close()
+       }()
        defer transporter.closeResponseChannel()
 
-       serializer := newGraphBinarySerializer(transporter.logHandler)
-       isFirstChunk := true
-
-       chunk := make([]byte, transporter.connSettings.readBufferSize)
-       timer := time.NewTimer(5 * time.Second)
-       defer timer.Stop()
+       d := newStreamingDeserializer(reader)
+       if err := d.readHeader(); err != nil {
+               if err != io.EOF {
+                       transporter.logHandler.logf(Error, 
failedToReceiveResponse, err.Error())
+               }
+               return
+       }
 
        for {
-               n, err := reader.Read(chunk)
-               if n > 0 {
-                       msg, procErr := serializer.readChunk(chunk[:n], 
isFirstChunk)
-                       if procErr != nil {
-                               transporter.logHandler.logf(Error, 
failedToReceiveResponse, procErr.Error())
-                               return
-                       }
-                       isFirstChunk = false
-
-                       if !timer.Stop() {
-                               select {
-                               case <-timer.C:
-                               default:
-                               }
+               obj, err := d.readFullyQualified()
+               if err != nil {
+                       if err == io.EOF {
+                               break
                        }
-                       timer.Reset(5 * time.Second)
+                       transporter.logHandler.logf(Error, 
failedToReceiveResponse, err.Error())
+                       return
+               }
 
-                       select {
-                       case transporter.responseChannel <- *msg:
-                       case <-timer.C:
-                               transporter.logHandler.logf(Error, 
failedToReceiveResponse, "timeout")
+               if marker, ok := obj.(Marker); ok && marker == EndOfStream() {
+                       code, statusMsg, exception, err := d.readStatus()
+                       if err != nil {
+                               transporter.logHandler.logf(Error, 
failedToReceiveResponse, err.Error())
                                return
                        }
+                       transporter.responseChannel <- response{
+                               responseStatus: responseStatus{code: code, 
message: statusMsg, exception: exception},
+                       }
+                       return
                }
 
-               if err == io.EOF {
-                       break
-               }
-               if err != nil {
-                       transporter.logHandler.logf(Error, 
failedToReceiveResponse, err.Error())
-                       return
+               transporter.responseChannel <- response{
+                       responseResult: responseResult{data: 
[]interface{}{obj}},
                }
        }
 }
@@ -172,7 +159,7 @@ func (transporter *httpTransporter) closeResponseChannel() {
 func (transporter *httpTransporter) Read() (response, error) {
        resp, ok := <-transporter.responseChannel
        if !ok {
-               return response{}, errors.New("response stream closed")
+               return response{}, ErrResponseStreamClosed
        }
        return resp, nil
 }
diff --git a/gremlin-go/driver/streamingDeserializer.go 
b/gremlin-go/driver/streamingDeserializer.go
new file mode 100644
index 0000000000..949c3e3b00
--- /dev/null
+++ b/gremlin-go/driver/streamingDeserializer.go
@@ -0,0 +1,583 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+*/
+
+package gremlingo
+
+import (
+       "bufio"
+       "encoding/binary"
+       "fmt"
+       "io"
+       "math"
+       "math/big"
+       "reflect"
+       "time"
+
+       "github.com/google/uuid"
+)
+
+// streamingDeserializer reads GraphBinary directly from bufio.Reader
+// without interface overhead for maximum performance
+type streamingDeserializer struct {
+       r   *bufio.Reader
+       buf [8]byte
+       err error // sticky error
+}
+
+func newStreamingDeserializer(r io.Reader) *streamingDeserializer {
+       return &streamingDeserializer{r: bufio.NewReaderSize(r, 8192)}
+}
+
+func (d *streamingDeserializer) readByte() (byte, error) {
+       if d.err != nil {
+               return 0, d.err
+       }
+       b, err := d.r.ReadByte()
+       if err != nil {
+               d.err = err
+               return 0, err
+       }
+       return b, nil
+}
+
+func (d *streamingDeserializer) readBytes(n int) ([]byte, error) {
+       if d.err != nil {
+               return nil, d.err
+       }
+       buf := make([]byte, n)
+       _, err := io.ReadFull(d.r, buf)
+       if err != nil {
+               d.err = err
+               return nil, err
+       }
+       return buf, nil
+}
+
+func (d *streamingDeserializer) readInt32() (int32, error) {
+       if d.err != nil {
+               return 0, d.err
+       }
+       _, err := io.ReadFull(d.r, d.buf[:4])
+       if err != nil {
+               d.err = err
+               return 0, err
+       }
+       return int32(binary.BigEndian.Uint32(d.buf[:4])), nil
+}
+
+func (d *streamingDeserializer) readUint32() (uint32, error) {
+       if d.err != nil {
+               return 0, d.err
+       }
+       _, err := io.ReadFull(d.r, d.buf[:4])
+       if err != nil {
+               d.err = err
+               return 0, err
+       }
+       return binary.BigEndian.Uint32(d.buf[:4]), nil
+}
+
+func (d *streamingDeserializer) readInt64() (int64, error) {
+       if d.err != nil {
+               return 0, d.err
+       }
+       _, err := io.ReadFull(d.r, d.buf[:8])
+       if err != nil {
+               d.err = err
+               return 0, err
+       }
+       return int64(binary.BigEndian.Uint64(d.buf[:8])), nil
+}
+
+func (d *streamingDeserializer) readHeader() error {
+       if _, err := d.readByte(); err != nil {
+               return err
+       }
+       _, err := d.readByte()
+       return err
+}
+
+func (d *streamingDeserializer) readFullyQualified() (interface{}, error) {
+       dtByte, err := d.readByte()
+       if err != nil {
+               return nil, err
+       }
+       dt := dataType(dtByte)
+       if dt == nullType {
+               if _, err := d.readByte(); err != nil {
+                       return nil, err
+               }
+               return nil, nil
+       }
+       flag, err := d.readByte()
+       if err != nil {
+               return nil, err
+       }
+       if flag == valueFlagNull {
+               return nil, nil
+       }
+       return d.readValue(dt, flag)
+}
+
+func (d *streamingDeserializer) readValue(dt dataType, flag byte) 
(interface{}, error) {
+       switch dt {
+       case intType:
+               return d.readInt32()
+       case longType:
+               return d.readInt64()
+       case stringType:
+               return d.readString()
+       case doubleType:
+               if d.err != nil {
+                       return nil, d.err
+               }
+               if _, err := io.ReadFull(d.r, d.buf[:8]); err != nil {
+                       d.err = err
+                       return nil, err
+               }
+               return 
math.Float64frombits(binary.BigEndian.Uint64(d.buf[:8])), nil
+       case floatType:
+               if d.err != nil {
+                       return nil, d.err
+               }
+               if _, err := io.ReadFull(d.r, d.buf[:4]); err != nil {
+                       d.err = err
+                       return nil, err
+               }
+               return 
math.Float32frombits(binary.BigEndian.Uint32(d.buf[:4])), nil
+       case booleanType:
+               b, err := d.readByte()
+               return b != 0, err
+       case byteType:
+               return d.readByte()
+       case shortType:
+               if d.err != nil {
+                       return nil, d.err
+               }
+               if _, err := io.ReadFull(d.r, d.buf[:2]); err != nil {
+                       d.err = err
+                       return nil, err
+               }
+               return int16(binary.BigEndian.Uint16(d.buf[:2])), nil
+       case uuidType:
+               buf, err := d.readBytes(16)
+               if err != nil {
+                       return nil, err
+               }
+               id, err := uuid.FromBytes(buf)
+               if err != nil {
+                       return nil, err
+               }
+               return id, nil
+       case listType:
+               return d.readList(flag == 0x02)
+       case setType:
+               list, err := d.readList(flag == 0x02)
+               if err != nil {
+                       return nil, err
+               }
+               return NewSimpleSet(list.([]interface{})...), nil
+       case mapType:
+               return d.readMap()
+       case vertexType:
+               return d.readVertex(true)
+       case edgeType:
+               return d.readEdge()
+       case pathType:
+               return d.readPath()
+       case propertyType:
+               return d.readProperty()
+       case vertexPropertyType:
+               return d.readVertexProperty()
+       case bigIntegerType:
+               return d.readBigInt()
+       case bigDecimalType:
+               return d.readBigDecimal()
+       case datetimeType:
+               return d.readDateTime()
+       case durationType:
+               return d.readDuration()
+       case markerType:
+               b, err := d.readByte()
+               if err != nil {
+                       return nil, err
+               }
+               return Of(b)
+       case byteBuffer:
+               return d.readByteBuffer()
+       case tType, directionType, mergeType, gTypeType:
+               return d.readEnum()
+       default:
+               return nil, 
newError(err0408GetSerializerToReadUnknownTypeError, dt)
+       }
+}
+
+func (d *streamingDeserializer) readString() (string, error) {
+       length, err := d.readInt32()
+       if err != nil {
+               return "", err
+       }
+       if length == 0 {
+               return "", nil
+       }
+       buf, err := d.readBytes(int(length))
+       if err != nil {
+               return "", err
+       }
+       return string(buf), nil
+}
+
+func (d *streamingDeserializer) readList(bulked bool) (interface{}, error) {
+       length, err := d.readInt32()
+       if err != nil {
+               return nil, err
+       }
+       list := make([]interface{}, 0, length)
+       for i := int32(0); i < length; i++ {
+               val, err := d.readFullyQualified()
+               if err != nil {
+                       return nil, err
+               }
+               if bulked {
+                       bulk, err := d.readInt64()
+                       if err != nil {
+                               return nil, err
+                       }
+                       for j := int64(0); j < bulk; j++ {
+                               list = append(list, val)
+                       }
+               } else {
+                       list = append(list, val)
+               }
+       }
+       return list, nil
+}
+
+func (d *streamingDeserializer) readMap() (interface{}, error) {
+       length, err := d.readUint32()
+       if err != nil {
+               return nil, err
+       }
+       m := make(map[interface{}]interface{}, length)
+       for i := uint32(0); i < length; i++ {
+               key, err := d.readFullyQualified()
+               if err != nil {
+                       return nil, err
+               }
+               val, err := d.readFullyQualified()
+               if err != nil {
+                       return nil, err
+               }
+               if key == nil {
+                       m[nil] = val
+               } else if reflect.TypeOf(key).Comparable() {
+                       m[key] = val
+               } else if reflect.TypeOf(key).Kind() == reflect.Map {
+                       m[&key] = val
+               } else {
+                       m[fmt.Sprint(key)] = val
+               }
+       }
+       return m, nil
+}
+
+func (d *streamingDeserializer) readVertex(withProps bool) (*Vertex, error) {
+       id, err := d.readFullyQualified()
+       if err != nil {
+               return nil, err
+       }
+       labels, err := d.readList(false)
+       if err != nil {
+               return nil, err
+       }
+       labelSlice, ok := labels.([]interface{})
+       if !ok || len(labelSlice) == 0 {
+               return nil, newError(err0404ReadNullTypeError)
+       }
+       label, ok := labelSlice[0].(string)
+       if !ok {
+               return nil, newError(err0404ReadNullTypeError)
+       }
+       v := &Vertex{Element: Element{Id: id, Label: label}}
+       if withProps {
+               props, err := d.readFullyQualified()
+               if err != nil {
+                       return nil, err
+               }
+               v.Properties = make([]interface{}, 0)
+               if props != nil {
+                       v.Properties = props
+               }
+       }
+       return v, nil
+}
+
+func (d *streamingDeserializer) readEdge() (*Edge, error) {
+       id, err := d.readFullyQualified()
+       if err != nil {
+               return nil, err
+       }
+       labels, err := d.readList(false)
+       if err != nil {
+               return nil, err
+       }
+       labelSlice, ok := labels.([]interface{})
+       if !ok || len(labelSlice) == 0 {
+               return nil, newError(err0404ReadNullTypeError)
+       }
+       label, ok := labelSlice[0].(string)
+       if !ok {
+               return nil, newError(err0404ReadNullTypeError)
+       }
+       inV, err := d.readVertex(false)
+       if err != nil {
+               return nil, err
+       }
+       outV, err := d.readVertex(false)
+       if err != nil {
+               return nil, err
+       }
+       if _, err := d.readBytes(2); err != nil {
+               return nil, err
+       }
+       props, err := d.readFullyQualified()
+       if err != nil {
+               return nil, err
+       }
+       e := &Edge{
+               Element: Element{Id: id, Label: label},
+               InV:     *inV,
+               OutV:    *outV,
+       }
+       e.Properties = make([]interface{}, 0)
+       if props != nil {
+               e.Properties = props
+       }
+       return e, nil
+}
+
+func (d *streamingDeserializer) readPath() (*Path, error) {
+       labels, err := d.readFullyQualified()
+       if err != nil {
+               return nil, err
+       }
+       objects, err := d.readFullyQualified()
+       if err != nil {
+               return nil, err
+       }
+       objectSlice, ok := objects.([]interface{})
+       if !ok {
+               return nil, newError(err0404ReadNullTypeError)
+       }
+       path := &Path{Objects: objectSlice}
+       if labels != nil {
+               labelSlice, ok := labels.([]interface{})
+               if !ok {
+                       return nil, newError(err0404ReadNullTypeError)
+               }
+               for _, l := range labelSlice {
+                       set, ok := l.(*SimpleSet)
+                       if !ok {
+                               return nil, newError(err0404ReadNullTypeError)
+                       }
+                       path.Labels = append(path.Labels, set)
+               }
+       }
+       return path, nil
+}
+
+func (d *streamingDeserializer) readProperty() (*Property, error) {
+       key, err := d.readString()
+       if err != nil {
+               return nil, err
+       }
+       value, err := d.readFullyQualified()
+       if err != nil {
+               return nil, err
+       }
+       if _, err := d.readBytes(2); err != nil {
+               return nil, err
+       }
+       return &Property{Key: key, Value: value}, nil
+}
+
+func (d *streamingDeserializer) readVertexProperty() (*VertexProperty, error) {
+       id, err := d.readFullyQualified()
+       if err != nil {
+               return nil, err
+       }
+       labels, err := d.readList(false)
+       if err != nil {
+               return nil, err
+       }
+       labelSlice, ok := labels.([]interface{})
+       if !ok || len(labelSlice) == 0 {
+               return nil, newError(err0404ReadNullTypeError)
+       }
+       label, ok := labelSlice[0].(string)
+       if !ok {
+               return nil, newError(err0404ReadNullTypeError)
+       }
+       value, err := d.readFullyQualified()
+       if err != nil {
+               return nil, err
+       }
+       if _, err := d.readBytes(2); err != nil {
+               return nil, err
+       }
+       props, err := d.readFullyQualified()
+       if err != nil {
+               return nil, err
+       }
+       vp := &VertexProperty{
+               Element: Element{Id: id, Label: label},
+               Value:   value,
+       }
+       vp.Properties = make([]interface{}, 0)
+       if props != nil {
+               vp.Properties = props
+       }
+       return vp, nil
+}
+
+func (d *streamingDeserializer) readBigInt() (*big.Int, error) {
+       length, err := d.readInt32()
+       if err != nil {
+               return nil, err
+       }
+       if length == 0 {
+               return big.NewInt(0), nil
+       }
+       b, err := d.readBytes(int(length))
+       if err != nil {
+               return nil, err
+       }
+       bi := big.NewInt(0).SetBytes(b)
+       if b[0]&0x80 != 0 {
+               one := big.NewInt(1)
+               bitLen := uint((len(b)*8)/8+1) * 8
+               bi.Sub(bi, new(big.Int).Lsh(one, bitLen))
+       }
+       return bi, nil
+}
+
+func (d *streamingDeserializer) readBigDecimal() (*BigDecimal, error) {
+       scale, err := d.readInt32()
+       if err != nil {
+               return nil, err
+       }
+       unscaled, err := d.readBigInt()
+       if err != nil {
+               return nil, err
+       }
+       return &BigDecimal{Scale: scale, UnscaledValue: unscaled}, nil
+}
+
+func (d *streamingDeserializer) readDateTime() (time.Time, error) {
+       year, err := d.readInt32()
+       if err != nil {
+               return time.Time{}, err
+       }
+       month, err := d.readByte()
+       if err != nil {
+               return time.Time{}, err
+       }
+       day, err := d.readByte()
+       if err != nil {
+               return time.Time{}, err
+       }
+       totalNS, err := d.readInt64()
+       if err != nil {
+               return time.Time{}, err
+       }
+       offset, err := d.readInt32()
+       if err != nil {
+               return time.Time{}, err
+       }
+       ns := totalNS % 1e9
+       totalS := totalNS / 1e9
+       s := totalS % 60
+       totalM := totalS / 60
+       m := totalM % 60
+       h := totalM / 60
+       return time.Date(int(year), time.Month(month), int(day), int(h), 
int(m), int(s), int(ns), GetTimezoneFromOffset(int(offset))), nil
+}
+
+func (d *streamingDeserializer) readDuration() (time.Duration, error) {
+       seconds, err := d.readInt64()
+       if err != nil {
+               return 0, err
+       }
+       nanos, err := d.readInt32()
+       if err != nil {
+               return 0, err
+       }
+       return time.Duration(seconds*int64(time.Second) + int64(nanos)), nil
+}
+
+func (d *streamingDeserializer) readByteBuffer() (*ByteBuffer, error) {
+       length, err := d.readInt32()
+       if err != nil {
+               return nil, err
+       }
+       data, err := d.readBytes(int(length))
+       if err != nil {
+               return nil, err
+       }
+       return &ByteBuffer{Data: data}, nil
+}
+
+func (d *streamingDeserializer) readEnum() (string, error) {
+       if _, err := d.readByte(); err != nil { // type code (string)
+               return "", err
+       }
+       if _, err := d.readByte(); err != nil { // null flag
+               return "", err
+       }
+       return d.readString()
+}
+
+func (d *streamingDeserializer) readStatus() (uint32, string, string, error) {
+       code, err := d.readUint32()
+       if err != nil {
+               return 0, "", "", err
+       }
+       var message, exception string
+       flag, err := d.readByte()
+       if err != nil {
+               return code, "", "", err
+       }
+       if flag != valueFlagNull {
+               message, err = d.readString()
+               if err != nil {
+                       return code, "", "", err
+               }
+       }
+       flag, err = d.readByte()
+       if err != nil {
+               return code, message, "", err
+       }
+       if flag != valueFlagNull {
+               exception, err = d.readString()
+               if err != nil {
+                       return code, message, "", err
+               }
+       }
+       return code, message, exception, nil
+}
diff --git a/gremlin-go/driver/streamingDeserializer_test.go 
b/gremlin-go/driver/streamingDeserializer_test.go
new file mode 100644
index 0000000000..91ad173444
--- /dev/null
+++ b/gremlin-go/driver/streamingDeserializer_test.go
@@ -0,0 +1,117 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package gremlingo
+
+import (
+       "bytes"
+       "io"
+       "testing"
+       "time"
+
+       "github.com/stretchr/testify/assert"
+)
+
+func TestStreamingDeserializer(t *testing.T) {
+       t.Run("readInt32", func(t *testing.T) {
+               data := []byte{0x00, 0x00, 0x00, 0x2A} // 42
+               d := newStreamingDeserializer(bytes.NewReader(data))
+               val, err := d.readInt32()
+               assert.Nil(t, err)
+               assert.Equal(t, int32(42), val)
+       })
+
+       t.Run("readInt64", func(t *testing.T) {
+               data := []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x64} 
// 100
+               d := newStreamingDeserializer(bytes.NewReader(data))
+               val, err := d.readInt64()
+               assert.Nil(t, err)
+               assert.Equal(t, int64(100), val)
+       })
+
+       t.Run("readString", func(t *testing.T) {
+               data := []byte{0x00, 0x00, 0x00, 0x05, 'h', 'e', 'l', 'l', 'o'}
+               d := newStreamingDeserializer(bytes.NewReader(data))
+               val, err := d.readString()
+               assert.Nil(t, err)
+               assert.Equal(t, "hello", val)
+       })
+
+       t.Run("readString empty", func(t *testing.T) {
+               data := []byte{0x00, 0x00, 0x00, 0x00}
+               d := newStreamingDeserializer(bytes.NewReader(data))
+               val, err := d.readString()
+               assert.Nil(t, err)
+               assert.Equal(t, "", val)
+       })
+
+       t.Run("error on incomplete data", func(t *testing.T) {
+               data := []byte{0x00, 0x00} // incomplete int32
+               d := newStreamingDeserializer(bytes.NewReader(data))
+               _, err := d.readInt32()
+               assert.ErrorIs(t, err, io.ErrUnexpectedEOF)
+       })
+
+       t.Run("sticky error", func(t *testing.T) {
+               data := []byte{0x00} // too short
+               d := newStreamingDeserializer(bytes.NewReader(data))
+
+               _, err1 := d.readInt32()
+               assert.Error(t, err1)
+
+               // Subsequent reads should also fail
+               _, err2 := d.readInt32()
+               assert.Error(t, err2)
+       })
+}
+
+func TestStreamingChannelDelivery(t *testing.T) {
+       t.Run("results arrive incrementally via channel", func(t *testing.T) {
+               rs := newChannelResultSet()
+
+               // Simulate streaming - send results with delays
+               go func() {
+                       for i := 0; i < 5; i++ {
+                               time.Sleep(10 * time.Millisecond)
+                               rs.Channel() <- &Result{i}
+                       }
+                       rs.Close()
+               }()
+
+               var times []time.Duration
+               start := time.Now()
+
+               for {
+                       _, ok, _ := rs.One()
+                       if !ok {
+                               break
+                       }
+                       times = append(times, time.Since(start))
+               }
+
+               assert.Equal(t, 5, len(times))
+
+               // Results should arrive ~10ms apart, not all at once
+               for i := 1; i < len(times); i++ {
+                       gap := times[i] - times[i-1]
+                       assert.GreaterOrEqual(t, gap, 5*time.Millisecond,
+                               "Results %d and %d arrived too close together: 
%v", i-1, i, gap)
+               }
+       })
+}

Reply via email to