This is an automated email from the ASF dual-hosted git repository.

xiazcy pushed a commit to branch go-http-streaming
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit f6612bfdf39290c287607462bd33363f4a1f7385
Author: Yang Xia <[email protected]>
AuthorDate: Wed Jan 14 13:27:30 2026 -0800

    interim imp
---
 gremlin-go/driver/client.go                      |  12 ++-
 gremlin-go/driver/cucumber/cucumberSteps_test.go |   2 +-
 gremlin-go/driver/cucumber/cucumberWorld.go      |   4 +-
 gremlin-go/driver/graphBinary.go                 |  75 ++++++++++----
 gremlin-go/driver/httpProtocol.go                |  89 +++++++----------
 gremlin-go/driver/httpTransporter.go             | 121 +++++++++++++++--------
 gremlin-go/driver/resultSet.go                   |   4 +-
 gremlin-go/driver/serializer.go                  | 106 ++++++++++++++++++--
 8 files changed, 286 insertions(+), 127 deletions(-)

diff --git a/gremlin-go/driver/client.go b/gremlin-go/driver/client.go
index 12994f62f6..472c484150 100644
--- a/gremlin-go/driver/client.go
+++ b/gremlin-go/driver/client.go
@@ -21,6 +21,7 @@ package gremlingo
 
 import (
        "crypto/tls"
+       "fmt"
        "reflect"
        "runtime"
        "time"
@@ -32,11 +33,12 @@ const keepAliveIntervalDefault = 5 * time.Second
 const writeDeadlineDefault = 3 * time.Second
 const connectionTimeoutDefault = 5 * time.Second
 
-// ReadBufferSize and WriteBufferSize specify I/O buffer sizes in bytes. The default is 1MB.
+// ReadBufferSize and WriteBufferSize specify I/O buffer sizes in bytes. The default is 64KB.
 // If a buffer size is set zero, then the transporter default size is used. The I/O buffer
 // sizes do not limit the size of the messages that can be sent or received.
-const readBufferSizeDefault = 1048576
-const writeBufferSizeDefault = 1048576
+const readBufferSizeDefault = 65536  // 64KB
+const maxReadBufferSize = 1073741824 // 1GB - Go's maximum per Read() call
+const writeBufferSizeDefault = 65536 // 64KB
 
 // ClientSettings is used to modify a Client's settings on initialization.
 type ClientSettings struct {
@@ -91,6 +93,10 @@ func NewClient(url string, configurations ...func(settings *ClientSettings)) (*C
                configuration(settings)
        }
 
+       if settings.ReadBufferSize > maxReadBufferSize {
+               return nil, fmt.Errorf("readBufferSize %d exceeds maximum of %d bytes", settings.ReadBufferSize, maxReadBufferSize)
+       }
+
        connSettings := &connectionSettings{
                authInfo:                 settings.AuthInfo,
                tlsConfig:                settings.TlsConfig,
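
The client.go hunk above drops the default read/write buffers from 1MB to 64KB and makes NewClient reject a ReadBufferSize above the new maxReadBufferSize (1GB). A minimal usage sketch under those limits, not part of the patch; the module path and server URL are assumptions, not taken from this commit:

package main

import (
        "log"

        gremlingo "github.com/apache/tinkerpop/gremlin-go/v3/driver"
)

func main() {
        // Buffers default to 64KB after this change; values above maxReadBufferSize fail fast.
        client, err := gremlingo.NewClient("http://localhost:8182/gremlin",
                func(settings *gremlingo.ClientSettings) {
                        settings.ReadBufferSize = 1 << 20  // 1MB per Read() call
                        settings.WriteBufferSize = 1 << 20 // 1MB
                })
        if err != nil {
                log.Fatal(err) // e.g. "readBufferSize ... exceeds maximum of ... bytes"
        }
        defer client.Close()
}
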
diff --git a/gremlin-go/driver/cucumber/cucumberSteps_test.go b/gremlin-go/driver/cucumber/cucumberSteps_test.go
index f9caa7e187..532f950823 100644
--- a/gremlin-go/driver/cucumber/cucumberSteps_test.go
+++ b/gremlin-go/driver/cucumber/cucumberSteps_test.go
@@ -430,7 +430,7 @@ func (tg *tinkerPopGraph) nothingShouldHappenBecause(arg1 *godog.DocString) erro
 func (tg *tinkerPopGraph) chooseGraph(graphName string) error {
        tg.graphName = graphName
        data := tg.graphDataMap[graphName]
-       tg.g = gremlingo.Traversal_().With(data.connection)
+       tg.g = gremlingo.Traversal_().With(data.connection).With("language", "gremlin-lang")
        if graphName == "empty" {
                err := tg.cleanEmptyDataGraph(tg.g)
                if err != nil {
diff --git a/gremlin-go/driver/cucumber/cucumberWorld.go b/gremlin-go/driver/cucumber/cucumberWorld.go
index ac92ea1db8..dcbec09ef4 100644
--- a/gremlin-go/driver/cucumber/cucumberWorld.go
+++ b/gremlin-go/driver/cucumber/cucumberWorld.go
@@ -109,7 +109,7 @@ func (t *CucumberWorld) loadAllDataGraph() {
                        if err != nil {
                                panic(fmt.Sprintf("Failed to create connection '%v'", err))
                        }
-                       g := gremlingo.Traversal_().With(connection)
+                       g := gremlingo.Traversal_().With(connection).With("language", "gremlin-lang")
                        t.graphDataMap[name] = &DataGraph{
                                name:             name,
                                connection:       connection,
@@ -130,7 +130,7 @@ func (t *CucumberWorld) loadEmptyDataGraph() {
 
 func (t *CucumberWorld) reloadEmptyData() {
        graphData := t.getDataGraphFromMap("empty")
-       g := gremlingo.Traversal_().With(graphData.connection)
+       g := gremlingo.Traversal_().With(graphData.connection).With("language", "gremlin-lang")
        graphData.vertices = getVertices(g)
        graphData.edges = getEdges(g)
 }
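
The cucumber changes above pin every traversal source to the gremlin-lang processor via With("language", "gremlin-lang"). The same pattern outside the test harness would look roughly like the fragment below; this is a sketch mirroring the calls in the diff, and conn is assumed to be a *gremlingo.DriverRemoteConnection created elsewhere:

// Sketch only: conn is a remote connection created elsewhere.
g := gremlingo.Traversal_().With(conn).With("language", "gremlin-lang")
count, err := g.V().Count().Next()
if err != nil {
        return err
}
fmt.Println(count.GetString())
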
diff --git a/gremlin-go/driver/graphBinary.go b/gremlin-go/driver/graphBinary.go
index c553b0a2e2..c28aa633dc 100644
--- a/gremlin-go/driver/graphBinary.go
+++ b/gremlin-go/driver/graphBinary.go
@@ -22,6 +22,7 @@ package gremlingo
 import (
        "bytes"
        "encoding/binary"
+       "errors"
        "fmt"
        "math"
        "math/big"
@@ -31,6 +32,8 @@ import (
        "github.com/google/uuid"
 )
 
+var ErrIncompleteData = errors.New("incomplete data")
+
 // Version 1.0
 
 // dataType graphBinary types.
@@ -613,12 +616,12 @@ func (serializer *graphBinaryTypeSerializer) writeValueFlagNone(buffer *bytes.Bu
 
 // readers
 
-func readTemp(data *[]byte, i *int, len int) *[]byte {
-       tmp := make([]byte, len)
-       for j := 0; j < len; j++ {
-               tmp[j] = (*data)[j+*i]
+func readTemp(data *[]byte, i *int, length int) *[]byte {
+       if *i+length > len(*data) {
+               panic(ErrIncompleteData)
        }
-       *i += len
+       tmp := (*data)[*i : *i+length]
+       *i += length
        return &tmp
 }
 
@@ -655,21 +658,30 @@ func readLong(data *[]byte, i *int) (interface{}, error) {
 }
 
 func readBigInt(data *[]byte, i *int) (interface{}, error) {
-       sz := readIntSafe(data, i)
-       b := readTemp(data, i, int(sz))
+       // Length lookahead - check if complete BigInt is available
+       if *i+4 > len(*data) {
+               panic(ErrIncompleteData)
+       }
+       sz := int(binary.BigEndian.Uint32((*data)[*i : *i+4]))
+       if sz > 0 && *i+4+sz > len(*data) {
+               panic(ErrIncompleteData)
+       }
+       *i += 4
+       if sz == 0 {
+               return big.NewInt(0), nil
+       }
+       b := (*data)[*i : *i+sz]
+       *i += sz
 
-       var newBigInt = big.NewInt(0).SetBytes(*b)
+       var newBigInt = big.NewInt(0).SetBytes(b)
        var one = big.NewInt(1)
-       if len(*b) == 0 {
-               return newBigInt, nil
-       }
        // If the first bit in the first element of the byte array is a 1, we need to interpret the byte array as a two's complement representation
-       if (*b)[0]&0x80 == 0x00 {
-               newBigInt.SetBytes(*b)
+       if b[0]&0x80 == 0x00 {
+               newBigInt.SetBytes(b)
                return newBigInt, nil
        }
        // Undo two's complement to byte array and set negative boolean to true
-       length := uint((len(*b)*8)/8+1) * 8
+       length := uint((len(b)*8)/8+1) * 8
        b2 := new(big.Int).Sub(newBigInt, new(big.Int).Lsh(one, length)).Bytes()
 
        // Strip the resulting 0xff byte at the start of array
@@ -709,12 +721,21 @@ func readDouble(data *[]byte, i *int) (interface{}, error) {
 }
 
 func readString(data *[]byte, i *int) (interface{}, error) {
-       sz := int(readUint32Safe(data, i))
+       // Length lookahead - check if complete string is available
+       if *i+4 > len(*data) {
+               panic(ErrIncompleteData)
+       }
+       sz := int(binary.BigEndian.Uint32((*data)[*i : *i+4]))
+       if sz > 0 && *i+4+sz > len(*data) {
+               panic(ErrIncompleteData) // Don't advance index - wait for more data
+       }
+       *i += 4 // Now safe to advance past length
        if sz == 0 {
                return "", nil
        }
+       result := string((*data)[*i : *i+sz])
        *i += sz
-       return string((*data)[*i-sz : *i]), nil
+       return result, nil
 }
 
 func readDataType(data *[]byte, i *int) dataType {
@@ -776,12 +797,19 @@ func readList(data *[]byte, i *int, flag byte) (interface{}, error) {
 }
 
 func readByteBuffer(data *[]byte, i *int) (interface{}, error) {
+       // Length lookahead - check if complete ByteBuffer is available
+       if *i+4 > len(*data) {
+               panic(ErrIncompleteData)
+       }
+       sz := int(binary.BigEndian.Uint32((*data)[*i : *i+4]))
+       if sz > 0 && *i+4+sz > len(*data) {
+               panic(ErrIncompleteData)
+       }
+       *i += 4
        r := &ByteBuffer{}
-       sz := readIntSafe(data, i)
        r.Data = make([]byte, sz)
-       for j := int32(0); j < sz; j++ {
-               r.Data[j] = readByteSafe(data, i)
-       }
+       copy(r.Data, (*data)[*i:*i+sz])
+       *i += sz
        return r, nil
 }
 
@@ -846,7 +874,12 @@ func readSet(data *[]byte, i *int, flag byte) (interface{}, error) {
 }
 
 func readUuid(data *[]byte, i *int) (interface{}, error) {
-       id, _ := uuid.FromBytes(*readTemp(data, i, 16))
+       // Bounds check - UUID is fixed 16 bytes
+       if *i+16 > len(*data) {
+               panic(ErrIncompleteData)
+       }
+       id, _ := uuid.FromBytes((*data)[*i : *i+16])
+       *i += 16
        return id, nil
 }
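
The readers above now do a length lookahead and panic with ErrIncompleteData instead of indexing past the end of the buffer; readChunk in serializer.go recovers from that panic and retries once more bytes arrive. A standalone sketch of that signal-and-recover contract, not part of the patch and using hypothetical names (readValue, decodeAvailable are not driver functions):

package main

import (
        "encoding/binary"
        "errors"
        "fmt"
)

var errIncomplete = errors.New("incomplete data")

// readValue reads one length-prefixed value, panicking if the buffer is short.
func readValue(data []byte, i *int) []byte {
        if *i+4 > len(data) {
                panic(errIncomplete)
        }
        sz := int(binary.BigEndian.Uint32(data[*i : *i+4]))
        if *i+4+sz > len(data) {
                panic(errIncomplete) // do not advance; caller retries with more data
        }
        *i += 4
        v := data[*i : *i+sz]
        *i += sz
        return v
}

// decodeAvailable returns the values that fit entirely in buf and how many bytes were consumed.
func decodeAvailable(buf []byte) (vals [][]byte, consumed int) {
        i := 0
        defer func() {
                if r := recover(); r != nil && r != errIncomplete {
                        panic(r) // re-panic on anything other than the incomplete-data signal
                }
                consumed = i
        }()
        for i < len(buf) {
                vals = append(vals, readValue(buf, &i))
        }
        return vals, i
}

func main() {
        buf := []byte{0, 0, 0, 3, 'a', 'b', 'c', 0, 0, 0, 5, 'd'} // second value is incomplete
        vals, n := decodeAvailable(buf)
        fmt.Println(len(vals), n) // 1 7: keep buf[7:] buffered until the rest arrives
}
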
 
diff --git a/gremlin-go/driver/httpProtocol.go b/gremlin-go/driver/httpProtocol.go
index 9c9bba4a91..502bb3ae3f 100644
--- a/gremlin-go/driver/httpProtocol.go
+++ b/gremlin-go/driver/httpProtocol.go
@@ -20,13 +20,12 @@ under the License.
 package gremlingo
 
 import (
-       "fmt"
        "net/http"
 )
 
 // responsible for serializing and sending requests and then receiving and deserializing responses
 type httpProtocol struct {
-       serializer   Serializer
+       serializer   *GraphBinarySerializer
        logHandler   *logHandler
        url          string
        connSettings *connectionSettings
@@ -59,7 +58,6 @@ func newHttpProtocol(handler *logHandler, url string, connSettings *connectionSe
 // sends a query request and returns a ResultSet that can be used to obtain query results
 func (protocol *httpProtocol) send(request *request) (ResultSet, error) {
        rs := newChannelResultSet()
-       fmt.Println("Serializing request")
        bytes, err := protocol.serializer.SerializeMessage(request)
        if err != nil {
                rs.setError(err)
@@ -68,7 +66,7 @@ func (protocol *httpProtocol) send(request *request) (ResultSet, error) {
        }
 
        // one transport per request
-       transport := NewHttpTransporter(protocol.url, protocol.connSettings, protocol.httpClient, protocol.logHandler)
+       transport := newHttpTransporter(protocol.url, protocol.connSettings, protocol.httpClient, protocol.logHandler)
 
        // async send request
        transport.wg.Add(1)
@@ -76,6 +74,7 @@ func (protocol *httpProtocol) send(request *request) (ResultSet, error) {
                defer transport.wg.Done()
                err := transport.Write(bytes)
                if err != nil {
+                       transport.Close() // Close transport to unblock receiver
                        rs.setError(err)
                        rs.Close()
                }
@@ -85,12 +84,9 @@ func (protocol *httpProtocol) send(request *request) (ResultSet, error) {
        transport.wg.Add(1)
        go func() {
                defer transport.wg.Done()
-               msg, err := transport.Read()
+               err := protocol.receiveChunkedResponse(rs, transport)
                if err != nil {
                        rs.setError(err)
-                       rs.Close()
-               } else {
-                       err = protocol.receive(rs, msg)
                }
                transport.Close()
        }()
@@ -101,55 +97,44 @@ func (protocol *httpProtocol) send(request *request) (ResultSet, error) {
        return rs, rs.GetError()
 }
 
-// receives a binary response message, deserializes, and adds results to the ResultSet
-func (protocol *httpProtocol) receive(rs ResultSet, msg []byte) error {
-       fmt.Println("Deserializing response")
-       resp, err := protocol.serializer.DeserializeMessage(msg)
-       if err != nil {
-               protocol.logHandler.logf(Error, logErrorGeneric, "receive()", err.Error())
-               rs.Close()
-               return err
-       }
+// receiveChunkedResponse processes individual chunk responses
+func (protocol *httpProtocol) receiveChunkedResponse(rs ResultSet, transport *httpTransporter) error {
+       for {
+               resp, err := transport.Read()
+               if err != nil {
+                       if err.Error() == "response stream closed" {
+                               rs.Close()
+                               return nil
+                       }
+                       rs.Close()
+                       return err
+               }
 
-       fmt.Println("Handling response")
-       err = protocol.handleResponse(rs, resp)
-       if err != nil {
-               protocol.logHandler.logf(Error, logErrorGeneric, "receive()", err.Error())
-               rs.Close()
-               return err
-       }
-       return nil
-}
+               endOfStream := false
+               if data, ok := resp.ResponseResult.Data.([]interface{}); ok {
+                       for _, obj := range data {
+                               if marker, ok := obj.(Marker); ok && marker == EndOfStream() {
+                                       endOfStream = true
+                                       break
+                               }
 
-// processes a deserialized response and attempts to add results to the ResultSet
-func (protocol *httpProtocol) handleResponse(rs ResultSet, response Response) error {
-       fmt.Println("Handling response")
+                               rs.Channel() <- &Result{obj}
+                       }
+               }
 
-       statusCode, data := response.ResponseStatus.code, response.ResponseResult.Data
-       if rs == nil {
-               return newError(err0501ResponseHandlerResultSetNotCreatedError)
-       }
+               // Check status code (error status comes after EndOfStream)
+               if resp.ResponseStatus.code != 0 && resp.ResponseStatus.code != 200 {
+                       rs.Close()
+                       err := newError(err0502ResponseHandlerReadLoopError, resp.ResponseStatus, resp.ResponseStatus.code)
+                       rs.setError(err)
+                       return err
+               }
 
-       if statusCode == http.StatusNoContent {
-               rs.addResult(&Result{make([]interface{}, 0)})
-               rs.Close()
-               protocol.logHandler.logf(Debug, readComplete)
-       } else if statusCode == http.StatusOK {
-               rs.addResult(&Result{data})
-               rs.Close()
-               protocol.logHandler.logf(Debug, readComplete)
-       } else if statusCode == http.StatusUnauthorized || statusCode == http.StatusForbidden {
-               rs.Close()
-               err := newError(err0503ResponseHandlerAuthError, response.ResponseStatus, response.ResponseResult)
-               rs.setError(err)
-               return err
-       } else {
-               rs.Close()
-               err := newError(err0502ResponseHandlerReadLoopError, response.ResponseStatus, statusCode)
-               rs.setError(err)
-               return err
+               if endOfStream {
+                       rs.Close()
+                       return nil
+               }
        }
-       return nil
 }
 
 func (protocol *httpProtocol) close() {
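
With receive() and handleResponse() replaced by receiveChunkedResponse, results are pushed onto the ResultSet as each chunk is deserialized instead of after the whole body has been read. Nothing changes for callers; a small consumption sketch, not part of the patch, assuming a client built as in the earlier example and the ResultSet methods already used in this diff (Channel, GetError):

// Sketch only: client is a *gremlingo.Client created elsewhere.
resultSet, err := client.Submit("g.V().limit(100)")
if err != nil {
        log.Fatal(err)
}
// Results arrive as chunks are decoded, before the server has finished streaming.
for result := range resultSet.Channel() {
        fmt.Println(result.GetString())
}
if err := resultSet.GetError(); err != nil {
        log.Fatal(err)
}
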
diff --git a/gremlin-go/driver/httpTransporter.go b/gremlin-go/driver/httpTransporter.go
index 94059a19c1..9a9c5bb99b 100644
--- a/gremlin-go/driver/httpTransporter.go
+++ b/gremlin-go/driver/httpTransporter.go
@@ -23,41 +23,40 @@ import (
        "bytes"
        "compress/zlib"
        "errors"
-       "fmt"
        "io"
        "net/http"
        "sync"
+       "time"
 )
 
-// TODO decide channel size when chunked response handling is implemented - for now just set to 1
-const responseChannelSizeDefault = 1
-
-// HttpTransporter responsible for sending and receiving bytes to/from the server
-type HttpTransporter struct {
+// httpTransporter responsible for sending and receiving bytes to/from the server
+type httpTransporter struct {
        url             string
        isClosed        bool
        connSettings    *connectionSettings
-       responseChannel chan []byte // receives response bytes from the server
+       responseChannel chan Response // receives response messages
        httpClient      *http.Client
        wg              *sync.WaitGroup
        logHandler      *logHandler
+       closeOnce       sync.Once
 }
 
-func NewHttpTransporter(url string, connSettings *connectionSettings, httpClient *http.Client, logHandler *logHandler) *HttpTransporter {
+func newHttpTransporter(url string, connSettings *connectionSettings, httpClient *http.Client, logHandler *logHandler) *httpTransporter {
        wg := &sync.WaitGroup{}
 
-       return &HttpTransporter{
+       return &httpTransporter{
                url:             url,
                connSettings:    connSettings,
-               responseChannel: make(chan []byte, responseChannelSizeDefault),
+               responseChannel: make(chan Response, 10),
                httpClient:      httpClient,
                wg:              wg,
                logHandler:      logHandler,
+               closeOnce:       sync.Once{},
        }
 }
 
-// Write sends bytes to the server as a POST request and sends received response bytes to the responseChannel
-func (transporter *HttpTransporter) Write(data []byte) error {
+// Write sends bytes to the server and starts streaming response processing
+func (transporter *httpTransporter) Write(data []byte) error {
        req, err := http.NewRequest("POST", transporter.url, bytes.NewBuffer(data))
        if err != nil {
                transporter.logHandler.logf(Error, failedToSendRequest, err.Error())
@@ -87,7 +86,6 @@ func (transporter *HttpTransporter) Write(data []byte) error {
                }
        }
 
-       fmt.Println("Sending request")
        resp, err := transporter.httpClient.Do(req)
        if err != nil {
                transporter.logHandler.logf(Error, failedToSendRequest, err.Error())
@@ -99,47 +97,90 @@ func (transporter *HttpTransporter) Write(data []byte) error {
                reader, err = zlib.NewReader(resp.Body)
                if err != nil {
                        transporter.logHandler.logf(Error, failedToReceiveResponse, err.Error())
+                       err := resp.Body.Close()
+                       if err != nil {
+                               return err
+                       }
                        return err
                }
        }
 
-       // TODO handle chunked encoding and send chunks to responseChannel
-       all, err := io.ReadAll(reader)
-       if err != nil {
-               transporter.logHandler.logf(Error, failedToReceiveResponse, err.Error())
-               return err
-       }
-       err = reader.Close()
-       if err != nil {
-               return err
-       }
+       // Start streaming processing in background
+       go transporter.streamResponse(reader, resp.Body)
+       return nil
+}
 
-       // TODO for debug, remove later, and check response handling
-       //str := hex.EncodeToString(all)
-       //_, _ = fmt.Fprintf(os.Stdout, "Received response data : %s\n", str)
+// streamResponse processes HTTP chunks independently
+func (transporter *httpTransporter) streamResponse(reader io.Reader, body io.Closer) {
+       defer func(body io.Closer) {
+               err := body.Close()
+               if err != nil {
+               }
+       }(body)
+       defer transporter.closeResponseChannel()
+
+       serializer := newGraphBinarySerializer(transporter.logHandler)
+       isFirstChunk := true
+
+       chunk := make([]byte, transporter.connSettings.readBufferSize)
+       timer := time.NewTimer(5 * time.Second)
+       defer timer.Stop()
+
+       for {
+               n, err := reader.Read(chunk)
+               if n > 0 {
+                       msg, procErr := serializer.readChunk(chunk[:n], isFirstChunk)
+                       if procErr != nil {
+                               transporter.logHandler.logf(Error, failedToReceiveResponse, procErr.Error())
+                               return
+                       }
+                       isFirstChunk = false
 
-       fmt.Println("Sending response to responseChannel")
-       transporter.responseChannel <- all
-       return nil
+                       if !timer.Stop() {
+                               select {
+                               case <-timer.C:
+                               default:
+                               }
+                       }
+                       timer.Reset(5 * time.Second)
+
+                       select {
+                       case transporter.responseChannel <- *msg:
+                       case <-timer.C:
+                               transporter.logHandler.logf(Error, failedToReceiveResponse, "timeout")
+                               return
+                       }
+               }
+
+               if err == io.EOF {
+                       break
+               }
+               if err != nil {
+                       transporter.logHandler.logf(Error, failedToReceiveResponse, err.Error())
+                       return
+               }
+       }
 }
 
-// Read reads bytes from the responseChannel
-func (transporter *HttpTransporter) Read() ([]byte, error) {
-       fmt.Println("Reading from responseChannel")
-       msg, ok := <-transporter.responseChannel
+func (transporter *httpTransporter) closeResponseChannel() {
+       transporter.closeOnce.Do(func() {
+               close(transporter.responseChannel)
+       })
+}
+
+// Read reads response messages from the stream
+func (transporter *httpTransporter) Read() (Response, error) {
+       resp, ok := <-transporter.responseChannel
        if !ok {
-               return []byte{}, errors.New("failed to read from response channel")
+               return Response{}, errors.New("response stream closed")
        }
-       return msg, nil
+       return resp, nil
 }
 
 // Close closes the transporter and its corresponding responseChannel
-func (transporter *HttpTransporter) Close() {
-       fmt.Println("Closing http transporter")
+func (transporter *httpTransporter) Close() {
        if !transporter.isClosed {
-               if transporter.responseChannel != nil {
-                       close(transporter.responseChannel)
-               }
+               transporter.closeResponseChannel()
                transporter.isClosed = true
        }
 }
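
streamResponse above is a single-producer pipeline: it reads at most readBufferSize bytes per Read(), hands each slice to readChunk, and sends the resulting Response on a bounded channel, giving up if the consumer stalls for 5 seconds. The timer reuse (Stop, drain, Reset before each send) is a common Go pattern; below it is isolated as a sketch with a hypothetical helper, sendWithTimeout, which is not driver code:

package main

import (
        "errors"
        "fmt"
        "time"
)

// sendWithTimeout reuses one timer across sends, draining it before Reset as streamResponse does.
func sendWithTimeout(ch chan<- int, v int, timer *time.Timer, d time.Duration) error {
        if !timer.Stop() {
                select {
                case <-timer.C: // drain an already-fired timer so Reset starts clean
                default:
                }
        }
        timer.Reset(d)
        select {
        case ch <- v: // consumer picked it up in time
                return nil
        case <-timer.C:
                return errors.New("timeout: consumer not reading")
        }
}

func main() {
        ch := make(chan int, 1)
        timer := time.NewTimer(time.Second)
        defer timer.Stop()
        fmt.Println(sendWithTimeout(ch, 42, timer, time.Second)) // <nil>
}
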
diff --git a/gremlin-go/driver/resultSet.go b/gremlin-go/driver/resultSet.go
index 59e22e66d1..cf1230cd53 100644
--- a/gremlin-go/driver/resultSet.go
+++ b/gremlin-go/driver/resultSet.go
@@ -152,8 +152,8 @@ func (channelResultSet *channelResultSet) All() ([]*Result, error) {
 
 func (channelResultSet *channelResultSet) addResult(r *Result) {
        channelResultSet.channelMutex.Lock()
-       if r.GetType().Kind() == reflect.Array || r.GetType().Kind() == reflect.Slice {
-               for _, v := range r.Data.([]interface{}) {
+       if data, ok := r.Data.([]interface{}); ok {
+               for _, v := range data {
                        if reflect.TypeOf(v) == reflect.TypeOf(&Traverser{}) {
                                for i := int64(0); i < (v.(*Traverser)).bulk; i++ {
                                        channelResultSet.channel <- &Result{(v.(*Traverser)).value}
diff --git a/gremlin-go/driver/serializer.go b/gremlin-go/driver/serializer.go
index 68be6862ac..c2d68c6500 100644
--- a/gremlin-go/driver/serializer.go
+++ b/gremlin-go/driver/serializer.go
@@ -35,7 +35,9 @@ type Serializer interface {
 
 // GraphBinarySerializer serializes/deserializes message to/from GraphBinary.
 type GraphBinarySerializer struct {
-       ser *graphBinaryTypeSerializer
+       ser    *graphBinaryTypeSerializer
+       bulked bool   // State maintained between chunks
+       buffer []byte // Buffer for incomplete objects across chunks
 }
 
 // CustomTypeReader user provided function to deserialize custom types
@@ -56,9 +58,13 @@ func init() {
        initDeserializers()
 }
 
-func newGraphBinarySerializer(handler *logHandler) Serializer {
+func newGraphBinarySerializer(handler *logHandler) *GraphBinarySerializer {
        serializer := graphBinaryTypeSerializer{handler}
-       return GraphBinarySerializer{&serializer}
+       return &GraphBinarySerializer{
+               ser:    &serializer,
+               bulked: false,
+               buffer: make([]byte, 0),
+       }
 }
 
 // TODO change for graph binary 4.0 version is finalized
@@ -86,8 +92,8 @@ const versionByte byte = 0x81
 //     bytes, err := serializer.(graphBinarySerializer).SerializeMessage(&req)
 //     // Send bytes over custom transport
 //
-// serializeMessage serializes a request message into GraphBinary.
-func (gs GraphBinarySerializer) SerializeMessage(request *request) ([]byte, error) {
+// SerializeMessage serializes a request message into GraphBinary.
+func (gs *GraphBinarySerializer) SerializeMessage(request *request) ([]byte, error) {
        finalMessage, err := gs.buildMessage(request.gremlin, request.fields)
        if err != nil {
                return nil, err
@@ -132,7 +138,7 @@ func (gs *GraphBinarySerializer) buildMessage(gremlin string, args map[string]in
 //     serializer := newGraphBinarySerializer(nil)
 //     resp, err := serializer.(graphBinarySerializer).DeserializeMessage(responseBytes)
 //     results := resp.responseResult.data
-func (gs GraphBinarySerializer) DeserializeMessage(message []byte) (Response, error) {
+func (gs *GraphBinarySerializer) DeserializeMessage(message []byte) (Response, error) {
        var msg Response
 
        if message == nil || len(message) == 0 {
@@ -181,6 +187,94 @@ func (gs GraphBinarySerializer) DeserializeMessage(message []byte) (Response, er
        return msg, nil
 }
 
+// readChunk processes HTTP chunks with simple buffering
+func (gs *GraphBinarySerializer) readChunk(chunk []byte, isFirstChunk bool) (*Response, error) {
+       var msg Response
+
+       if len(chunk) == 0 {
+               msg.ResponseStatus.code = 204
+               return &msg, nil
+       }
+
+       // Append to buffer
+       gs.buffer = append(gs.buffer, chunk...)
+
+       i := 0
+
+       if isFirstChunk {
+               if len(gs.buffer) < 2 {
+                       msg.ResponseResult.Data = make([]interface{}, 0)
+                       return &msg, nil
+               }
+               i++ // skip version
+               gs.bulked = (gs.buffer[i] & 1) == 1
+               i++
+       }
+
+       results := make([]interface{}, 0)
+       startPos := i
+       processedPos := i // Track position after last successfully processed object
+       endOfStream := false
+
+       // Process complete objects, recover from panic on incomplete data
+       func() {
+               defer func() {
+                       if r := recover(); r != nil {
+                               if r == ErrIncompleteData {
+                                       i = startPos // reset to start of incomplete object
+                               } else {
+                                       panic(r) // re-panic for other errors
+                               }
+                       }
+               }()
+               for i < len(gs.buffer) {
+                       // Peek before parsing - need at least 2 bytes for type + nullable
+                       if len(gs.buffer)-i < 2 {
+                               break
+                       }
+                       startPos = i
+                       obj, err := readFullyQualifiedNullable(&gs.buffer, &i, true)
+                       if err != nil {
+                               return
+                       }
+
+                       if marker, ok := obj.(Marker); ok && marker == EndOfStream() {
+                               gs.bulked = false
+                               // Read status after end of stream
+                               msg.ResponseStatus.code = readUint32Safe(&gs.buffer, &i)
+                               if statusMsg, _ := readUnqualified(&gs.buffer, &i, stringType, true); statusMsg != nil {
+                                       msg.ResponseStatus.message = statusMsg.(string)
+                               }
+                               if exception, _ := readUnqualified(&gs.buffer, &i, stringType, true); exception != nil {
+                                       msg.ResponseStatus.exception = exception.(string)
+                               }
+                               // Only set endOfStream after successfully reading status
+                               endOfStream = true
+                               gs.buffer = gs.buffer[:0]
+                               return
+                       }
+
+                       results = append(results, obj)
+                       processedPos = i // Update after successful processing
+               }
+       }()
+
+       msg.ResponseResult.Data = results
+
+       // Keep unprocessed data in buffer (unless we hit end of stream)
+       if !endOfStream {
+               if processedPos < len(gs.buffer) {
+                       remaining := make([]byte, len(gs.buffer)-processedPos)
+                       copy(remaining, gs.buffer[processedPos:])
+                       gs.buffer = remaining
+               } else {
+                       gs.buffer = gs.buffer[:0]
+               }
+       }
+
+       return &msg, nil
+}
+
 func initSerializers() {
        serializers = map[dataType]writer{
                stringType:     stringWriter,

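
readChunk keeps per-response state on the serializer: bulked, read from the first chunk's flag byte, and buffer, which holds any object that was split across HTTP chunks; the recover() above rolls the index back to the start of the incomplete object so it can be retried once the next chunk is appended. The intended call pattern across two chunks, as a hedged fragment that is not part of the patch (handler, firstChunk, and secondChunk are placeholders, not values from this commit):

// Sketch only: handler is a *logHandler built elsewhere in the driver;
// firstChunk/secondChunk are placeholder byte slices, not a real GraphBinary stream.
gs := newGraphBinarySerializer(handler)

// First chunk: the version and flag bytes are parsed, complete objects come back in
// resp1.ResponseResult.Data, and a trailing partial object stays in gs.buffer.
resp1, err := gs.readChunk(firstChunk, true)

// Later chunk: the buffered bytes are retried with the new data appended, so the object
// that straddled the boundary is emitted here; once EndOfStream and the status fields are
// read, the buffer is cleared and resp2.ResponseStatus carries the final code/message/exception.
resp2, err := gs.readChunk(secondChunk, false)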