This is an automated email from the ASF dual-hosted git repository. xiazcy pushed a commit to branch go-http-streaming in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
commit 514b05a198bd5ee44a4dc3bd48a47188723d73bd Author: Yang Xia <[email protected]> AuthorDate: Wed Jan 14 13:27:30 2026 -0800 interim imp --- gremlin-go/driver/client.go | 12 ++- gremlin-go/driver/connection_test.go | 27 ++--- gremlin-go/driver/cucumber/cucumberSteps_test.go | 2 +- gremlin-go/driver/cucumber/cucumberWorld.go | 4 +- gremlin-go/driver/graphBinary.go | 75 ++++++++++---- gremlin-go/driver/httpProtocol.go | 89 +++++++---------- gremlin-go/driver/httpTransporter.go | 121 +++++++++++++++-------- gremlin-go/driver/resultSet.go | 4 +- gremlin-go/driver/serializer.go | 105 +++++++++++++++++++- 9 files changed, 302 insertions(+), 137 deletions(-) diff --git a/gremlin-go/driver/client.go b/gremlin-go/driver/client.go index 7ce9a8839a..8a2425538a 100644 --- a/gremlin-go/driver/client.go +++ b/gremlin-go/driver/client.go @@ -21,6 +21,7 @@ package gremlingo import ( "crypto/tls" + "fmt" "reflect" "runtime" "time" @@ -32,11 +33,12 @@ const keepAliveIntervalDefault = 5 * time.Second const writeDeadlineDefault = 3 * time.Second const connectionTimeoutDefault = 5 * time.Second -// ReadBufferSize and WriteBufferSize specify I/O buffer sizes in bytes. The default is 1MB. +// ReadBufferSize and WriteBufferSize specify I/O buffer sizes in bytes. The default is 64KB. // If a buffer size is set zero, then the transporter default size is used. The I/O buffer // sizes do not limit the size of the messages that can be sent or received. -const readBufferSizeDefault = 1048576 -const writeBufferSizeDefault = 1048576 +const readBufferSizeDefault = 65536 // 64KB +const maxReadBufferSize = 1073741824 // 1GB - Go's maximum per Read() call +const writeBufferSizeDefault = 65536 // 64KB // ClientSettings is used to modify a Client's settings on initialization. type ClientSettings struct { @@ -91,6 +93,10 @@ func NewClient(url string, configurations ...func(settings *ClientSettings)) (*C configuration(settings) } + if settings.ReadBufferSize > maxReadBufferSize { + return nil, fmt.Errorf("readBufferSize %d exceeds maximum of %d bytes", settings.ReadBufferSize, maxReadBufferSize) + } + connSettings := &connectionSettings{ authInfo: settings.AuthInfo, tlsConfig: settings.TlsConfig, diff --git a/gremlin-go/driver/connection_test.go b/gremlin-go/driver/connection_test.go index 6638fc4c0f..153f0b0c74 100644 --- a/gremlin-go/driver/connection_test.go +++ b/gremlin-go/driver/connection_test.go @@ -310,17 +310,6 @@ func TestConnection(t *testing.T) { }(i) } wg.Wait() - - // - //g := cloneGraphTraversalSource(&Graph{}, NewGremlinLang(nil), nil) - //b := g.V().Count().Bytecode - //resultSet, err = client.submitBytecode(b) - //assert.Nil(t, err) - //assert.NotNil(t, resultSet) - //result, ok, err = resultSet.One() - //assert.Nil(t, err) - //assert.True(t, ok) - //assert.NotNil(t, result) }) t.Run("Test client.submit() with concurrency", func(t *testing.T) { @@ -466,6 +455,22 @@ func TestConnection(t *testing.T) { resetGraph(t, g) }) + t.Run("Test DriverRemoteConnection test", func(t *testing.T) { + remote, err := NewDriverRemoteConnection(testNoAuthUrl, + func(settings *DriverRemoteConnectionSettings) { + settings.TlsConfig = testNoAuthWithAliasTlsConfig + settings.AuthInfo = testNoAuthWithAliasAuthInfo + settings.TraversalSource = "ggrateful" + }) + assert.Nil(t, err) + assert.NotNil(t, remote) + g := Traversal_().With(remote) + defer g.remoteConnection.Close() + results, err := g.V().Has("song", "name", "OH BOY").Out("followedBy").Out("followedBy").Order().By("performances").By("songType", Order.Desc).By("name").ToList() + assert.Nil(t, err) + fmt.Println(len(results)) + }) + t.Run("Test DriverRemoteConnection Next and HasNext", func(t *testing.T) { skipTestsIfNotEnabled(t, integrationTestSuiteName, testNoAuthEnable) diff --git a/gremlin-go/driver/cucumber/cucumberSteps_test.go b/gremlin-go/driver/cucumber/cucumberSteps_test.go index f9caa7e187..532f950823 100644 --- a/gremlin-go/driver/cucumber/cucumberSteps_test.go +++ b/gremlin-go/driver/cucumber/cucumberSteps_test.go @@ -430,7 +430,7 @@ func (tg *tinkerPopGraph) nothingShouldHappenBecause(arg1 *godog.DocString) erro func (tg *tinkerPopGraph) chooseGraph(graphName string) error { tg.graphName = graphName data := tg.graphDataMap[graphName] - tg.g = gremlingo.Traversal_().With(data.connection) + tg.g = gremlingo.Traversal_().With(data.connection).With("language", "gremlin-lang") if graphName == "empty" { err := tg.cleanEmptyDataGraph(tg.g) if err != nil { diff --git a/gremlin-go/driver/cucumber/cucumberWorld.go b/gremlin-go/driver/cucumber/cucumberWorld.go index ac92ea1db8..dcbec09ef4 100644 --- a/gremlin-go/driver/cucumber/cucumberWorld.go +++ b/gremlin-go/driver/cucumber/cucumberWorld.go @@ -109,7 +109,7 @@ func (t *CucumberWorld) loadAllDataGraph() { if err != nil { panic(fmt.Sprintf("Failed to create connection '%v'", err)) } - g := gremlingo.Traversal_().With(connection) + g := gremlingo.Traversal_().With(connection).With("language", "gremlin-lang") t.graphDataMap[name] = &DataGraph{ name: name, connection: connection, @@ -130,7 +130,7 @@ func (t *CucumberWorld) loadEmptyDataGraph() { func (t *CucumberWorld) reloadEmptyData() { graphData := t.getDataGraphFromMap("empty") - g := gremlingo.Traversal_().With(graphData.connection) + g := gremlingo.Traversal_().With(graphData.connection).With("language", "gremlin-lang") graphData.vertices = getVertices(g) graphData.edges = getEdges(g) } diff --git a/gremlin-go/driver/graphBinary.go b/gremlin-go/driver/graphBinary.go index c553b0a2e2..c28aa633dc 100644 --- a/gremlin-go/driver/graphBinary.go +++ b/gremlin-go/driver/graphBinary.go @@ -22,6 +22,7 @@ package gremlingo import ( "bytes" "encoding/binary" + "errors" "fmt" "math" "math/big" @@ -31,6 +32,8 @@ import ( "github.com/google/uuid" ) +var ErrIncompleteData = errors.New("incomplete data") + // Version 1.0 // dataType graphBinary types. @@ -613,12 +616,12 @@ func (serializer *graphBinaryTypeSerializer) writeValueFlagNone(buffer *bytes.Bu // readers -func readTemp(data *[]byte, i *int, len int) *[]byte { - tmp := make([]byte, len) - for j := 0; j < len; j++ { - tmp[j] = (*data)[j+*i] +func readTemp(data *[]byte, i *int, length int) *[]byte { + if *i+length > len(*data) { + panic(ErrIncompleteData) } - *i += len + tmp := (*data)[*i : *i+length] + *i += length return &tmp } @@ -655,21 +658,30 @@ func readLong(data *[]byte, i *int) (interface{}, error) { } func readBigInt(data *[]byte, i *int) (interface{}, error) { - sz := readIntSafe(data, i) - b := readTemp(data, i, int(sz)) + // Length lookahead - check if complete BigInt is available + if *i+4 > len(*data) { + panic(ErrIncompleteData) + } + sz := int(binary.BigEndian.Uint32((*data)[*i : *i+4])) + if sz > 0 && *i+4+sz > len(*data) { + panic(ErrIncompleteData) + } + *i += 4 + if sz == 0 { + return big.NewInt(0), nil + } + b := (*data)[*i : *i+sz] + *i += sz - var newBigInt = big.NewInt(0).SetBytes(*b) + var newBigInt = big.NewInt(0).SetBytes(b) var one = big.NewInt(1) - if len(*b) == 0 { - return newBigInt, nil - } // If the first bit in the first element of the byte array is a 1, we need to interpret the byte array as a two's complement representation - if (*b)[0]&0x80 == 0x00 { - newBigInt.SetBytes(*b) + if b[0]&0x80 == 0x00 { + newBigInt.SetBytes(b) return newBigInt, nil } // Undo two's complement to byte array and set negative boolean to true - length := uint((len(*b)*8)/8+1) * 8 + length := uint((len(b)*8)/8+1) * 8 b2 := new(big.Int).Sub(newBigInt, new(big.Int).Lsh(one, length)).Bytes() // Strip the resulting 0xff byte at the start of array @@ -709,12 +721,21 @@ func readDouble(data *[]byte, i *int) (interface{}, error) { } func readString(data *[]byte, i *int) (interface{}, error) { - sz := int(readUint32Safe(data, i)) + // Length lookahead - check if complete string is available + if *i+4 > len(*data) { + panic(ErrIncompleteData) + } + sz := int(binary.BigEndian.Uint32((*data)[*i : *i+4])) + if sz > 0 && *i+4+sz > len(*data) { + panic(ErrIncompleteData) // Don't advance index - wait for more data + } + *i += 4 // Now safe to advance past length if sz == 0 { return "", nil } + result := string((*data)[*i : *i+sz]) *i += sz - return string((*data)[*i-sz : *i]), nil + return result, nil } func readDataType(data *[]byte, i *int) dataType { @@ -776,12 +797,19 @@ func readList(data *[]byte, i *int, flag byte) (interface{}, error) { } func readByteBuffer(data *[]byte, i *int) (interface{}, error) { + // Length lookahead - check if complete ByteBuffer is available + if *i+4 > len(*data) { + panic(ErrIncompleteData) + } + sz := int(binary.BigEndian.Uint32((*data)[*i : *i+4])) + if sz > 0 && *i+4+sz > len(*data) { + panic(ErrIncompleteData) + } + *i += 4 r := &ByteBuffer{} - sz := readIntSafe(data, i) r.Data = make([]byte, sz) - for j := int32(0); j < sz; j++ { - r.Data[j] = readByteSafe(data, i) - } + copy(r.Data, (*data)[*i:*i+sz]) + *i += sz return r, nil } @@ -846,7 +874,12 @@ func readSet(data *[]byte, i *int, flag byte) (interface{}, error) { } func readUuid(data *[]byte, i *int) (interface{}, error) { - id, _ := uuid.FromBytes(*readTemp(data, i, 16)) + // Bounds check - UUID is fixed 16 bytes + if *i+16 > len(*data) { + panic(ErrIncompleteData) + } + id, _ := uuid.FromBytes((*data)[*i : *i+16]) + *i += 16 return id, nil } diff --git a/gremlin-go/driver/httpProtocol.go b/gremlin-go/driver/httpProtocol.go index 1462b6e794..cbf54f9dcf 100644 --- a/gremlin-go/driver/httpProtocol.go +++ b/gremlin-go/driver/httpProtocol.go @@ -20,13 +20,12 @@ under the License. package gremlingo import ( - "fmt" "net/http" ) // responsible for serializing and sending requests and then receiving and deserializing responses type httpProtocol struct { - serializer serializer + serializer *graphBinarySerializer logHandler *logHandler url string connSettings *connectionSettings @@ -59,7 +58,6 @@ func newHttpProtocol(handler *logHandler, url string, connSettings *connectionSe // sends a query request and returns a ResultSet that can be used to obtain query results func (protocol *httpProtocol) send(request *request) (ResultSet, error) { rs := newChannelResultSet() - fmt.Println("Serializing request") bytes, err := protocol.serializer.serializeMessage(request) if err != nil { rs.setError(err) @@ -68,7 +66,7 @@ func (protocol *httpProtocol) send(request *request) (ResultSet, error) { } // one transport per request - transport := NewHttpTransporter(protocol.url, protocol.connSettings, protocol.httpClient, protocol.logHandler) + transport := newHttpTransporter(protocol.url, protocol.connSettings, protocol.httpClient, protocol.logHandler) // async send request transport.wg.Add(1) @@ -76,6 +74,7 @@ func (protocol *httpProtocol) send(request *request) (ResultSet, error) { defer transport.wg.Done() err := transport.Write(bytes) if err != nil { + transport.Close() // Close transport to unblock receiver rs.setError(err) rs.Close() } @@ -85,12 +84,9 @@ func (protocol *httpProtocol) send(request *request) (ResultSet, error) { transport.wg.Add(1) go func() { defer transport.wg.Done() - msg, err := transport.Read() + err := protocol.receiveChunkedResponse(rs, transport) if err != nil { rs.setError(err) - rs.Close() - } else { - err = protocol.receive(rs, msg) } transport.Close() }() @@ -101,55 +97,44 @@ func (protocol *httpProtocol) send(request *request) (ResultSet, error) { return rs, rs.GetError() } -// receives a binary response message, deserializes, and adds results to the ResultSet -func (protocol *httpProtocol) receive(rs ResultSet, msg []byte) error { - fmt.Println("Deserializing response") - resp, err := protocol.serializer.deserializeMessage(msg) - if err != nil { - protocol.logHandler.logf(Error, logErrorGeneric, "receive()", err.Error()) - rs.Close() - return err - } +// receiveChunkedResponse processes individual chunk responses +func (protocol *httpProtocol) receiveChunkedResponse(rs ResultSet, transport *httpTransporter) error { + for { + resp, err := transport.Read() + if err != nil { + if err.Error() == "response stream closed" { + rs.Close() + return nil + } + rs.Close() + return err + } - fmt.Println("Handling response") - err = protocol.handleResponse(rs, resp) - if err != nil { - protocol.logHandler.logf(Error, logErrorGeneric, "receive()", err.Error()) - rs.Close() - return err - } - return nil -} + endOfStream := false + if data, ok := resp.responseResult.data.([]interface{}); ok { + for _, obj := range data { + if marker, ok := obj.(Marker); ok && marker == EndOfStream() { + endOfStream = true + break + } -// processes a deserialized response and attempts to add results to the ResultSet -func (protocol *httpProtocol) handleResponse(rs ResultSet, response response) error { - fmt.Println("Handling response") + rs.Channel() <- &Result{obj} + } + } - statusCode, data := response.responseStatus.code, response.responseResult.data - if rs == nil { - return newError(err0501ResponseHandlerResultSetNotCreatedError) - } + // Check status code (error status comes after EndOfStream) + if resp.responseStatus.code != 0 && resp.responseStatus.code != 200 { + rs.Close() + err := newError(err0502ResponseHandlerReadLoopError, resp.responseStatus, resp.responseStatus.code) + rs.setError(err) + return err + } - if statusCode == http.StatusNoContent { - rs.addResult(&Result{make([]interface{}, 0)}) - rs.Close() - protocol.logHandler.logf(Debug, readComplete) - } else if statusCode == http.StatusOK { - rs.addResult(&Result{data}) - rs.Close() - protocol.logHandler.logf(Debug, readComplete) - } else if statusCode == http.StatusUnauthorized || statusCode == http.StatusForbidden { - rs.Close() - err := newError(err0503ResponseHandlerAuthError, response.responseStatus, response.responseResult) - rs.setError(err) - return err - } else { - rs.Close() - err := newError(err0502ResponseHandlerReadLoopError, response.responseStatus, statusCode) - rs.setError(err) - return err + if endOfStream { + rs.Close() + return nil + } } - return nil } func (protocol *httpProtocol) close() { diff --git a/gremlin-go/driver/httpTransporter.go b/gremlin-go/driver/httpTransporter.go index 94059a19c1..c5c42ea15e 100644 --- a/gremlin-go/driver/httpTransporter.go +++ b/gremlin-go/driver/httpTransporter.go @@ -23,41 +23,40 @@ import ( "bytes" "compress/zlib" "errors" - "fmt" "io" "net/http" "sync" + "time" ) -// TODO decide channel size when chunked response handling is implemented - for now just set to 1 -const responseChannelSizeDefault = 1 - -// HttpTransporter responsible for sending and receiving bytes to/from the server -type HttpTransporter struct { +// httpTransporter responsible for sending and receiving bytes to/from the server +type httpTransporter struct { url string isClosed bool connSettings *connectionSettings - responseChannel chan []byte // receives response bytes from the server + responseChannel chan response // receives response messages httpClient *http.Client wg *sync.WaitGroup logHandler *logHandler + closeOnce sync.Once } -func NewHttpTransporter(url string, connSettings *connectionSettings, httpClient *http.Client, logHandler *logHandler) *HttpTransporter { +func newHttpTransporter(url string, connSettings *connectionSettings, httpClient *http.Client, logHandler *logHandler) *httpTransporter { wg := &sync.WaitGroup{} - return &HttpTransporter{ + return &httpTransporter{ url: url, connSettings: connSettings, - responseChannel: make(chan []byte, responseChannelSizeDefault), + responseChannel: make(chan response, 10), httpClient: httpClient, wg: wg, logHandler: logHandler, + closeOnce: sync.Once{}, } } -// Write sends bytes to the server as a POST request and sends received response bytes to the responseChannel -func (transporter *HttpTransporter) Write(data []byte) error { +// Write sends bytes to the server and starts streaming response processing +func (transporter *httpTransporter) Write(data []byte) error { req, err := http.NewRequest("POST", transporter.url, bytes.NewBuffer(data)) if err != nil { transporter.logHandler.logf(Error, failedToSendRequest, err.Error()) @@ -87,7 +86,6 @@ func (transporter *HttpTransporter) Write(data []byte) error { } } - fmt.Println("Sending request") resp, err := transporter.httpClient.Do(req) if err != nil { transporter.logHandler.logf(Error, failedToSendRequest, err.Error()) @@ -99,47 +97,90 @@ func (transporter *HttpTransporter) Write(data []byte) error { reader, err = zlib.NewReader(resp.Body) if err != nil { transporter.logHandler.logf(Error, failedToReceiveResponse, err.Error()) + err := resp.Body.Close() + if err != nil { + return err + } return err } } - // TODO handle chunked encoding and send chunks to responseChannel - all, err := io.ReadAll(reader) - if err != nil { - transporter.logHandler.logf(Error, failedToReceiveResponse, err.Error()) - return err - } - err = reader.Close() - if err != nil { - return err - } + // Start streaming processing in background + go transporter.streamResponse(reader, resp.Body) + return nil +} - // TODO for debug, remove later, and check response handling - //str := hex.EncodeToString(all) - //_, _ = fmt.Fprintf(os.Stdout, "Received response data : %s\n", str) +// streamResponse processes HTTP chunks independently +func (transporter *httpTransporter) streamResponse(reader io.Reader, body io.Closer) { + defer func(body io.Closer) { + err := body.Close() + if err != nil { + } + }(body) + defer transporter.closeResponseChannel() + + serializer := newGraphBinarySerializer(transporter.logHandler) + isFirstChunk := true + + chunk := make([]byte, transporter.connSettings.readBufferSize) + timer := time.NewTimer(5 * time.Second) + defer timer.Stop() + + for { + n, err := reader.Read(chunk) + if n > 0 { + msg, procErr := serializer.readChunk(chunk[:n], isFirstChunk) + if procErr != nil { + transporter.logHandler.logf(Error, failedToReceiveResponse, procErr.Error()) + return + } + isFirstChunk = false - fmt.Println("Sending response to responseChannel") - transporter.responseChannel <- all - return nil + if !timer.Stop() { + select { + case <-timer.C: + default: + } + } + timer.Reset(5 * time.Second) + + select { + case transporter.responseChannel <- *msg: + case <-timer.C: + transporter.logHandler.logf(Error, failedToReceiveResponse, "timeout") + return + } + } + + if err == io.EOF { + break + } + if err != nil { + transporter.logHandler.logf(Error, failedToReceiveResponse, err.Error()) + return + } + } } -// Read reads bytes from the responseChannel -func (transporter *HttpTransporter) Read() ([]byte, error) { - fmt.Println("Reading from responseChannel") - msg, ok := <-transporter.responseChannel +func (transporter *httpTransporter) closeResponseChannel() { + transporter.closeOnce.Do(func() { + close(transporter.responseChannel) + }) +} + +// Read reads response messages from the stream +func (transporter *httpTransporter) Read() (response, error) { + resp, ok := <-transporter.responseChannel if !ok { - return []byte{}, errors.New("failed to read from response channel") + return response{}, errors.New("response stream closed") } - return msg, nil + return resp, nil } // Close closes the transporter and its corresponding responseChannel -func (transporter *HttpTransporter) Close() { - fmt.Println("Closing http transporter") +func (transporter *httpTransporter) Close() { if !transporter.isClosed { - if transporter.responseChannel != nil { - close(transporter.responseChannel) - } + transporter.closeResponseChannel() transporter.isClosed = true } } diff --git a/gremlin-go/driver/resultSet.go b/gremlin-go/driver/resultSet.go index 8079744bd3..df79a13892 100644 --- a/gremlin-go/driver/resultSet.go +++ b/gremlin-go/driver/resultSet.go @@ -152,8 +152,8 @@ func (channelResultSet *channelResultSet) All() ([]*Result, error) { func (channelResultSet *channelResultSet) addResult(r *Result) { channelResultSet.channelMutex.Lock() - if r.GetType().Kind() == reflect.Array || r.GetType().Kind() == reflect.Slice { - for _, v := range r.Data.([]interface{}) { + if data, ok := r.Data.([]interface{}); ok { + for _, v := range data { if reflect.TypeOf(v) == reflect.TypeOf(&Traverser{}) { for i := int64(0); i < (v.(*Traverser)).bulk; i++ { channelResultSet.channel <- &Result{(v.(*Traverser)).value} diff --git a/gremlin-go/driver/serializer.go b/gremlin-go/driver/serializer.go index 1e1c7b2bf3..afebafb2e4 100644 --- a/gremlin-go/driver/serializer.go +++ b/gremlin-go/driver/serializer.go @@ -35,7 +35,9 @@ type serializer interface { // graphBinarySerializer serializes/deserializes message to/from GraphBinary. type graphBinarySerializer struct { - ser *graphBinaryTypeSerializer + ser *graphBinaryTypeSerializer + bulked bool // State maintained between chunks + buffer []byte // Buffer for incomplete objects across chunks } // CustomTypeReader user provided function to deserialize custom types @@ -56,16 +58,20 @@ func init() { initDeserializers() } -func newGraphBinarySerializer(handler *logHandler) serializer { +func newGraphBinarySerializer(handler *logHandler) *graphBinarySerializer { serializer := graphBinaryTypeSerializer{handler} - return graphBinarySerializer{&serializer} + return &graphBinarySerializer{ + ser: &serializer, + bulked: false, + buffer: make([]byte, 0), + } } // TODO change for graph binary 4.0 version is finalized const versionByte byte = 0x81 // serializeMessage serializes a request message into GraphBinary. -func (gs graphBinarySerializer) serializeMessage(request *request) ([]byte, error) { +func (gs *graphBinarySerializer) serializeMessage(request *request) ([]byte, error) { finalMessage, err := gs.buildMessage(request.gremlin, request.fields) if err != nil { return nil, err @@ -92,7 +98,7 @@ func (gs *graphBinarySerializer) buildMessage(gremlin string, args map[string]in } // deserializeMessage deserializes a response message. -func (gs graphBinarySerializer) deserializeMessage(message []byte) (response, error) { +func (gs *graphBinarySerializer) deserializeMessage(message []byte) (response, error) { var msg response if message == nil || len(message) == 0 { @@ -141,6 +147,95 @@ func (gs graphBinarySerializer) deserializeMessage(message []byte) (response, er return msg, nil } +// readChunk processes HTTP chunks with simple buffering +func (gs *graphBinarySerializer) readChunk(chunk []byte, isFirstChunk bool) (*response, error) { + var msg response + + if len(chunk) == 0 { + msg.responseStatus.code = 204 + msg.responseResult.data = make([]interface{}, 0) + return &msg, nil + } + + // Append to buffer + gs.buffer = append(gs.buffer, chunk...) + + i := 0 + + if isFirstChunk { + if len(gs.buffer) < 2 { + msg.responseResult.data = make([]interface{}, 0) + return &msg, nil + } + i++ // skip version + gs.bulked = (gs.buffer[i] & 1) == 1 + i++ + } + + results := make([]interface{}, 0) + startPos := i + processedPos := i // Track position after last successfully processed object + endOfStream := false + + // Process complete objects, recover from panic on incomplete data + func() { + defer func() { + if r := recover(); r != nil { + if r == ErrIncompleteData { + i = startPos // reset to start of incomplete object + } else { + panic(r) // re-panic for other errors + } + } + }() + for i < len(gs.buffer) { + // Peek before parsing - need at least 2 bytes for type + nullable + if len(gs.buffer)-i < 2 { + break + } + startPos = i + obj, err := readFullyQualifiedNullable(&gs.buffer, &i, true) + if err != nil { + return + } + + if marker, ok := obj.(Marker); ok && marker == EndOfStream() { + gs.bulked = false + // Read status after end of stream + msg.responseStatus.code = readUint32Safe(&gs.buffer, &i) + if statusMsg, _ := readUnqualified(&gs.buffer, &i, stringType, true); statusMsg != nil { + msg.responseStatus.message = statusMsg.(string) + } + if exception, _ := readUnqualified(&gs.buffer, &i, stringType, true); exception != nil { + msg.responseStatus.exception = exception.(string) + } + // Only set endOfStream after successfully reading status + endOfStream = true + gs.buffer = gs.buffer[:0] + return + } + + results = append(results, obj) + processedPos = i // Update after successful processing + } + }() + + msg.responseResult.data = results + + // Keep unprocessed data in buffer (unless we hit end of stream) + if !endOfStream { + if processedPos < len(gs.buffer) { + remaining := make([]byte, len(gs.buffer)-processedPos) + copy(remaining, gs.buffer[processedPos:]) + gs.buffer = remaining + } else { + gs.buffer = gs.buffer[:0] + } + } + + return &msg, nil +} + func initSerializers() { serializers = map[dataType]writer{ stringType: stringWriter,
