This is an automated email from the ASF dual-hosted git repository.

liuhan pushed a commit to branch perf
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git


The following commit(s) were added to refs/heads/perf by this push:
     new ca93cf5  trying to reduce mutex
ca93cf5 is described below

commit ca93cf5e76159f5b6681a3c2652d5f8b20ec0f0c
Author: mrproliu <[email protected]>
AuthorDate: Wed Dec 25 15:26:59 2024 +0800

    trying to reduce mutex
---
 pkg/tools/btf/linker.go | 49 +++++++++++++++++++++++++++++++++----------------
 pkg/tools/btf/queue.go  |  2 +-
 2 files changed, 34 insertions(+), 17 deletions(-)

diff --git a/pkg/tools/btf/linker.go b/pkg/tools/btf/linker.go
index bdbd7a6..524a193 100644
--- a/pkg/tools/btf/linker.go
+++ b/pkg/tools/btf/linker.go
@@ -198,26 +198,43 @@ func (m *Linker) asyncReadEvent(rd queueReader, emap 
*ebpf.Map, dataSupplier fun
                                continue
                        }
 
-                       data := dataSupplier()
-                       if r, ok := data.(reader.EventReader); ok {
-                               sampleReader := reader.NewReader(sample)
-                               r.ReadFrom(sampleReader)
-                               if readErr := sampleReader.HasError(); readErr 
!= nil {
-                                       log.Warnf("parsing data from %s, raw 
size: %d, ringbuffer error: %v", emap.String(), len(sample), err)
-                                       continue
-                               }
-                       } else {
-                               if err := binary.Read(bytes.NewBuffer(sample), 
binary.LittleEndian, data); err != nil {
-                                       log.Warnf("parsing data from %s, raw 
size: %d, ringbuffer error: %v", emap.String(), len(sample), err)
-                                       continue
-                               }
-                       }
-
-                       bufReader(data)
+                       go m.handleReadEvent(sample, dataSupplier, bufReader)
+                       //data := dataSupplier()
+                       //if r, ok := data.(reader.EventReader); ok {
+                       //      sampleReader := reader.NewReader(sample)
+                       //      r.ReadFrom(sampleReader)
+                       //      if readErr := sampleReader.HasError(); readErr 
!= nil {
+                       //              log.Warnf("parsing data from %s, raw 
size: %d, ringbuffer error: %v", emap.String(), len(sample), err)
+                       //              continue
+                       //      }
+                       //} else {
+                       //      if err := binary.Read(bytes.NewBuffer(sample), 
binary.LittleEndian, data); err != nil {
+                       //              log.Warnf("parsing data from %s, raw 
size: %d, ringbuffer error: %v", emap.String(), len(sample), err)
+                       //              continue
+                       //      }
+                       //}
+                       //
+                       //bufReader(data)
                }
        }()
 }
 
+func (m *Linker) handleReadEvent(sample []byte, dataSupplier func() 
interface{}, bufReader RingBufferReader) {
+       data := dataSupplier()
+       if r, ok := data.(reader.EventReader); ok {
+               sampleReader := reader.NewReader(sample)
+               r.ReadFrom(sampleReader)
+               if readErr := sampleReader.HasError(); readErr != nil {
+                       log.Warnf("parsing data from ringbuffer error: %v", 
readErr)
+               }
+       } else {
+               if err := binary.Read(bytes.NewBuffer(sample), 
binary.LittleEndian, data); err != nil {
+                       log.Warnf("parsing data from ringbuffer error: %v", err)
+               }
+       }
+       bufReader(data)
+}
+
 func (m *Linker) OpenUProbeExeFile(path string) *UProbeExeFile {
        executable, err := link.OpenExecutable(path)
        if err != nil {
diff --git a/pkg/tools/btf/queue.go b/pkg/tools/btf/queue.go
index dc78465..9dd8ed1 100644
--- a/pkg/tools/btf/queue.go
+++ b/pkg/tools/btf/queue.go
@@ -174,7 +174,7 @@ func (e *EventQueue) start0(ctx context.Context, linker 
*Linker) {
                func(receiver *mapReceiver) {
                        linker.ReadEventAsyncWithBufferSize(receiver.emap, 
func(data interface{}) {
                                e.routerTransformer(data, receiver.router)
-                       }, receiver.perCPUBuffer, r.parallels, 
receiver.dataSupplier)
+                       }, receiver.perCPUBuffer, 1, receiver.dataSupplier)
                }(r)
        }
 

Reply via email to