This is an automated email from the ASF dual-hosted git repository. xiazcy pushed a commit to branch go-http-streaming in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
commit ffbaeaaadb03fc5e5052c4166b8e074ee8a3d87b Author: Yang Xia <[email protected]> AuthorDate: Wed Jan 14 13:27:30 2026 -0800 interim imp --- gremlin-go/driver/client.go | 12 ++- gremlin-go/driver/cucumber/cucumberSteps_test.go | 2 +- gremlin-go/driver/cucumber/cucumberWorld.go | 4 +- gremlin-go/driver/graphBinary.go | 75 ++++++++++---- gremlin-go/driver/httpProtocol.go | 89 +++++++---------- gremlin-go/driver/httpTransporter.go | 121 +++++++++++++++-------- gremlin-go/driver/resultSet.go | 4 +- gremlin-go/driver/serializer.go | 106 ++++++++++++++++++-- 8 files changed, 286 insertions(+), 127 deletions(-) diff --git a/gremlin-go/driver/client.go b/gremlin-go/driver/client.go index ecacb54c5d..d8777d873d 100644 --- a/gremlin-go/driver/client.go +++ b/gremlin-go/driver/client.go @@ -21,6 +21,7 @@ package gremlingo import ( "crypto/tls" + "fmt" "reflect" "runtime" "time" @@ -32,11 +33,12 @@ const keepAliveIntervalDefault = 5 * time.Second const writeDeadlineDefault = 3 * time.Second const connectionTimeoutDefault = 5 * time.Second -// ReadBufferSize and WriteBufferSize specify I/O buffer sizes in bytes. The default is 1MB. +// ReadBufferSize and WriteBufferSize specify I/O buffer sizes in bytes. The default is 64KB. // If a buffer size is set zero, then the transporter default size is used. The I/O buffer // sizes do not limit the size of the messages that can be sent or received. -const readBufferSizeDefault = 1048576 -const writeBufferSizeDefault = 1048576 +const readBufferSizeDefault = 65536 // 64KB +const maxReadBufferSize = 1073741824 // 1GB - Go's maximum per Read() call +const writeBufferSizeDefault = 65536 // 64KB // ClientSettings is used to modify a Client's settings on initialization. type ClientSettings struct { @@ -91,6 +93,10 @@ func NewClient(url string, configurations ...func(settings *ClientSettings)) (*C configuration(settings) } + if settings.ReadBufferSize > maxReadBufferSize { + return nil, fmt.Errorf("readBufferSize %d exceeds maximum of %d bytes", settings.ReadBufferSize, maxReadBufferSize) + } + connSettings := &connectionSettings{ authInfo: settings.AuthInfo, tlsConfig: settings.TlsConfig, diff --git a/gremlin-go/driver/cucumber/cucumberSteps_test.go b/gremlin-go/driver/cucumber/cucumberSteps_test.go index f9caa7e187..532f950823 100644 --- a/gremlin-go/driver/cucumber/cucumberSteps_test.go +++ b/gremlin-go/driver/cucumber/cucumberSteps_test.go @@ -430,7 +430,7 @@ func (tg *tinkerPopGraph) nothingShouldHappenBecause(arg1 *godog.DocString) erro func (tg *tinkerPopGraph) chooseGraph(graphName string) error { tg.graphName = graphName data := tg.graphDataMap[graphName] - tg.g = gremlingo.Traversal_().With(data.connection) + tg.g = gremlingo.Traversal_().With(data.connection).With("language", "gremlin-lang") if graphName == "empty" { err := tg.cleanEmptyDataGraph(tg.g) if err != nil { diff --git a/gremlin-go/driver/cucumber/cucumberWorld.go b/gremlin-go/driver/cucumber/cucumberWorld.go index ac92ea1db8..dcbec09ef4 100644 --- a/gremlin-go/driver/cucumber/cucumberWorld.go +++ b/gremlin-go/driver/cucumber/cucumberWorld.go @@ -109,7 +109,7 @@ func (t *CucumberWorld) loadAllDataGraph() { if err != nil { panic(fmt.Sprintf("Failed to create connection '%v'", err)) } - g := gremlingo.Traversal_().With(connection) + g := gremlingo.Traversal_().With(connection).With("language", "gremlin-lang") t.graphDataMap[name] = &DataGraph{ name: name, connection: connection, @@ -130,7 +130,7 @@ func (t *CucumberWorld) loadEmptyDataGraph() { func (t *CucumberWorld) reloadEmptyData() { graphData := t.getDataGraphFromMap("empty") - g := gremlingo.Traversal_().With(graphData.connection) + g := gremlingo.Traversal_().With(graphData.connection).With("language", "gremlin-lang") graphData.vertices = getVertices(g) graphData.edges = getEdges(g) } diff --git a/gremlin-go/driver/graphBinary.go b/gremlin-go/driver/graphBinary.go index c553b0a2e2..c28aa633dc 100644 --- a/gremlin-go/driver/graphBinary.go +++ b/gremlin-go/driver/graphBinary.go @@ -22,6 +22,7 @@ package gremlingo import ( "bytes" "encoding/binary" + "errors" "fmt" "math" "math/big" @@ -31,6 +32,8 @@ import ( "github.com/google/uuid" ) +var ErrIncompleteData = errors.New("incomplete data") + // Version 1.0 // dataType graphBinary types. @@ -613,12 +616,12 @@ func (serializer *graphBinaryTypeSerializer) writeValueFlagNone(buffer *bytes.Bu // readers -func readTemp(data *[]byte, i *int, len int) *[]byte { - tmp := make([]byte, len) - for j := 0; j < len; j++ { - tmp[j] = (*data)[j+*i] +func readTemp(data *[]byte, i *int, length int) *[]byte { + if *i+length > len(*data) { + panic(ErrIncompleteData) } - *i += len + tmp := (*data)[*i : *i+length] + *i += length return &tmp } @@ -655,21 +658,30 @@ func readLong(data *[]byte, i *int) (interface{}, error) { } func readBigInt(data *[]byte, i *int) (interface{}, error) { - sz := readIntSafe(data, i) - b := readTemp(data, i, int(sz)) + // Length lookahead - check if complete BigInt is available + if *i+4 > len(*data) { + panic(ErrIncompleteData) + } + sz := int(binary.BigEndian.Uint32((*data)[*i : *i+4])) + if sz > 0 && *i+4+sz > len(*data) { + panic(ErrIncompleteData) + } + *i += 4 + if sz == 0 { + return big.NewInt(0), nil + } + b := (*data)[*i : *i+sz] + *i += sz - var newBigInt = big.NewInt(0).SetBytes(*b) + var newBigInt = big.NewInt(0).SetBytes(b) var one = big.NewInt(1) - if len(*b) == 0 { - return newBigInt, nil - } // If the first bit in the first element of the byte array is a 1, we need to interpret the byte array as a two's complement representation - if (*b)[0]&0x80 == 0x00 { - newBigInt.SetBytes(*b) + if b[0]&0x80 == 0x00 { + newBigInt.SetBytes(b) return newBigInt, nil } // Undo two's complement to byte array and set negative boolean to true - length := uint((len(*b)*8)/8+1) * 8 + length := uint((len(b)*8)/8+1) * 8 b2 := new(big.Int).Sub(newBigInt, new(big.Int).Lsh(one, length)).Bytes() // Strip the resulting 0xff byte at the start of array @@ -709,12 +721,21 @@ func readDouble(data *[]byte, i *int) (interface{}, error) { } func readString(data *[]byte, i *int) (interface{}, error) { - sz := int(readUint32Safe(data, i)) + // Length lookahead - check if complete string is available + if *i+4 > len(*data) { + panic(ErrIncompleteData) + } + sz := int(binary.BigEndian.Uint32((*data)[*i : *i+4])) + if sz > 0 && *i+4+sz > len(*data) { + panic(ErrIncompleteData) // Don't advance index - wait for more data + } + *i += 4 // Now safe to advance past length if sz == 0 { return "", nil } + result := string((*data)[*i : *i+sz]) *i += sz - return string((*data)[*i-sz : *i]), nil + return result, nil } func readDataType(data *[]byte, i *int) dataType { @@ -776,12 +797,19 @@ func readList(data *[]byte, i *int, flag byte) (interface{}, error) { } func readByteBuffer(data *[]byte, i *int) (interface{}, error) { + // Length lookahead - check if complete ByteBuffer is available + if *i+4 > len(*data) { + panic(ErrIncompleteData) + } + sz := int(binary.BigEndian.Uint32((*data)[*i : *i+4])) + if sz > 0 && *i+4+sz > len(*data) { + panic(ErrIncompleteData) + } + *i += 4 r := &ByteBuffer{} - sz := readIntSafe(data, i) r.Data = make([]byte, sz) - for j := int32(0); j < sz; j++ { - r.Data[j] = readByteSafe(data, i) - } + copy(r.Data, (*data)[*i:*i+sz]) + *i += sz return r, nil } @@ -846,7 +874,12 @@ func readSet(data *[]byte, i *int, flag byte) (interface{}, error) { } func readUuid(data *[]byte, i *int) (interface{}, error) { - id, _ := uuid.FromBytes(*readTemp(data, i, 16)) + // Bounds check - UUID is fixed 16 bytes + if *i+16 > len(*data) { + panic(ErrIncompleteData) + } + id, _ := uuid.FromBytes((*data)[*i : *i+16]) + *i += 16 return id, nil } diff --git a/gremlin-go/driver/httpProtocol.go b/gremlin-go/driver/httpProtocol.go index 9c9bba4a91..502bb3ae3f 100644 --- a/gremlin-go/driver/httpProtocol.go +++ b/gremlin-go/driver/httpProtocol.go @@ -20,13 +20,12 @@ under the License. package gremlingo import ( - "fmt" "net/http" ) // responsible for serializing and sending requests and then receiving and deserializing responses type httpProtocol struct { - serializer Serializer + serializer *GraphBinarySerializer logHandler *logHandler url string connSettings *connectionSettings @@ -59,7 +58,6 @@ func newHttpProtocol(handler *logHandler, url string, connSettings *connectionSe // sends a query request and returns a ResultSet that can be used to obtain query results func (protocol *httpProtocol) send(request *request) (ResultSet, error) { rs := newChannelResultSet() - fmt.Println("Serializing request") bytes, err := protocol.serializer.SerializeMessage(request) if err != nil { rs.setError(err) @@ -68,7 +66,7 @@ func (protocol *httpProtocol) send(request *request) (ResultSet, error) { } // one transport per request - transport := NewHttpTransporter(protocol.url, protocol.connSettings, protocol.httpClient, protocol.logHandler) + transport := newHttpTransporter(protocol.url, protocol.connSettings, protocol.httpClient, protocol.logHandler) // async send request transport.wg.Add(1) @@ -76,6 +74,7 @@ func (protocol *httpProtocol) send(request *request) (ResultSet, error) { defer transport.wg.Done() err := transport.Write(bytes) if err != nil { + transport.Close() // Close transport to unblock receiver rs.setError(err) rs.Close() } @@ -85,12 +84,9 @@ func (protocol *httpProtocol) send(request *request) (ResultSet, error) { transport.wg.Add(1) go func() { defer transport.wg.Done() - msg, err := transport.Read() + err := protocol.receiveChunkedResponse(rs, transport) if err != nil { rs.setError(err) - rs.Close() - } else { - err = protocol.receive(rs, msg) } transport.Close() }() @@ -101,55 +97,44 @@ func (protocol *httpProtocol) send(request *request) (ResultSet, error) { return rs, rs.GetError() } -// receives a binary response message, deserializes, and adds results to the ResultSet -func (protocol *httpProtocol) receive(rs ResultSet, msg []byte) error { - fmt.Println("Deserializing response") - resp, err := protocol.serializer.DeserializeMessage(msg) - if err != nil { - protocol.logHandler.logf(Error, logErrorGeneric, "receive()", err.Error()) - rs.Close() - return err - } +// receiveChunkedResponse processes individual chunk responses +func (protocol *httpProtocol) receiveChunkedResponse(rs ResultSet, transport *httpTransporter) error { + for { + resp, err := transport.Read() + if err != nil { + if err.Error() == "response stream closed" { + rs.Close() + return nil + } + rs.Close() + return err + } - fmt.Println("Handling response") - err = protocol.handleResponse(rs, resp) - if err != nil { - protocol.logHandler.logf(Error, logErrorGeneric, "receive()", err.Error()) - rs.Close() - return err - } - return nil -} + endOfStream := false + if data, ok := resp.ResponseResult.Data.([]interface{}); ok { + for _, obj := range data { + if marker, ok := obj.(Marker); ok && marker == EndOfStream() { + endOfStream = true + break + } -// processes a deserialized response and attempts to add results to the ResultSet -func (protocol *httpProtocol) handleResponse(rs ResultSet, response Response) error { - fmt.Println("Handling response") + rs.Channel() <- &Result{obj} + } + } - statusCode, data := response.ResponseStatus.code, response.ResponseResult.Data - if rs == nil { - return newError(err0501ResponseHandlerResultSetNotCreatedError) - } + // Check status code (error status comes after EndOfStream) + if resp.ResponseStatus.code != 0 && resp.ResponseStatus.code != 200 { + rs.Close() + err := newError(err0502ResponseHandlerReadLoopError, resp.ResponseStatus, resp.ResponseStatus.code) + rs.setError(err) + return err + } - if statusCode == http.StatusNoContent { - rs.addResult(&Result{make([]interface{}, 0)}) - rs.Close() - protocol.logHandler.logf(Debug, readComplete) - } else if statusCode == http.StatusOK { - rs.addResult(&Result{data}) - rs.Close() - protocol.logHandler.logf(Debug, readComplete) - } else if statusCode == http.StatusUnauthorized || statusCode == http.StatusForbidden { - rs.Close() - err := newError(err0503ResponseHandlerAuthError, response.ResponseStatus, response.ResponseResult) - rs.setError(err) - return err - } else { - rs.Close() - err := newError(err0502ResponseHandlerReadLoopError, response.ResponseStatus, statusCode) - rs.setError(err) - return err + if endOfStream { + rs.Close() + return nil + } } - return nil } func (protocol *httpProtocol) close() { diff --git a/gremlin-go/driver/httpTransporter.go b/gremlin-go/driver/httpTransporter.go index 94059a19c1..9a9c5bb99b 100644 --- a/gremlin-go/driver/httpTransporter.go +++ b/gremlin-go/driver/httpTransporter.go @@ -23,41 +23,40 @@ import ( "bytes" "compress/zlib" "errors" - "fmt" "io" "net/http" "sync" + "time" ) -// TODO decide channel size when chunked response handling is implemented - for now just set to 1 -const responseChannelSizeDefault = 1 - -// HttpTransporter responsible for sending and receiving bytes to/from the server -type HttpTransporter struct { +// httpTransporter responsible for sending and receiving bytes to/from the server +type httpTransporter struct { url string isClosed bool connSettings *connectionSettings - responseChannel chan []byte // receives response bytes from the server + responseChannel chan Response // receives response messages httpClient *http.Client wg *sync.WaitGroup logHandler *logHandler + closeOnce sync.Once } -func NewHttpTransporter(url string, connSettings *connectionSettings, httpClient *http.Client, logHandler *logHandler) *HttpTransporter { +func newHttpTransporter(url string, connSettings *connectionSettings, httpClient *http.Client, logHandler *logHandler) *httpTransporter { wg := &sync.WaitGroup{} - return &HttpTransporter{ + return &httpTransporter{ url: url, connSettings: connSettings, - responseChannel: make(chan []byte, responseChannelSizeDefault), + responseChannel: make(chan Response, 10), httpClient: httpClient, wg: wg, logHandler: logHandler, + closeOnce: sync.Once{}, } } -// Write sends bytes to the server as a POST request and sends received response bytes to the responseChannel -func (transporter *HttpTransporter) Write(data []byte) error { +// Write sends bytes to the server and starts streaming response processing +func (transporter *httpTransporter) Write(data []byte) error { req, err := http.NewRequest("POST", transporter.url, bytes.NewBuffer(data)) if err != nil { transporter.logHandler.logf(Error, failedToSendRequest, err.Error()) @@ -87,7 +86,6 @@ func (transporter *HttpTransporter) Write(data []byte) error { } } - fmt.Println("Sending request") resp, err := transporter.httpClient.Do(req) if err != nil { transporter.logHandler.logf(Error, failedToSendRequest, err.Error()) @@ -99,47 +97,90 @@ func (transporter *HttpTransporter) Write(data []byte) error { reader, err = zlib.NewReader(resp.Body) if err != nil { transporter.logHandler.logf(Error, failedToReceiveResponse, err.Error()) + err := resp.Body.Close() + if err != nil { + return err + } return err } } - // TODO handle chunked encoding and send chunks to responseChannel - all, err := io.ReadAll(reader) - if err != nil { - transporter.logHandler.logf(Error, failedToReceiveResponse, err.Error()) - return err - } - err = reader.Close() - if err != nil { - return err - } + // Start streaming processing in background + go transporter.streamResponse(reader, resp.Body) + return nil +} - // TODO for debug, remove later, and check response handling - //str := hex.EncodeToString(all) - //_, _ = fmt.Fprintf(os.Stdout, "Received response data : %s\n", str) +// streamResponse processes HTTP chunks independently +func (transporter *httpTransporter) streamResponse(reader io.Reader, body io.Closer) { + defer func(body io.Closer) { + err := body.Close() + if err != nil { + } + }(body) + defer transporter.closeResponseChannel() + + serializer := newGraphBinarySerializer(transporter.logHandler) + isFirstChunk := true + + chunk := make([]byte, transporter.connSettings.readBufferSize) + timer := time.NewTimer(5 * time.Second) + defer timer.Stop() + + for { + n, err := reader.Read(chunk) + if n > 0 { + msg, procErr := serializer.readChunk(chunk[:n], isFirstChunk) + if procErr != nil { + transporter.logHandler.logf(Error, failedToReceiveResponse, procErr.Error()) + return + } + isFirstChunk = false - fmt.Println("Sending response to responseChannel") - transporter.responseChannel <- all - return nil + if !timer.Stop() { + select { + case <-timer.C: + default: + } + } + timer.Reset(5 * time.Second) + + select { + case transporter.responseChannel <- *msg: + case <-timer.C: + transporter.logHandler.logf(Error, failedToReceiveResponse, "timeout") + return + } + } + + if err == io.EOF { + break + } + if err != nil { + transporter.logHandler.logf(Error, failedToReceiveResponse, err.Error()) + return + } + } } -// Read reads bytes from the responseChannel -func (transporter *HttpTransporter) Read() ([]byte, error) { - fmt.Println("Reading from responseChannel") - msg, ok := <-transporter.responseChannel +func (transporter *httpTransporter) closeResponseChannel() { + transporter.closeOnce.Do(func() { + close(transporter.responseChannel) + }) +} + +// Read reads response messages from the stream +func (transporter *httpTransporter) Read() (Response, error) { + resp, ok := <-transporter.responseChannel if !ok { - return []byte{}, errors.New("failed to read from response channel") + return Response{}, errors.New("response stream closed") } - return msg, nil + return resp, nil } // Close closes the transporter and its corresponding responseChannel -func (transporter *HttpTransporter) Close() { - fmt.Println("Closing http transporter") +func (transporter *httpTransporter) Close() { if !transporter.isClosed { - if transporter.responseChannel != nil { - close(transporter.responseChannel) - } + transporter.closeResponseChannel() transporter.isClosed = true } } diff --git a/gremlin-go/driver/resultSet.go b/gremlin-go/driver/resultSet.go index 59e22e66d1..cf1230cd53 100644 --- a/gremlin-go/driver/resultSet.go +++ b/gremlin-go/driver/resultSet.go @@ -152,8 +152,8 @@ func (channelResultSet *channelResultSet) All() ([]*Result, error) { func (channelResultSet *channelResultSet) addResult(r *Result) { channelResultSet.channelMutex.Lock() - if r.GetType().Kind() == reflect.Array || r.GetType().Kind() == reflect.Slice { - for _, v := range r.Data.([]interface{}) { + if data, ok := r.Data.([]interface{}); ok { + for _, v := range data { if reflect.TypeOf(v) == reflect.TypeOf(&Traverser{}) { for i := int64(0); i < (v.(*Traverser)).bulk; i++ { channelResultSet.channel <- &Result{(v.(*Traverser)).value} diff --git a/gremlin-go/driver/serializer.go b/gremlin-go/driver/serializer.go index 68be6862ac..c2d68c6500 100644 --- a/gremlin-go/driver/serializer.go +++ b/gremlin-go/driver/serializer.go @@ -35,7 +35,9 @@ type Serializer interface { // GraphBinarySerializer serializes/deserializes message to/from GraphBinary. type GraphBinarySerializer struct { - ser *graphBinaryTypeSerializer + ser *graphBinaryTypeSerializer + bulked bool // State maintained between chunks + buffer []byte // Buffer for incomplete objects across chunks } // CustomTypeReader user provided function to deserialize custom types @@ -56,9 +58,13 @@ func init() { initDeserializers() } -func newGraphBinarySerializer(handler *logHandler) Serializer { +func newGraphBinarySerializer(handler *logHandler) *GraphBinarySerializer { serializer := graphBinaryTypeSerializer{handler} - return GraphBinarySerializer{&serializer} + return &GraphBinarySerializer{ + ser: &serializer, + bulked: false, + buffer: make([]byte, 0), + } } // TODO change for graph binary 4.0 version is finalized @@ -86,8 +92,8 @@ const versionByte byte = 0x81 // bytes, err := serializer.(graphBinarySerializer).SerializeMessage(&req) // // Send bytes over custom transport // -// serializeMessage serializes a request message into GraphBinary. -func (gs GraphBinarySerializer) SerializeMessage(request *request) ([]byte, error) { +// SerializeMessage serializes a request message into GraphBinary. +func (gs *GraphBinarySerializer) SerializeMessage(request *request) ([]byte, error) { finalMessage, err := gs.buildMessage(request.gremlin, request.fields) if err != nil { return nil, err @@ -132,7 +138,7 @@ func (gs *GraphBinarySerializer) buildMessage(gremlin string, args map[string]in // serializer := newGraphBinarySerializer(nil) // resp, err := serializer.(graphBinarySerializer).DeserializeMessage(responseBytes) // results := resp.responseResult.data -func (gs GraphBinarySerializer) DeserializeMessage(message []byte) (Response, error) { +func (gs *GraphBinarySerializer) DeserializeMessage(message []byte) (Response, error) { var msg Response if message == nil || len(message) == 0 { @@ -181,6 +187,94 @@ func (gs GraphBinarySerializer) DeserializeMessage(message []byte) (Response, er return msg, nil } +// readChunk processes HTTP chunks with simple buffering +func (gs *GraphBinarySerializer) readChunk(chunk []byte, isFirstChunk bool) (*Response, error) { + var msg Response + + if len(chunk) == 0 { + msg.ResponseStatus.code = 204 + return &msg, nil + } + + // Append to buffer + gs.buffer = append(gs.buffer, chunk...) + + i := 0 + + if isFirstChunk { + if len(gs.buffer) < 2 { + msg.ResponseResult.Data = make([]interface{}, 0) + return &msg, nil + } + i++ // skip version + gs.bulked = (gs.buffer[i] & 1) == 1 + i++ + } + + results := make([]interface{}, 0) + startPos := i + processedPos := i // Track position after last successfully processed object + endOfStream := false + + // Process complete objects, recover from panic on incomplete data + func() { + defer func() { + if r := recover(); r != nil { + if r == ErrIncompleteData { + i = startPos // reset to start of incomplete object + } else { + panic(r) // re-panic for other errors + } + } + }() + for i < len(gs.buffer) { + // Peek before parsing - need at least 2 bytes for type + nullable + if len(gs.buffer)-i < 2 { + break + } + startPos = i + obj, err := readFullyQualifiedNullable(&gs.buffer, &i, true) + if err != nil { + return + } + + if marker, ok := obj.(Marker); ok && marker == EndOfStream() { + gs.bulked = false + // Read status after end of stream + msg.ResponseStatus.code = readUint32Safe(&gs.buffer, &i) + if statusMsg, _ := readUnqualified(&gs.buffer, &i, stringType, true); statusMsg != nil { + msg.ResponseStatus.message = statusMsg.(string) + } + if exception, _ := readUnqualified(&gs.buffer, &i, stringType, true); exception != nil { + msg.ResponseStatus.exception = exception.(string) + } + // Only set endOfStream after successfully reading status + endOfStream = true + gs.buffer = gs.buffer[:0] + return + } + + results = append(results, obj) + processedPos = i // Update after successful processing + } + }() + + msg.ResponseResult.Data = results + + // Keep unprocessed data in buffer (unless we hit end of stream) + if !endOfStream { + if processedPos < len(gs.buffer) { + remaining := make([]byte, len(gs.buffer)-processedPos) + copy(remaining, gs.buffer[processedPos:]) + gs.buffer = remaining + } else { + gs.buffer = gs.buffer[:0] + } + } + + return &msg, nil +} + func initSerializers() { serializers = map[dataType]writer{ stringType: stringWriter,
