This is an automated email from the ASF dual-hosted git repository. xiazcy pushed a commit to branch go-http-streaming in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
commit 4da7623ee2263dab80705d8153ad489675110c95 Author: Yang Xia <[email protected]> AuthorDate: Thu Jan 15 09:42:31 2026 -0800 trying streaming deser --- gremlin-go/driver/connection_test.go | 61 +++ gremlin-go/driver/httpProtocol.go | 22 +- gremlin-go/driver/streamingDeserializer.go | 583 ++++++++++++++++++++++++ gremlin-go/driver/streamingDeserializer_test.go | 117 +++++ 4 files changed, 768 insertions(+), 15 deletions(-) diff --git a/gremlin-go/driver/connection_test.go b/gremlin-go/driver/connection_test.go index d27e65d84c..b6ee029432 100644 --- a/gremlin-go/driver/connection_test.go +++ b/gremlin-go/driver/connection_test.go @@ -29,6 +29,7 @@ import ( "strconv" "sync" "testing" + "time" "github.com/stretchr/testify/assert" ) @@ -760,3 +761,63 @@ func submitCount(i int, client *Client, t *testing.T) { assert.Equal(t, 6+i, c) _, _ = fmt.Fprintf(os.Stdout, "Received result : %s\n", result) } + +func TestStreamingResultDelivery(t *testing.T) { // Connects to GREMLIN_SERVER_URL (default noAuthUrl) with the "ggrateful" traversal source; it only logs first-result vs. total latency and does not assert that results are actually streamed + testNoAuthWithAliasEnable := getEnvOrDefaultBool("RUN_INTEGRATION_WITH_ALIAS_TESTS", true) + skipTestsIfNotEnabled(t, integrationTestSuiteName, testNoAuthWithAliasEnable) + remote, err := NewDriverRemoteConnection(getEnvOrDefaultString("GREMLIN_SERVER_URL", noAuthUrl), + func(settings *DriverRemoteConnectionSettings) { + settings.TlsConfig = &tls.Config{} + settings.AuthInfo = &AuthInfo{} + settings.TraversalSource = "ggrateful" + }) + assert.Nil(t, err) + assert.NotNil(t, remote) + g := Traversal_().With(remote) + defer g.remoteConnection.Close() + + t.Run("first result arrives before all results", func(t *testing.T) { + start := time.Now() + rs, err := g.V().Properties().GetResultSet() + assert.Nil(t, err) + + // First result should arrive quickly; the latency is only logged below, not asserted + _, ok, err := rs.One() + firstResultTime := time.Since(start) + assert.Nil(t, err) + assert.True(t, ok) + + // Drain remaining + _, err = rs.All() + assert.Nil(t, err) + totalTime := time.Since(start) + + t.Logf("First result: %v, Total: %v, Ratio: %.2f%%", +
firstResultTime, totalTime, float64(firstResultTime)/float64(totalTime)*100) + }) + + t.Run("results arrive incrementally", func(t *testing.T) { + rs, err := g.V().Properties().GetResultSet() + assert.Nil(t, err) + + var timestamps []time.Time + start := time.Now() + + for { + _, ok, err := rs.One() + assert.Nil(t, err) + if !ok { + break + } + timestamps = append(timestamps, time.Now()) + } + + if len(timestamps) < 2 { + t.Skip("need more results to test incremental delivery") + } + + firstHalf := timestamps[len(timestamps)/2].Sub(start) // arrival time of the middle result + total := timestamps[len(timestamps)-1].Sub(start) + t.Logf("Half results at: %v, All results at: %v", firstHalf, total) + }) +} diff --git a/gremlin-go/driver/httpProtocol.go b/gremlin-go/driver/httpProtocol.go index 502bb3ae3f..18bf762923 100644 --- a/gremlin-go/driver/httpProtocol.go +++ b/gremlin-go/driver/httpProtocol.go @@ -20,6 +20,7 @@ under the License. package gremlingo import ( + "errors" "net/http" ) @@ -68,33 +69,24 @@ func (protocol *httpProtocol) send(request *request) (ResultSet, error) { // one transport per request transport := newHttpTransporter(protocol.url, protocol.connSettings, protocol.httpClient, protocol.logHandler) - // async send request - transport.wg.Add(1) + // async send request and receive response; failures are reported through rs (setError/Close) rather than returned from send go func() { - defer transport.wg.Done() err := transport.Write(bytes) if err != nil { - transport.Close() // Close transport to unblock receiver + transport.Close() // write failed: release the transport; receiveChunkedResponse is not called on this path rs.setError(err) rs.Close() + return } - }() - // async receive response - transport.wg.Add(1) - go func() { - defer transport.wg.Done() - err := protocol.receiveChunkedResponse(rs, transport) + err = protocol.receiveChunkedResponse(rs, transport) if err != nil { rs.setError(err) } transport.Close() }() - // Wait for both async operations to complete - transport.wg.Wait() - - return rs, rs.GetError() + return rs, nil // results and errors arrive asynchronously through rs } // receiveChunkedResponse processes individual chunk responses @@ -102,7 +94,7 @@ func (protocol *httpProtocol) 
receiveChunkedResponse(rs ResultSet, transport *ht for { resp, err := transport.Read() if err != nil { - if err.Error() == "response stream closed" { + if errors.Is(err, ErrResponseStreamClosed) { // NOTE(review): ErrResponseStreamClosed is not declared in this diff; confirm transport.Read returns (or wraps) this sentinel, otherwise a normal end of stream will not take this branch rs.Close() return nil } diff --git a/gremlin-go/driver/streamingDeserializer.go b/gremlin-go/driver/streamingDeserializer.go new file mode 100644 index 0000000000..949c3e3b00 --- /dev/null +++ b/gremlin-go/driver/streamingDeserializer.go @@ -0,0 +1,583 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
+*/ + +package gremlingo + +import ( + "bufio" + "encoding/binary" + "fmt" + "io" + "math" + "math/big" + "reflect" + "time" + + "github.com/google/uuid" +) + +// streamingDeserializer reads GraphBinary directly from bufio.Reader +// without interface overhead for maximum performance. Once a read fails, every later read returns that same (sticky) error. +type streamingDeserializer struct { + r *bufio.Reader + buf [8]byte // scratch space for fixed-width numeric reads + err error // sticky error +} + +func newStreamingDeserializer(r io.Reader) *streamingDeserializer { + return &streamingDeserializer{r: bufio.NewReaderSize(r, 8192)} +} + +func (d *streamingDeserializer) readByte() (byte, error) { + if d.err != nil { + return 0, d.err + } + b, err := d.r.ReadByte() + if err != nil { + d.err = err + return 0, err + } + return b, nil +} + +func (d *streamingDeserializer) readBytes(n int) ([]byte, error) { + // n is usually a length read from the stream; reject negative values instead of letting make panic + if d.err == nil && n < 0 { + d.err = fmt.Errorf("invalid negative length %d", n) + } else if d.err == nil { + buf := make([]byte, n) + if _, d.err = io.ReadFull(d.r, buf); d.err == nil { + return buf, nil + } + } + return nil, d.err +} + +func (d *streamingDeserializer) readInt32() (int32, error) { + if d.err != nil { + return 0, d.err + } + _, err := io.ReadFull(d.r, d.buf[:4]) + if err != nil { + d.err = err + return 0, err + } + return int32(binary.BigEndian.Uint32(d.buf[:4])), nil +} + +func (d *streamingDeserializer) readUint32() (uint32, error) { + if d.err != nil { + return 0, d.err + } + _, err := io.ReadFull(d.r, d.buf[:4]) + if err != nil { + d.err = err + return 0, err + } + return binary.BigEndian.Uint32(d.buf[:4]), nil +} + +func (d *streamingDeserializer) readInt64() (int64, error) { + if d.err != nil { + return 0, d.err + } + _, err := io.ReadFull(d.r, d.buf[:8]) + if err != nil { + d.err = err + return 0, err + } + return int64(binary.BigEndian.Uint64(d.buf[:8])), nil +} + +func (d *streamingDeserializer) readHeader() error { + if _, err := d.readByte(); err != nil { + return err + } + _, err := d.readByte() + return err +} + +func (d *streamingDeserializer) readFullyQualified() (interface{}, error) { + dtByte, err := 
d.readByte() + if err != nil { + return nil, err + } + dt := dataType(dtByte) + if dt == nullType { + if _, err := d.readByte(); err != nil { + return nil, err + } + return nil, nil + } + flag, err := d.readByte() + if err != nil { + return nil, err + } + if flag == valueFlagNull { + return nil, nil + } + return d.readValue(dt, flag) +} + +func (d *streamingDeserializer) readValue(dt dataType, flag byte) (interface{}, error) { + switch dt { + case intType: + return d.readInt32() + case longType: + return d.readInt64() + case stringType: + return d.readString() + case doubleType: + if d.err != nil { + return nil, d.err + } + if _, err := io.ReadFull(d.r, d.buf[:8]); err != nil { + d.err = err + return nil, err + } + return math.Float64frombits(binary.BigEndian.Uint64(d.buf[:8])), nil + case floatType: + if d.err != nil { + return nil, d.err + } + if _, err := io.ReadFull(d.r, d.buf[:4]); err != nil { + d.err = err + return nil, err + } + return math.Float32frombits(binary.BigEndian.Uint32(d.buf[:4])), nil + case booleanType: + b, err := d.readByte() + return b != 0, err + case byteType: + return d.readByte() + case shortType: + if d.err != nil { + return nil, d.err + } + if _, err := io.ReadFull(d.r, d.buf[:2]); err != nil { + d.err = err + return nil, err + } + return int16(binary.BigEndian.Uint16(d.buf[:2])), nil + case uuidType: + buf, err := d.readBytes(16) + if err != nil { + return nil, err + } + id, err := uuid.FromBytes(buf) + if err != nil { + return nil, err + } + return id, nil + case listType: + return d.readList(flag == 0x02) // value flag 0x02 selects the bulked encoding + case setType: + list, err := d.readList(flag == 0x02) + if err != nil { + return nil, err + } + return NewSimpleSet(list.([]interface{})...), nil + case mapType: + return d.readMap() + case vertexType: + return d.readVertex(true) + case edgeType: + return d.readEdge() + case pathType: + return d.readPath() + case propertyType: + return d.readProperty() + case vertexPropertyType: + return 
d.readVertexProperty() + case 
bigIntegerType: + return d.readBigInt() + case bigDecimalType: + return d.readBigDecimal() + case datetimeType: + return d.readDateTime() + case durationType: + return d.readDuration() + case markerType: + b, err := d.readByte() + if err != nil { + return nil, err + } + return Of(b) + case byteBuffer: + return d.readByteBuffer() + case tType, directionType, mergeType, gTypeType: + return d.readEnum() + default: + return nil, newError(err0408GetSerializerToReadUnknownTypeError, dt) + } +} + +func (d *streamingDeserializer) readString() (string, error) { + length, err := d.readInt32() + if err != nil { + return "", err + } + if length == 0 { + return "", nil + } + buf, err := d.readBytes(int(length)) + if err != nil { + return "", err + } + return string(buf), nil +} + +func (d *streamingDeserializer) readList(bulked bool) (interface{}, error) { + length, err := d.readInt32() + if err != nil { + return nil, err + } + list := make([]interface{}, 0, length) // NOTE(review): length is unvalidated; a negative value makes this make call panic + for i := int32(0); i < length; i++ { + val, err := d.readFullyQualified() + if err != nil { + return nil, err + } + if bulked { + bulk, err := d.readInt64() + if err != nil { + return nil, err + } + for j := int64(0); j < bulk; j++ { + list = append(list, val) + } + } else { + list = append(list, val) + } + } + return list, nil +} + +func (d *streamingDeserializer) readMap() (interface{}, error) { + length, err := d.readUint32() + if err != nil { + return nil, err + } + m := make(map[interface{}]interface{}, length) + for i := uint32(0); i < length; i++ { + key, err := d.readFullyQualified() + if err != nil { + return nil, err + } + val, err := d.readFullyQualified() + if err != nil { + return nil, err + } + if key == nil { + m[nil] = val + } else if reflect.TypeOf(key).Comparable() { + m[key] = val + } else if reflect.TypeOf(key).Kind() == reflect.Map { + m[&key] = val // map keys are not comparable, so key by pointer + } else { + m[fmt.Sprint(key)] = val // other non-comparable keys are stored under their fmt.Sprint form + } + } + return m, nil +} + +func (d *streamingDeserializer) readVertex(withProps bool) (*Vertex, error) { 
+ id, err := d.readFullyQualified() + if err != nil { + return nil, err + } + labels, err := d.readList(false) + if err != nil { + return nil, err + } + labelSlice, ok := labels.([]interface{}) + if !ok || len(labelSlice) == 0 { + return nil, newError(err0404ReadNullTypeError) + } + label, ok := labelSlice[0].(string) + if !ok { + return nil, newError(err0404ReadNullTypeError) + } + v := &Vertex{Element: Element{Id: id, Label: label}} + if withProps { + props, err := d.readFullyQualified() + if err != nil { + return nil, err + } + v.Properties = make([]interface{}, 0) + if props != nil { + v.Properties = props + } + } + return v, nil +} + +func (d *streamingDeserializer) readEdge() (*Edge, error) { + id, err := d.readFullyQualified() + if err != nil { + return nil, err + } + labels, err := d.readList(false) + if err != nil { + return nil, err + } + labelSlice, ok := labels.([]interface{}) + if !ok || len(labelSlice) == 0 { + return nil, newError(err0404ReadNullTypeError) + } + label, ok := labelSlice[0].(string) + if !ok { + return nil, newError(err0404ReadNullTypeError) + } + inV, err := d.readVertex(false) + if err != nil { + return nil, err + } + outV, err := d.readVertex(false) + if err != nil { + return nil, err + } + if _, err := d.readBytes(2); err != nil { // NOTE(review): skips 2 bytes, presumably a null parent; confirm against the GraphBinary edge layout + return nil, err + } + props, err := d.readFullyQualified() + if err != nil { + return nil, err + } + e := &Edge{ + Element: Element{Id: id, Label: label}, + InV: *inV, + OutV: *outV, + } + e.Properties = make([]interface{}, 0) + if props != nil { + e.Properties = props + } + return e, nil +} + +func (d *streamingDeserializer) readPath() (*Path, error) { + labels, err := d.readFullyQualified() + if err != nil { + return nil, err + } + objects, err := d.readFullyQualified() + if err != nil { + return nil, err + } + objectSlice, ok := objects.([]interface{}) + if !ok { + return nil, newError(err0404ReadNullTypeError) + } + path := &Path{Objects: objectSlice} + if labels != nil { + labelSlice, ok := 
labels.([]interface{}) + if !ok { + return nil, newError(err0404ReadNullTypeError) + } + for _, l := range labelSlice { + set, ok := l.(*SimpleSet) + if !ok { + return nil, newError(err0404ReadNullTypeError) + } + path.Labels = append(path.Labels, set) + } + } + return path, nil +} + +func (d *streamingDeserializer) readProperty() (*Property, error) { + key, err := d.readString() + if err != nil { + return nil, err + } + value, err := d.readFullyQualified() + if err != nil { + return nil, err + } + if _, err := d.readBytes(2); err != nil { + return nil, err + } + return &Property{Key: key, Value: value}, nil +} + +func (d *streamingDeserializer) readVertexProperty() (*VertexProperty, error) { + id, err := d.readFullyQualified() + if err != nil { + return nil, err + } + labels, err := d.readList(false) + if err != nil { + return nil, err + } + labelSlice, ok := labels.([]interface{}) + if !ok || len(labelSlice) == 0 { + return nil, newError(err0404ReadNullTypeError) + } + label, ok := labelSlice[0].(string) + if !ok { + return nil, newError(err0404ReadNullTypeError) + } + value, err := d.readFullyQualified() + if err != nil { + return nil, err + } + if _, err := d.readBytes(2); err != nil { + return nil, err + } + props, err := d.readFullyQualified() + if err != nil { + return nil, err + } + vp := &VertexProperty{ + Element: Element{Id: id, Label: label}, + Value: value, + } + vp.Properties = make([]interface{}, 0) + if props != nil { + vp.Properties = props + } + return vp, nil +} + +func (d *streamingDeserializer) readBigInt() (*big.Int, error) { + length, err := d.readInt32() + if err != nil { + return nil, err + } + if length == 0 { + return big.NewInt(0), nil + } + b, err := d.readBytes(int(length)) + if err != nil { + return nil, err + } + bi := big.NewInt(0).SetBytes(b) + if b[0]&0x80 != 0 { // sign bit set: b holds a negative big-endian two's-complement value + one := big.NewInt(1) + bitLen := uint(len(b)) * 8 // value = unsigned(b) - 2^(8*len(b)) + bi.Sub(bi, new(big.Int).Lsh(one, bitLen)) + } + return bi, nil +} + +func (d *streamingDeserializer) 
readBigDecimal() (*BigDecimal, error) { + scale, err := d.readInt32() + if err != nil { + return nil, err + } + unscaled, err := d.readBigInt() + if err != nil { + return nil, err + } + return &BigDecimal{Scale: scale, UnscaledValue: unscaled}, nil +} + +func (d *streamingDeserializer) readDateTime() (time.Time, error) { + year, err := d.readInt32() + if err != nil { + return time.Time{}, err + } + month, err := d.readByte() + if err != nil { + return time.Time{}, err + } + day, err := d.readByte() + if err != nil { + return time.Time{}, err + } + totalNS, err := d.readInt64() + if err != nil { + return time.Time{}, err + } + offset, err := d.readInt32() + if err != nil { + return time.Time{}, err + } + ns := totalNS % 1e9 + totalS := totalNS / 1e9 + s := totalS % 60 + totalM := totalS / 60 + m := totalM % 60 + h := totalM / 60 + return time.Date(int(year), time.Month(month), int(day), int(h), int(m), int(s), int(ns), GetTimezoneFromOffset(int(offset))), nil +} + +func (d *streamingDeserializer) readDuration() (time.Duration, error) { + seconds, err := d.readInt64() + if err != nil { + return 0, err + } + nanos, err := d.readInt32() + if err != nil { + return 0, err + } + return time.Duration(seconds*int64(time.Second) + int64(nanos)), nil +} + +func (d *streamingDeserializer) readByteBuffer() (*ByteBuffer, error) { + length, err := d.readInt32() + if err != nil { + return nil, err + } + data, err := d.readBytes(int(length)) + if err != nil { + return nil, err + } + return &ByteBuffer{Data: data}, nil +} + +func (d *streamingDeserializer) readEnum() (string, error) { + if _, err := d.readByte(); err != nil { // type code (string) + return "", err + } + if _, err := d.readByte(); err != nil { // null flag + return "", err + } + return d.readString() +} + +func (d *streamingDeserializer) readStatus() (uint32, string, string, error) { + code, err := d.readUint32() + if err != nil { + return 0, "", "", err + } + var message, exception string + flag, err := d.readByte() 
+ if err != nil { + return code, "", "", err + } + if flag != valueFlagNull { + message, err = d.readString() + if err != nil { + return code, "", "", err + } + } + flag, err = d.readByte() + if err != nil { + return code, message, "", err + } + if flag != valueFlagNull { + exception, err = d.readString() + if err != nil { + return code, message, "", err + } + } + return code, message, exception, nil +} diff --git a/gremlin-go/driver/streamingDeserializer_test.go b/gremlin-go/driver/streamingDeserializer_test.go new file mode 100644 index 0000000000..91ad173444 --- /dev/null +++ b/gremlin-go/driver/streamingDeserializer_test.go @@ -0,0 +1,117 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ + +package gremlingo + +import ( + "bytes" + "io" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestStreamingDeserializer(t *testing.T) { // unit tests for streamingDeserializer primitive reads and its sticky-error behavior + t.Run("readInt32", func(t *testing.T) { + data := []byte{0x00, 0x00, 0x00, 0x2A} // 42 + d := newStreamingDeserializer(bytes.NewReader(data)) + val, err := d.readInt32() + assert.Nil(t, err) + assert.Equal(t, int32(42), val) + }) + + t.Run("readInt64", func(t *testing.T) { + data := []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x64} // 100 + d := newStreamingDeserializer(bytes.NewReader(data)) + val, err := d.readInt64() + assert.Nil(t, err) + assert.Equal(t, int64(100), val) + }) + + t.Run("readString", func(t *testing.T) { + data := []byte{0x00, 0x00, 0x00, 0x05, 'h', 'e', 'l', 'l', 'o'} + d := newStreamingDeserializer(bytes.NewReader(data)) + val, err := d.readString() + assert.Nil(t, err) + assert.Equal(t, "hello", val) + }) + + t.Run("readString empty", func(t *testing.T) { + data := []byte{0x00, 0x00, 0x00, 0x00} + d := newStreamingDeserializer(bytes.NewReader(data)) + val, err := d.readString() + assert.Nil(t, err) + assert.Equal(t, "", val) + }) + + t.Run("error on incomplete data", func(t *testing.T) { + data := []byte{0x00, 0x00} // incomplete int32 + d := newStreamingDeserializer(bytes.NewReader(data)) + _, err := d.readInt32() + assert.ErrorIs(t, err, io.ErrUnexpectedEOF) + }) + + t.Run("sticky error", func(t *testing.T) { + data := []byte{0x00} // too short + d := newStreamingDeserializer(bytes.NewReader(data)) + + _, err1 := d.readInt32() + assert.ErrorIs(t, err1, io.ErrUnexpectedEOF) + + // Subsequent reads must return the same sticky error; without stickiness the exhausted reader would report io.EOF instead + _, err2 := d.readInt32() + assert.ErrorIs(t, err2, err1) + }) +} + +func TestStreamingChannelDelivery(t *testing.T) { // drives a channel-backed ResultSet directly; the deserializer is not involved + t.Run("results arrive incrementally via channel", func(t *testing.T) { + rs := newChannelResultSet() + + // Simulate streaming - send results with delays + go func() { + for i := 0; i < 5; i++ { + time.Sleep(10 * time.Millisecond) + rs.Channel() <- &Result{i} + } +
rs.Close() + }() + + var times []time.Duration + start := time.Now() + + for { + _, ok, _ := rs.One() + if !ok { + break + } + times = append(times, time.Since(start)) + } + + assert.Equal(t, 5, len(times)) + + // Results should arrive ~10ms apart, not all at once (wall-clock based, so this check may be flaky on a heavily loaded machine) + for i := 1; i < len(times); i++ { + gap := times[i] - times[i-1] + assert.GreaterOrEqual(t, gap, 5*time.Millisecond, + "Results %d and %d arrived too close together: %v", i-1, i, gap) + } + }) +}
