This is an automated email from the ASF dual-hosted git repository.

liuhan pushed a commit to branch reduce-handle-connect-time
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git


The following commit(s) were added to refs/heads/reduce-handle-connect-time by 
this push:
     new 44f3a2e  reduce mem alloc
44f3a2e is described below

commit 44f3a2e794d41d90a509cbb7b11837ffeb2d35b6
Author: mrproliu <[email protected]>
AuthorDate: Sun Dec 29 23:57:25 2024 +0800

    reduce mem alloc
---
 pkg/accesslog/events/close.go        |  4 +-
 pkg/accesslog/events/connect.go      |  4 +-
 pkg/accesslog/events/data.go         |  4 +-
 pkg/accesslog/events/detail.go       |  4 +-
 pkg/accesslog/events/events_test.go  | 15 ++++----
 pkg/accesslog/events/ztunnel.go      |  6 ++-
 pkg/tools/btf/linker.go              | 68 +++++++++++++++++++++++++-------
 pkg/tools/btf/queue.go               | 74 -----------------------------------
 pkg/tools/btf/{reader => }/reader.go | 75 +++++++++++++++++++++++++++++-------
 9 files changed, 135 insertions(+), 119 deletions(-)

diff --git a/pkg/accesslog/events/close.go b/pkg/accesslog/events/close.go
index acd0d61..c9cec93 100644
--- a/pkg/accesslog/events/close.go
+++ b/pkg/accesslog/events/close.go
@@ -18,9 +18,9 @@
 package events
 
 import (
+       "github.com/apache/skywalking-rover/pkg/tools/btf"
        "time"
 
-       "github.com/apache/skywalking-rover/pkg/tools/btf/reader"
        "github.com/apache/skywalking-rover/pkg/tools/host"
 )
 
@@ -35,7 +35,7 @@ type SocketCloseEvent struct {
        Success   uint32
 }
 
-func (c *SocketCloseEvent) ReadFrom(r *reader.Reader) {
+func (c *SocketCloseEvent) ReadFrom(r btf.Reader) {
        c.ConnectionID = r.ReadUint64()
        c.RandomID = r.ReadUint64()
        c.StartTime = r.ReadUint64()
diff --git a/pkg/accesslog/events/connect.go b/pkg/accesslog/events/connect.go
index 9433168..ab10338 100644
--- a/pkg/accesslog/events/connect.go
+++ b/pkg/accesslog/events/connect.go
@@ -18,9 +18,9 @@
 package events
 
 import (
+       "github.com/apache/skywalking-rover/pkg/tools/btf"
        "time"
 
-       "github.com/apache/skywalking-rover/pkg/tools/btf/reader"
        "github.com/apache/skywalking-rover/pkg/tools/host"
 )
 
@@ -47,7 +47,7 @@ type SocketConnectEvent struct {
        ConnTrackUpstreamPort uint32
 }
 
-func (c *SocketConnectEvent) ReadFrom(r *reader.Reader) {
+func (c *SocketConnectEvent) ReadFrom(r btf.Reader) {
        c.ConID = r.ReadUint64()
        c.RandomID = r.ReadUint64()
        c.StartTime = r.ReadUint64()
diff --git a/pkg/accesslog/events/data.go b/pkg/accesslog/events/data.go
index d3828c9..7d759e4 100644
--- a/pkg/accesslog/events/data.go
+++ b/pkg/accesslog/events/data.go
@@ -19,8 +19,8 @@ package events
 
 import (
        "fmt"
+       "github.com/apache/skywalking-rover/pkg/tools/btf"
 
-       "github.com/apache/skywalking-rover/pkg/tools/btf/reader"
        "github.com/apache/skywalking-rover/pkg/tools/enums"
 )
 
@@ -45,7 +45,7 @@ func (s *SocketDataUploadEvent) ReleaseBuffer() *[2048]byte {
        return &s.Buffer
 }
 
-func (s *SocketDataUploadEvent) ReadFrom(r *reader.Reader) {
+func (s *SocketDataUploadEvent) ReadFrom(r btf.Reader) {
        s.Protocol0 = enums.ConnectionProtocol(r.ReadUint8())
        s.HaveReduce = r.ReadUint8()
        s.Direction0 = enums.SocketDataDirection(r.ReadUint8())
diff --git a/pkg/accesslog/events/detail.go b/pkg/accesslog/events/detail.go
index a5ac46c..4716f1f 100644
--- a/pkg/accesslog/events/detail.go
+++ b/pkg/accesslog/events/detail.go
@@ -18,9 +18,9 @@
 package events
 
 import (
+       "github.com/apache/skywalking-rover/pkg/tools/btf"
        "time"
 
-       "github.com/apache/skywalking-rover/pkg/tools/btf/reader"
        "github.com/apache/skywalking-rover/pkg/tools/buffer"
        "github.com/apache/skywalking-rover/pkg/tools/enums"
        "github.com/apache/skywalking-rover/pkg/tools/host"
@@ -73,7 +73,7 @@ type SocketDetailEvent struct {
        SSL                           uint8
 }
 
-func (d *SocketDetailEvent) ReadFrom(r *reader.Reader) {
+func (d *SocketDetailEvent) ReadFrom(r btf.Reader) {
        d.ConnectionID = r.ReadUint64()
        d.RandomID = r.ReadUint64()
        d.DataID0 = r.ReadUint64()
diff --git a/pkg/accesslog/events/events_test.go 
b/pkg/accesslog/events/events_test.go
index 1da9f3e..70348a0 100644
--- a/pkg/accesslog/events/events_test.go
+++ b/pkg/accesslog/events/events_test.go
@@ -21,20 +21,19 @@ import (
        "bytes"
        "encoding/binary"
        "encoding/hex"
+       "github.com/apache/skywalking-rover/pkg/tools/btf"
        "reflect"
        "strings"
        "testing"
 
        "github.com/stretchr/testify/assert"
-
-       "github.com/apache/skywalking-rover/pkg/tools/btf/reader"
 )
 
 // nolint
 func TestBufferRead(t *testing.T) {
        tests := []struct {
                hex    string
-               create func() reader.EventReader
+               create func() btf.EventReader
        }{
                {
                        hex: `
@@ -46,7 +45,7 @@ func TestBufferRead(t *testing.T) {
 00 00 00 00 00 00 00 00 00 00 ff ff 7f 00 00 01
 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
 00 00 00 00 00 00 00 00 00 00 00 00`,
-                       create: func() reader.EventReader {
+                       create: func() btf.EventReader {
                                return &SocketConnectEvent{}
                        },
                },
@@ -59,7 +58,7 @@ func TestBufferRead(t *testing.T) {
 23 4a 01 00 bb 49 01 00 00 00 00 00 e4 01 00 00
 24 21 00 00 01 00 00 00 39 d6 00 00 00 00 00 00
 03 00 00 00 00 00 00 00 02 02 00 02 02 09 01 00`,
-                       create: func() reader.EventReader {
+                       create: func() btf.EventReader {
                                return &SocketDetailEvent{}
                        },
                },
@@ -69,7 +68,7 @@ func TestBufferRead(t *testing.T) {
 b2 2d 26 7c 5a 30 02 00 5e 34 26 7c 5a 30 02 00
 7a a1 00 00 04 00 00 00 00 00 00 00 00 00 00 00
 00 00 00 00`,
-                       create: func() reader.EventReader {
+                       create: func() btf.EventReader {
                                return &SocketCloseEvent{}
                        },
                },
@@ -209,7 +208,7 @@ a0 ea de 8e 56 e5 16 e5 d7 f0 e3 f9 09 35 c2 be
 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
 00 00 00 00
 `,
-                       create: func() reader.EventReader {
+                       create: func() btf.EventReader {
                                return &SocketDataUploadEvent{}
                        },
                },
@@ -226,7 +225,7 @@ a0 ea de 8e 56 e5 16 e5 d7 f0 e3 f9 09 35 c2 be
                        }
                        binaryRead := test.create()
                        selfRead := test.create()
-                       bufReader := reader.NewReader(rawData)
+                       bufReader := btf.NewReader(rawData)
                        selfRead.ReadFrom(bufReader)
                        if err := bufReader.HasError(); err != nil {
                                t.Fatalf("reading by self parsing error: %v", 
err)
diff --git a/pkg/accesslog/events/ztunnel.go b/pkg/accesslog/events/ztunnel.go
index abd2f1c..81572c3 100644
--- a/pkg/accesslog/events/ztunnel.go
+++ b/pkg/accesslog/events/ztunnel.go
@@ -17,7 +17,9 @@
 
 package events
 
-import "github.com/apache/skywalking-rover/pkg/tools/btf/reader"
+import (
+       "github.com/apache/skywalking-rover/pkg/tools/btf"
+)
 
 type ZTunnelSocketMappingEvent struct {
        OriginalSrcIP        uint32
@@ -30,7 +32,7 @@ type ZTunnelSocketMappingEvent struct {
        Pad1                 uint32
 }
 
-func (z *ZTunnelSocketMappingEvent) ReadFrom(r *reader.Reader) {
+func (z *ZTunnelSocketMappingEvent) ReadFrom(r btf.Reader) {
        z.OriginalSrcIP = r.ReadUint32()
        z.OriginalDestIP = r.ReadUint32()
        z.OriginalSrcPort = r.ReadUint16()
diff --git a/pkg/tools/btf/linker.go b/pkg/tools/btf/linker.go
index bdbd7a6..690a89f 100644
--- a/pkg/tools/btf/linker.go
+++ b/pkg/tools/btf/linker.go
@@ -22,15 +22,13 @@ import (
        "encoding/binary"
        "errors"
        "fmt"
+       "golang.org/x/arch/arm64/arm64asm"
+       "golang.org/x/arch/x86/x86asm"
        "io"
        "os"
        "runtime"
        "sync"
 
-       "golang.org/x/arch/arm64/arm64asm"
-       "golang.org/x/arch/x86/x86asm"
-
-       "github.com/apache/skywalking-rover/pkg/tools/btf/reader"
        "github.com/apache/skywalking-rover/pkg/tools/elf"
        "github.com/apache/skywalking-rover/pkg/tools/process"
 
@@ -167,7 +165,7 @@ func (m *Linker) ReadEventAsync(emap *ebpf.Map, bufReader 
RingBufferReader, data
 
 func (m *Linker) ReadEventAsyncWithBufferSize(emap *ebpf.Map, bufReader 
RingBufferReader, perCPUBuffer,
        parallels int, dataSupplier func() interface{}) {
-       rd, err := newQueueReader(emap, perCPUBuffer)
+       rd, err := perf.NewReader(emap, perCPUBuffer)
        if err != nil {
                m.errors = multierror.Append(m.errors, fmt.Errorf("open ring 
buffer error: %v", err))
                return
@@ -178,42 +176,53 @@ func (m *Linker) ReadEventAsyncWithBufferSize(emap 
*ebpf.Map, bufReader RingBuff
        }
        m.closers = append(m.closers, rd)
 
+       recordBuilder := newPerfRecordBuilder(dataSupplier())
        for i := 0; i < parallels; i++ {
-               m.asyncReadEvent(rd, emap, dataSupplier, bufReader)
+               m.asyncReadEvent(rd, emap, recordBuilder, dataSupplier, 
bufReader)
        }
 }
 
-func (m *Linker) asyncReadEvent(rd queueReader, emap *ebpf.Map, dataSupplier 
func() interface{}, bufReader RingBufferReader) {
+func (m *Linker) asyncReadEvent(rd *perf.Reader, emap *ebpf.Map, recordPool 
*perfRecordBuilder,
+       dataSupplier func() interface{}, bufReader RingBufferReader) {
        go func() {
                for {
-                       sample, err := rd.Read()
+                       record := recordPool.GetRecord()
+                       err := rd.ReadInto(record)
                        if err != nil {
+                               recordPool.PutRecord(record)
                                if errors.Is(err, perf.ErrClosed) {
                                        return
                                }
                                log.Warnf("read from %s ringbuffer error: %v", 
emap.String(), err)
                                continue
                        }
-                       if len(sample) == 0 {
+
+                       if record.LostSamples != 0 {
+                               log.Warnf("perf event queue(%s) full, dropped 
%d samples", emap.String(), record.LostSamples)
+                               recordPool.PutRecord(record)
                                continue
                        }
 
                        data := dataSupplier()
-                       if r, ok := data.(reader.EventReader); ok {
-                               sampleReader := reader.NewReader(sample)
+                       if r, ok := data.(EventReader); ok {
+                               sampleReader := NewReader(record.RawSample)
                                r.ReadFrom(sampleReader)
                                if readErr := sampleReader.HasError(); readErr 
!= nil {
-                                       log.Warnf("parsing data from %s, raw 
size: %d, ringbuffer error: %v", emap.String(), len(sample), err)
+                                       log.Warnf("parsing data from %s, raw 
size: %d, ringbuffer error: %v", emap.String(), len(record.RawSample), err)
+                                       recordPool.PutRecord(record)
                                        continue
                                }
                        } else {
-                               if err := binary.Read(bytes.NewBuffer(sample), 
binary.LittleEndian, data); err != nil {
-                                       log.Warnf("parsing data from %s, raw 
size: %d, ringbuffer error: %v", emap.String(), len(sample), err)
+                               if err := 
binary.Read(bytes.NewBuffer(record.RawSample), binary.LittleEndian, data); err 
!= nil {
+                                       log.Warnf("parsing data from %s, raw 
size: %d, ringbuffer error: %v", emap.String(), len(record.RawSample), err)
+                                       recordPool.PutRecord(record)
                                        continue
                                }
                        }
 
                        bufReader(data)
+
+                       recordPool.PutRecord(record)
                }
        }()
 }
@@ -402,3 +411,34 @@ func (m *Linker) Close() error {
        })
        return err
 }
+
+type perfRecordBuilder struct {
+       dataSize int
+       pool     sync.Pool
+}
+
+func newPerfRecordBuilder(data interface{}) *perfRecordBuilder {
+       // add 8 extra bytes of capacity, since some events are not aligned
+       var size = binary.Size(data) + 8
+       if r, ok := data.(EventReader); ok {
+               reader := newSizeCalcReader()
+               r.ReadFrom(reader)
+               size = reader.Size() + 8
+       }
+
+       builder := &perfRecordBuilder{
+               dataSize: size,
+       }
+       builder.pool.New = func() any {
+               return &perf.Record{RawSample: make([]byte, 0, 
builder.dataSize)}
+       }
+       return builder
+}
+
+func (p *perfRecordBuilder) GetRecord() *perf.Record {
+       return p.pool.Get().(*perf.Record)
+}
+
+func (p *perfRecordBuilder) PutRecord(r *perf.Record) {
+       p.pool.Put(r)
+}
diff --git a/pkg/tools/btf/queue.go b/pkg/tools/btf/queue.go
index fdd8754..eff29c8 100644
--- a/pkg/tools/btf/queue.go
+++ b/pkg/tools/btf/queue.go
@@ -19,90 +19,16 @@ package btf
 
 import (
        "context"
-       "fmt"
        "sync"
        "time"
 
        "github.com/cilium/ebpf"
-       "github.com/cilium/ebpf/perf"
-       "github.com/cilium/ebpf/ringbuf"
 )
 
 // queueChannelReducingCountCheckInterval is the interval to check the queue 
channel reducing count
 // if the reducing count is almost full, a warning log is added
 const queueChannelReducingCountCheckInterval = time.Second * 5
 
-type queueReader interface {
-       Read() ([]byte, error)
-       Close() error
-}
-
-func newQueueReader(emap *ebpf.Map, perCPUBuffer int) (queueReader, error) {
-       switch emap.Type() {
-       case ebpf.RingBuf:
-               return newRingBufReader(emap)
-       case ebpf.PerfEventArray:
-               return newPerfQueueReader(emap, perCPUBuffer)
-       }
-       return nil, fmt.Errorf("unsupported map type: %s", emap.Type().String())
-}
-
-type perfQueueReader struct {
-       name   string
-       reader *perf.Reader
-}
-
-func newPerfQueueReader(emap *ebpf.Map, perCPUBuffer int) (*perfQueueReader, 
error) {
-       reader, err := perf.NewReader(emap, perCPUBuffer)
-       if err != nil {
-               return nil, err
-       }
-       return &perfQueueReader{reader: reader, name: emap.String()}, nil
-}
-
-func (p *perfQueueReader) Read() ([]byte, error) {
-       read, err := p.reader.Read()
-       if err != nil {
-               return nil, err
-       }
-
-       if read.LostSamples != 0 {
-               log.Warnf("perf event queue(%s) full, dropped %d samples", 
p.name, read.LostSamples)
-               return nil, nil
-       }
-
-       return read.RawSample, nil
-}
-
-func (p *perfQueueReader) Close() error {
-       return p.reader.Close()
-}
-
-type ringBufReader struct {
-       reader *ringbuf.Reader
-       name   string
-}
-
-func newRingBufReader(emap *ebpf.Map) (*ringBufReader, error) {
-       reader, err := ringbuf.NewReader(emap)
-       if err != nil {
-               return nil, err
-       }
-       return &ringBufReader{reader: reader, name: emap.String()}, nil
-}
-
-func (r *ringBufReader) Read() ([]byte, error) {
-       read, err := r.reader.Read()
-       if err != nil {
-               return nil, err
-       }
-       return read.RawSample, nil
-}
-
-func (r *ringBufReader) Close() error {
-       return r.reader.Close()
-}
-
 type PartitionContext interface {
        Start(ctx context.Context)
        Consume(data interface{})
diff --git a/pkg/tools/btf/reader/reader.go b/pkg/tools/btf/reader.go
similarity index 62%
rename from pkg/tools/btf/reader/reader.go
rename to pkg/tools/btf/reader.go
index d2e980e..94336a4 100644
--- a/pkg/tools/btf/reader/reader.go
+++ b/pkg/tools/btf/reader.go
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package reader
+package btf
 
 import (
        "encoding/binary"
@@ -25,11 +25,20 @@ import (
 // EventReader reads the sample data by itself, instead of using binary.Read
 type EventReader interface {
        // ReadFrom read buffer data
-       ReadFrom(reader *Reader)
+       ReadFrom(reader Reader)
 }
 
-// Reader buffer sample reader
-type Reader struct {
+type Reader interface {
+       HasError() error
+       ReadUint64() uint64
+       ReadUint32() uint32
+       ReadUint16() uint16
+       ReadUint8() uint8
+       ReadUint8Array(a []uint8, size int)
+}
+
+// BytesReader is a sample reader backed by a byte buffer
+type BytesReader struct {
        Sample        []byte
        CurrentOffset int
        sampleLen     int
@@ -37,8 +46,8 @@ type Reader struct {
 }
 
 // NewReader create a reader from BPF buffer
-func NewReader(sample []byte) *Reader {
-       return &Reader{
+func NewReader(sample []byte) Reader {
+       return &BytesReader{
                Sample:        sample,
                CurrentOffset: 0,
                sampleLen:     len(sample),
@@ -46,11 +55,11 @@ func NewReader(sample []byte) *Reader {
 }
 
 // HasError returns the error, if any, that occurred while reading the buffer
-func (r *Reader) HasError() error {
+func (r *BytesReader) HasError() error {
        return r.err
 }
 
-func (r *Reader) ReadUint64() uint64 {
+func (r *BytesReader) ReadUint64() uint64 {
        bytes, err := r.read(8)
        if err != nil {
                return 0
@@ -58,7 +67,7 @@ func (r *Reader) ReadUint64() uint64 {
        return binary.LittleEndian.Uint64(bytes)
 }
 
-func (r *Reader) ReadUint32() uint32 {
+func (r *BytesReader) ReadUint32() uint32 {
        bytes, err := r.read(4)
        if err != nil {
                return 0
@@ -66,7 +75,7 @@ func (r *Reader) ReadUint32() uint32 {
        return binary.LittleEndian.Uint32(bytes)
 }
 
-func (r *Reader) ReadUint16() uint16 {
+func (r *BytesReader) ReadUint16() uint16 {
        bytes, err := r.read(2)
        if err != nil {
                return 0
@@ -74,7 +83,7 @@ func (r *Reader) ReadUint16() uint16 {
        return binary.LittleEndian.Uint16(bytes)
 }
 
-func (r *Reader) ReadUint8() uint8 {
+func (r *BytesReader) ReadUint8() uint8 {
        bytes, err := r.read(1)
        if err != nil {
                return 0
@@ -82,7 +91,7 @@ func (r *Reader) ReadUint8() uint8 {
        return bytes[0]
 }
 
-func (r *Reader) ReadUint8Array(a []uint8, size int) {
+func (r *BytesReader) ReadUint8Array(a []uint8, size int) {
        read, err := r.read(size)
        if err != nil {
                return
@@ -90,7 +99,7 @@ func (r *Reader) ReadUint8Array(a []uint8, size int) {
        copy(a, read)
 }
 
-func (r *Reader) read(size int) ([]byte, error) {
+func (r *BytesReader) read(size int) ([]byte, error) {
        if r.err != nil {
                return nil, r.err
        }
@@ -103,3 +112,43 @@ func (r *Reader) read(size int) ([]byte, error) {
        r.CurrentOffset += size
        return bytes, nil
 }
+
+type sizeCalcReader struct {
+       size int
+}
+
+func newSizeCalcReader() *sizeCalcReader {
+       return &sizeCalcReader{}
+}
+
+func (r *sizeCalcReader) HasError() error {
+       return nil
+}
+
+func (r *sizeCalcReader) ReadUint64() uint64 {
+       r.size += 8
+       return 0
+}
+
+func (r *sizeCalcReader) ReadUint32() uint32 {
+       r.size += 4
+       return 0
+}
+
+func (r *sizeCalcReader) ReadUint16() uint16 {
+       r.size += 2
+       return 0
+}
+
+func (r *sizeCalcReader) ReadUint8() uint8 {
+       r.size += 1
+       return 0
+}
+
+func (r *sizeCalcReader) ReadUint8Array(a []uint8, size int) {
+       r.size += size
+}
+
+func (r *sizeCalcReader) Size() int {
+       return r.size
+}

Reply via email to