This is an automated email from the ASF dual-hosted git repository.
liuhan pushed a commit to branch perf
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git
The following commit(s) were added to refs/heads/perf by this push:
new ca93cf5 trying to reduce mutex
ca93cf5 is described below
commit ca93cf5e76159f5b6681a3c2652d5f8b20ec0f0c
Author: mrproliu <[email protected]>
AuthorDate: Wed Dec 25 15:26:59 2024 +0800
trying to reduce mutex
---
pkg/tools/btf/linker.go | 49 +++++++++++++++++++++++++++++++++----------------
pkg/tools/btf/queue.go | 2 +-
2 files changed, 34 insertions(+), 17 deletions(-)
diff --git a/pkg/tools/btf/linker.go b/pkg/tools/btf/linker.go
index bdbd7a6..524a193 100644
--- a/pkg/tools/btf/linker.go
+++ b/pkg/tools/btf/linker.go
@@ -198,26 +198,43 @@ func (m *Linker) asyncReadEvent(rd queueReader, emap
*ebpf.Map, dataSupplier fun
continue
}
- data := dataSupplier()
- if r, ok := data.(reader.EventReader); ok {
- sampleReader := reader.NewReader(sample)
- r.ReadFrom(sampleReader)
- if readErr := sampleReader.HasError(); readErr
!= nil {
- log.Warnf("parsing data from %s, raw
size: %d, ringbuffer error: %v", emap.String(), len(sample), err)
- continue
- }
- } else {
- if err := binary.Read(bytes.NewBuffer(sample),
binary.LittleEndian, data); err != nil {
- log.Warnf("parsing data from %s, raw
size: %d, ringbuffer error: %v", emap.String(), len(sample), err)
- continue
- }
- }
-
- bufReader(data)
+ go m.handleReadEvent(sample, dataSupplier, bufReader)
+ //data := dataSupplier()
+ //if r, ok := data.(reader.EventReader); ok {
+ // sampleReader := reader.NewReader(sample)
+ // r.ReadFrom(sampleReader)
+ // if readErr := sampleReader.HasError(); readErr
!= nil {
+ // log.Warnf("parsing data from %s, raw
size: %d, ringbuffer error: %v", emap.String(), len(sample), err)
+ // continue
+ // }
+ //} else {
+ // if err := binary.Read(bytes.NewBuffer(sample),
binary.LittleEndian, data); err != nil {
+ // log.Warnf("parsing data from %s, raw
size: %d, ringbuffer error: %v", emap.String(), len(sample), err)
+ // continue
+ // }
+ //}
+ //
+ //bufReader(data)
}
}()
}
+func (m *Linker) handleReadEvent(sample []byte, dataSupplier func()
interface{}, bufReader RingBufferReader) {
+ data := dataSupplier()
+ if r, ok := data.(reader.EventReader); ok {
+ sampleReader := reader.NewReader(sample)
+ r.ReadFrom(sampleReader)
+ if readErr := sampleReader.HasError(); readErr != nil {
+ log.Warnf("parsing data from ringbuffer error: %v",
readErr)
+ }
+ } else {
+ if err := binary.Read(bytes.NewBuffer(sample),
binary.LittleEndian, data); err != nil {
+ log.Warnf("parsing data from ringbuffer error: %v", err)
+ }
+ }
+ bufReader(data)
+}
+
func (m *Linker) OpenUProbeExeFile(path string) *UProbeExeFile {
executable, err := link.OpenExecutable(path)
if err != nil {
diff --git a/pkg/tools/btf/queue.go b/pkg/tools/btf/queue.go
index dc78465..9dd8ed1 100644
--- a/pkg/tools/btf/queue.go
+++ b/pkg/tools/btf/queue.go
@@ -174,7 +174,7 @@ func (e *EventQueue) start0(ctx context.Context, linker
*Linker) {
func(receiver *mapReceiver) {
linker.ReadEventAsyncWithBufferSize(receiver.emap,
func(data interface{}) {
e.routerTransformer(data, receiver.router)
- }, receiver.perCPUBuffer, r.parallels,
receiver.dataSupplier)
+ }, receiver.perCPUBuffer, 1, receiver.dataSupplier)
}(r)
}