This is an automated email from the ASF dual-hosted git repository.
liuhan pushed a commit to branch reduce-handle-connect-time
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git
The following commit(s) were added to refs/heads/reduce-handle-connect-time by
this push:
new 44f3a2e reduce mem alloc
44f3a2e is described below
commit 44f3a2e794d41d90a509cbb7b11837ffeb2d35b6
Author: mrproliu <[email protected]>
AuthorDate: Sun Dec 29 23:57:25 2024 +0800
reduce mem alloc
---
pkg/accesslog/events/close.go | 4 +-
pkg/accesslog/events/connect.go | 4 +-
pkg/accesslog/events/data.go | 4 +-
pkg/accesslog/events/detail.go | 4 +-
pkg/accesslog/events/events_test.go | 15 ++++----
pkg/accesslog/events/ztunnel.go | 6 ++-
pkg/tools/btf/linker.go | 68 +++++++++++++++++++++++++-------
pkg/tools/btf/queue.go | 74 -----------------------------------
pkg/tools/btf/{reader => }/reader.go | 75 +++++++++++++++++++++++++++++-------
9 files changed, 135 insertions(+), 119 deletions(-)
diff --git a/pkg/accesslog/events/close.go b/pkg/accesslog/events/close.go
index acd0d61..c9cec93 100644
--- a/pkg/accesslog/events/close.go
+++ b/pkg/accesslog/events/close.go
@@ -18,9 +18,9 @@
package events
import (
+ "github.com/apache/skywalking-rover/pkg/tools/btf"
"time"
- "github.com/apache/skywalking-rover/pkg/tools/btf/reader"
"github.com/apache/skywalking-rover/pkg/tools/host"
)
@@ -35,7 +35,7 @@ type SocketCloseEvent struct {
Success uint32
}
-func (c *SocketCloseEvent) ReadFrom(r *reader.Reader) {
+func (c *SocketCloseEvent) ReadFrom(r btf.Reader) {
c.ConnectionID = r.ReadUint64()
c.RandomID = r.ReadUint64()
c.StartTime = r.ReadUint64()
diff --git a/pkg/accesslog/events/connect.go b/pkg/accesslog/events/connect.go
index 9433168..ab10338 100644
--- a/pkg/accesslog/events/connect.go
+++ b/pkg/accesslog/events/connect.go
@@ -18,9 +18,9 @@
package events
import (
+ "github.com/apache/skywalking-rover/pkg/tools/btf"
"time"
- "github.com/apache/skywalking-rover/pkg/tools/btf/reader"
"github.com/apache/skywalking-rover/pkg/tools/host"
)
@@ -47,7 +47,7 @@ type SocketConnectEvent struct {
ConnTrackUpstreamPort uint32
}
-func (c *SocketConnectEvent) ReadFrom(r *reader.Reader) {
+func (c *SocketConnectEvent) ReadFrom(r btf.Reader) {
c.ConID = r.ReadUint64()
c.RandomID = r.ReadUint64()
c.StartTime = r.ReadUint64()
diff --git a/pkg/accesslog/events/data.go b/pkg/accesslog/events/data.go
index d3828c9..7d759e4 100644
--- a/pkg/accesslog/events/data.go
+++ b/pkg/accesslog/events/data.go
@@ -19,8 +19,8 @@ package events
import (
"fmt"
+ "github.com/apache/skywalking-rover/pkg/tools/btf"
- "github.com/apache/skywalking-rover/pkg/tools/btf/reader"
"github.com/apache/skywalking-rover/pkg/tools/enums"
)
@@ -45,7 +45,7 @@ func (s *SocketDataUploadEvent) ReleaseBuffer() *[2048]byte {
return &s.Buffer
}
-func (s *SocketDataUploadEvent) ReadFrom(r *reader.Reader) {
+func (s *SocketDataUploadEvent) ReadFrom(r btf.Reader) {
s.Protocol0 = enums.ConnectionProtocol(r.ReadUint8())
s.HaveReduce = r.ReadUint8()
s.Direction0 = enums.SocketDataDirection(r.ReadUint8())
diff --git a/pkg/accesslog/events/detail.go b/pkg/accesslog/events/detail.go
index a5ac46c..4716f1f 100644
--- a/pkg/accesslog/events/detail.go
+++ b/pkg/accesslog/events/detail.go
@@ -18,9 +18,9 @@
package events
import (
+ "github.com/apache/skywalking-rover/pkg/tools/btf"
"time"
- "github.com/apache/skywalking-rover/pkg/tools/btf/reader"
"github.com/apache/skywalking-rover/pkg/tools/buffer"
"github.com/apache/skywalking-rover/pkg/tools/enums"
"github.com/apache/skywalking-rover/pkg/tools/host"
@@ -73,7 +73,7 @@ type SocketDetailEvent struct {
SSL uint8
}
-func (d *SocketDetailEvent) ReadFrom(r *reader.Reader) {
+func (d *SocketDetailEvent) ReadFrom(r btf.Reader) {
d.ConnectionID = r.ReadUint64()
d.RandomID = r.ReadUint64()
d.DataID0 = r.ReadUint64()
diff --git a/pkg/accesslog/events/events_test.go
b/pkg/accesslog/events/events_test.go
index 1da9f3e..70348a0 100644
--- a/pkg/accesslog/events/events_test.go
+++ b/pkg/accesslog/events/events_test.go
@@ -21,20 +21,19 @@ import (
"bytes"
"encoding/binary"
"encoding/hex"
+ "github.com/apache/skywalking-rover/pkg/tools/btf"
"reflect"
"strings"
"testing"
"github.com/stretchr/testify/assert"
-
- "github.com/apache/skywalking-rover/pkg/tools/btf/reader"
)
// nolint
func TestBufferRead(t *testing.T) {
tests := []struct {
hex string
- create func() reader.EventReader
+ create func() btf.EventReader
}{
{
hex: `
@@ -46,7 +45,7 @@ func TestBufferRead(t *testing.T) {
00 00 00 00 00 00 00 00 00 00 ff ff 7f 00 00 01
00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
00 00 00 00 00 00 00 00 00 00 00 00`,
- create: func() reader.EventReader {
+ create: func() btf.EventReader {
return &SocketConnectEvent{}
},
},
@@ -59,7 +58,7 @@ func TestBufferRead(t *testing.T) {
23 4a 01 00 bb 49 01 00 00 00 00 00 e4 01 00 00
24 21 00 00 01 00 00 00 39 d6 00 00 00 00 00 00
03 00 00 00 00 00 00 00 02 02 00 02 02 09 01 00`,
- create: func() reader.EventReader {
+ create: func() btf.EventReader {
return &SocketDetailEvent{}
},
},
@@ -69,7 +68,7 @@ func TestBufferRead(t *testing.T) {
b2 2d 26 7c 5a 30 02 00 5e 34 26 7c 5a 30 02 00
7a a1 00 00 04 00 00 00 00 00 00 00 00 00 00 00
00 00 00 00`,
- create: func() reader.EventReader {
+ create: func() btf.EventReader {
return &SocketCloseEvent{}
},
},
@@ -209,7 +208,7 @@ a0 ea de 8e 56 e5 16 e5 d7 f0 e3 f9 09 35 c2 be
00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
00 00 00 00
`,
- create: func() reader.EventReader {
+ create: func() btf.EventReader {
return &SocketDataUploadEvent{}
},
},
@@ -226,7 +225,7 @@ a0 ea de 8e 56 e5 16 e5 d7 f0 e3 f9 09 35 c2 be
}
binaryRead := test.create()
selfRead := test.create()
- bufReader := reader.NewReader(rawData)
+ bufReader := btf.NewReader(rawData)
selfRead.ReadFrom(bufReader)
if err := bufReader.HasError(); err != nil {
t.Fatalf("reading by self parsing error: %v",
err)
diff --git a/pkg/accesslog/events/ztunnel.go b/pkg/accesslog/events/ztunnel.go
index abd2f1c..81572c3 100644
--- a/pkg/accesslog/events/ztunnel.go
+++ b/pkg/accesslog/events/ztunnel.go
@@ -17,7 +17,9 @@
package events
-import "github.com/apache/skywalking-rover/pkg/tools/btf/reader"
+import (
+ "github.com/apache/skywalking-rover/pkg/tools/btf"
+)
type ZTunnelSocketMappingEvent struct {
OriginalSrcIP uint32
@@ -30,7 +32,7 @@ type ZTunnelSocketMappingEvent struct {
Pad1 uint32
}
-func (z *ZTunnelSocketMappingEvent) ReadFrom(r *reader.Reader) {
+func (z *ZTunnelSocketMappingEvent) ReadFrom(r btf.Reader) {
z.OriginalSrcIP = r.ReadUint32()
z.OriginalDestIP = r.ReadUint32()
z.OriginalSrcPort = r.ReadUint16()
diff --git a/pkg/tools/btf/linker.go b/pkg/tools/btf/linker.go
index bdbd7a6..690a89f 100644
--- a/pkg/tools/btf/linker.go
+++ b/pkg/tools/btf/linker.go
@@ -22,15 +22,13 @@ import (
"encoding/binary"
"errors"
"fmt"
+ "golang.org/x/arch/arm64/arm64asm"
+ "golang.org/x/arch/x86/x86asm"
"io"
"os"
"runtime"
"sync"
- "golang.org/x/arch/arm64/arm64asm"
- "golang.org/x/arch/x86/x86asm"
-
- "github.com/apache/skywalking-rover/pkg/tools/btf/reader"
"github.com/apache/skywalking-rover/pkg/tools/elf"
"github.com/apache/skywalking-rover/pkg/tools/process"
@@ -167,7 +165,7 @@ func (m *Linker) ReadEventAsync(emap *ebpf.Map, bufReader
RingBufferReader, data
func (m *Linker) ReadEventAsyncWithBufferSize(emap *ebpf.Map, bufReader
RingBufferReader, perCPUBuffer,
parallels int, dataSupplier func() interface{}) {
- rd, err := newQueueReader(emap, perCPUBuffer)
+ rd, err := perf.NewReader(emap, perCPUBuffer)
if err != nil {
m.errors = multierror.Append(m.errors, fmt.Errorf("open ring
buffer error: %v", err))
return
@@ -178,42 +176,53 @@ func (m *Linker) ReadEventAsyncWithBufferSize(emap
*ebpf.Map, bufReader RingBuff
}
m.closers = append(m.closers, rd)
+ recordBuilder := newPerfRecordBuilder(dataSupplier())
for i := 0; i < parallels; i++ {
- m.asyncReadEvent(rd, emap, dataSupplier, bufReader)
+ m.asyncReadEvent(rd, emap, recordBuilder, dataSupplier,
bufReader)
}
}
-func (m *Linker) asyncReadEvent(rd queueReader, emap *ebpf.Map, dataSupplier
func() interface{}, bufReader RingBufferReader) {
+func (m *Linker) asyncReadEvent(rd *perf.Reader, emap *ebpf.Map, recordPool
*perfRecordBuilder,
+ dataSupplier func() interface{}, bufReader RingBufferReader) {
go func() {
for {
- sample, err := rd.Read()
+ record := recordPool.GetRecord()
+ err := rd.ReadInto(record)
if err != nil {
+ recordPool.PutRecord(record)
if errors.Is(err, perf.ErrClosed) {
return
}
log.Warnf("read from %s ringbuffer error: %v",
emap.String(), err)
continue
}
- if len(sample) == 0 {
+
+ if record.LostSamples != 0 {
+ log.Warnf("perf event queue(%s) full, dropped
%d samples", emap.String(), record.LostSamples)
+ recordPool.PutRecord(record)
continue
}
data := dataSupplier()
- if r, ok := data.(reader.EventReader); ok {
- sampleReader := reader.NewReader(sample)
+ if r, ok := data.(EventReader); ok {
+ sampleReader := NewReader(record.RawSample)
r.ReadFrom(sampleReader)
if readErr := sampleReader.HasError(); readErr
!= nil {
- log.Warnf("parsing data from %s, raw
size: %d, ringbuffer error: %v", emap.String(), len(sample), err)
+ log.Warnf("parsing data from %s, raw
size: %d, ringbuffer error: %v", emap.String(), len(record.RawSample), err)
+ recordPool.PutRecord(record)
continue
}
} else {
- if err := binary.Read(bytes.NewBuffer(sample),
binary.LittleEndian, data); err != nil {
- log.Warnf("parsing data from %s, raw
size: %d, ringbuffer error: %v", emap.String(), len(sample), err)
+ if err :=
binary.Read(bytes.NewBuffer(record.RawSample), binary.LittleEndian, data); err
!= nil {
+ log.Warnf("parsing data from %s, raw
size: %d, ringbuffer error: %v", emap.String(), len(record.RawSample), err)
+ recordPool.PutRecord(record)
continue
}
}
bufReader(data)
+
+ recordPool.PutRecord(record)
}
}()
}
@@ -402,3 +411,34 @@ func (m *Linker) Close() error {
})
return err
}
+
+type perfRecordBuilder struct {
+ dataSize int
+ pool sync.Pool
+}
+
+func newPerfRecordBuilder(data interface{}) *perfRecordBuilder {
+ // added 8 bytes means fix some event not aligned
+ var size = binary.Size(data) + 8
+ if r, ok := data.(EventReader); ok {
+ reader := newSizeCalcReader()
+ r.ReadFrom(reader)
+ size = reader.Size() + 8
+ }
+
+ builder := &perfRecordBuilder{
+ dataSize: size,
+ }
+ builder.pool.New = func() any {
+ return &perf.Record{RawSample: make([]byte, 0,
builder.dataSize)}
+ }
+ return builder
+}
+
+func (p *perfRecordBuilder) GetRecord() *perf.Record {
+ return p.pool.Get().(*perf.Record)
+}
+
+func (p *perfRecordBuilder) PutRecord(r *perf.Record) {
+ p.pool.Put(r)
+}
diff --git a/pkg/tools/btf/queue.go b/pkg/tools/btf/queue.go
index fdd8754..eff29c8 100644
--- a/pkg/tools/btf/queue.go
+++ b/pkg/tools/btf/queue.go
@@ -19,90 +19,16 @@ package btf
import (
"context"
- "fmt"
"sync"
"time"
"github.com/cilium/ebpf"
- "github.com/cilium/ebpf/perf"
- "github.com/cilium/ebpf/ringbuf"
)
// queueChannelReducingCountCheckInterval is the interval to check the queue
channel reducing count
// if the reducing count is almost full, then added a warning log
const queueChannelReducingCountCheckInterval = time.Second * 5
-type queueReader interface {
- Read() ([]byte, error)
- Close() error
-}
-
-func newQueueReader(emap *ebpf.Map, perCPUBuffer int) (queueReader, error) {
- switch emap.Type() {
- case ebpf.RingBuf:
- return newRingBufReader(emap)
- case ebpf.PerfEventArray:
- return newPerfQueueReader(emap, perCPUBuffer)
- }
- return nil, fmt.Errorf("unsupported map type: %s", emap.Type().String())
-}
-
-type perfQueueReader struct {
- name string
- reader *perf.Reader
-}
-
-func newPerfQueueReader(emap *ebpf.Map, perCPUBuffer int) (*perfQueueReader,
error) {
- reader, err := perf.NewReader(emap, perCPUBuffer)
- if err != nil {
- return nil, err
- }
- return &perfQueueReader{reader: reader, name: emap.String()}, nil
-}
-
-func (p *perfQueueReader) Read() ([]byte, error) {
- read, err := p.reader.Read()
- if err != nil {
- return nil, err
- }
-
- if read.LostSamples != 0 {
- log.Warnf("perf event queue(%s) full, dropped %d samples",
p.name, read.LostSamples)
- return nil, nil
- }
-
- return read.RawSample, nil
-}
-
-func (p *perfQueueReader) Close() error {
- return p.reader.Close()
-}
-
-type ringBufReader struct {
- reader *ringbuf.Reader
- name string
-}
-
-func newRingBufReader(emap *ebpf.Map) (*ringBufReader, error) {
- reader, err := ringbuf.NewReader(emap)
- if err != nil {
- return nil, err
- }
- return &ringBufReader{reader: reader, name: emap.String()}, nil
-}
-
-func (r *ringBufReader) Read() ([]byte, error) {
- read, err := r.reader.Read()
- if err != nil {
- return nil, err
- }
- return read.RawSample, nil
-}
-
-func (r *ringBufReader) Close() error {
- return r.reader.Close()
-}
-
type PartitionContext interface {
Start(ctx context.Context)
Consume(data interface{})
diff --git a/pkg/tools/btf/reader/reader.go b/pkg/tools/btf/reader.go
similarity index 62%
rename from pkg/tools/btf/reader/reader.go
rename to pkg/tools/btf/reader.go
index d2e980e..94336a4 100644
--- a/pkg/tools/btf/reader/reader.go
+++ b/pkg/tools/btf/reader.go
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-package reader
+package btf
import (
"encoding/binary"
@@ -25,11 +25,20 @@ import (
// EventReader read the sample data by self, instant of binary.Read
type EventReader interface {
// ReadFrom read buffer data
- ReadFrom(reader *Reader)
+ ReadFrom(reader Reader)
}
-// Reader buffer sample reader
-type Reader struct {
+type Reader interface {
+ HasError() error
+ ReadUint64() uint64
+ ReadUint32() uint32
+ ReadUint16() uint16
+ ReadUint8() uint8
+ ReadUint8Array(a []uint8, size int)
+}
+
+// BytesReader buffer sample reader
+type BytesReader struct {
Sample []byte
CurrentOffset int
sampleLen int
@@ -37,8 +46,8 @@ type Reader struct {
}
// NewReader create a reader from BPF buffer
-func NewReader(sample []byte) *Reader {
- return &Reader{
+func NewReader(sample []byte) Reader {
+ return &BytesReader{
Sample: sample,
CurrentOffset: 0,
sampleLen: len(sample),
@@ -46,11 +55,11 @@ func NewReader(sample []byte) *Reader {
}
// HasError is there have error when reading buffer
-func (r *Reader) HasError() error {
+func (r *BytesReader) HasError() error {
return r.err
}
-func (r *Reader) ReadUint64() uint64 {
+func (r *BytesReader) ReadUint64() uint64 {
bytes, err := r.read(8)
if err != nil {
return 0
@@ -58,7 +67,7 @@ func (r *Reader) ReadUint64() uint64 {
return binary.LittleEndian.Uint64(bytes)
}
-func (r *Reader) ReadUint32() uint32 {
+func (r *BytesReader) ReadUint32() uint32 {
bytes, err := r.read(4)
if err != nil {
return 0
@@ -66,7 +75,7 @@ func (r *Reader) ReadUint32() uint32 {
return binary.LittleEndian.Uint32(bytes)
}
-func (r *Reader) ReadUint16() uint16 {
+func (r *BytesReader) ReadUint16() uint16 {
bytes, err := r.read(2)
if err != nil {
return 0
@@ -74,7 +83,7 @@ func (r *Reader) ReadUint16() uint16 {
return binary.LittleEndian.Uint16(bytes)
}
-func (r *Reader) ReadUint8() uint8 {
+func (r *BytesReader) ReadUint8() uint8 {
bytes, err := r.read(1)
if err != nil {
return 0
@@ -82,7 +91,7 @@ func (r *Reader) ReadUint8() uint8 {
return bytes[0]
}
-func (r *Reader) ReadUint8Array(a []uint8, size int) {
+func (r *BytesReader) ReadUint8Array(a []uint8, size int) {
read, err := r.read(size)
if err != nil {
return
@@ -90,7 +99,7 @@ func (r *Reader) ReadUint8Array(a []uint8, size int) {
copy(a, read)
}
-func (r *Reader) read(size int) ([]byte, error) {
+func (r *BytesReader) read(size int) ([]byte, error) {
if r.err != nil {
return nil, r.err
}
@@ -103,3 +112,43 @@ func (r *Reader) read(size int) ([]byte, error) {
r.CurrentOffset += size
return bytes, nil
}
+
+type sizeCalcReader struct {
+ size int
+}
+
+func newSizeCalcReader() *sizeCalcReader {
+ return &sizeCalcReader{}
+}
+
+func (r *sizeCalcReader) HasError() error {
+ return nil
+}
+
+func (r *sizeCalcReader) ReadUint64() uint64 {
+ r.size += 8
+ return 0
+}
+
+func (r *sizeCalcReader) ReadUint32() uint32 {
+ r.size += 4
+ return 0
+}
+
+func (r *sizeCalcReader) ReadUint16() uint16 {
+ r.size += 2
+ return 0
+}
+
+func (r *sizeCalcReader) ReadUint8() uint8 {
+ r.size += 1
+ return 0
+}
+
+func (r *sizeCalcReader) ReadUint8Array(a []uint8, size int) {
+ r.size += size
+}
+
+func (r *sizeCalcReader) Size() int {
+ return r.size
+}