This is an automated email from the ASF dual-hosted git repository.

liuhan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git


The following commit(s) were added to refs/heads/main by this push:
     new 271e2cd  Update to using frequency mode to `ON_CPU` Profiling. (#37)
271e2cd is described below

commit 271e2cd5ae3abd96d127499ce19a55d59111d7be
Author: mrproliu <[email protected]>
AuthorDate: Wed May 25 16:21:20 2022 +0800

    Update to using frequency mode to `ON_CPU` Profiling. (#37)
---
 CHANGES.md                          |   1 +
 bpf/profiling/oncpu.c               |  21 +++++-
 bpf/profiling/oncpu.h               |   5 +-
 pkg/profiling/task/offcpu/runner.go |   3 +
 pkg/profiling/task/oncpu/runner.go  | 137 ++++++++++++++++--------------------
 5 files changed, 88 insertions(+), 79 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index aaa79e3..858b912 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -7,6 +7,7 @@ Release Notes.
 #### Features
 * Support `OFF_CPU` Profiling.
 * Introduce the `BTFHub` module.
+* Update to using frequency mode to `ON_CPU` Profiling.
 
 #### Bug Fixes
 
diff --git a/bpf/profiling/oncpu.c b/bpf/profiling/oncpu.c
index fa4321a..0c25893 100644
--- a/bpf/profiling/oncpu.c
+++ b/bpf/profiling/oncpu.c
@@ -22,6 +22,16 @@ char __license[] SEC("license") = "Dual MIT/GPL";
 
 SEC("perf_event")
 int do_perf_event(struct pt_regs *ctx) {
+    int monitor_pid;
+    asm("%0 = MONITOR_PID ll" : "=r"(monitor_pid));
+
+    // only match the same pid
+    __u64 id = bpf_get_current_pid_tgid();
+    __u32 tgid = id >> 32;
+    if (tgid != monitor_pid) {
+        return 0;
+    }
+
     // create map key
     struct key_t key = {};
 
@@ -29,6 +39,15 @@ int do_perf_event(struct pt_regs *ctx) {
     key.kernel_stack_id = bpf_get_stackid(ctx, &stacks, 0);
     key.user_stack_id = bpf_get_stackid(ctx, &stacks, BPF_F_USER_STACK);
 
-    bpf_perf_event_output(ctx, &counts, BPF_F_CURRENT_CPU, &key, sizeof(key));
+    __u32 *val;
+    val = bpf_map_lookup_elem(&counts, &key);
+    if (!val) {
+        __u32 count = 0;
+        bpf_map_update_elem(&counts, &key, &count, BPF_NOEXIST);
+        val = bpf_map_lookup_elem(&counts, &key);
+        if (!val)
+            return 0;
+    }
+    (*val) += 1;
     return 0;
 }
\ No newline at end of file
diff --git a/bpf/profiling/oncpu.h b/bpf/profiling/oncpu.h
index 6fc467d..51c2940 100644
--- a/bpf/profiling/oncpu.h
+++ b/bpf/profiling/oncpu.h
@@ -21,7 +21,10 @@ struct key_t {
 };
 
 struct {
-       __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
+       __uint(type, BPF_MAP_TYPE_HASH);
+       __type(key, struct key_t);
+       __type(value, __u32);
+       __uint(max_entries, 10000);
 } counts SEC(".maps");
 
 struct {
diff --git a/pkg/profiling/task/offcpu/runner.go b/pkg/profiling/task/offcpu/runner.go
index dca5343..d45a899 100644
--- a/pkg/profiling/task/offcpu/runner.go
+++ b/pkg/profiling/task/offcpu/runner.go
@@ -185,6 +185,9 @@ func (r *Runner) FlushData() ([]*v3.EBPFProfilingData, error) {
                        duration -= int64(existCounter.Deltas)
                }
                r.previousStacks[stack] = counter
+               if switchCount <= 0 {
+                       continue
+               }
 
                result = append(result, &v3.EBPFProfilingData{
                        Profiling: &v3.EBPFProfilingData_OffCPU{
diff --git a/pkg/profiling/task/oncpu/runner.go b/pkg/profiling/task/oncpu/runner.go
index 7c9cc33..8051ce6 100644
--- a/pkg/profiling/task/oncpu/runner.go
+++ b/pkg/profiling/task/oncpu/runner.go
@@ -20,18 +20,11 @@
 package oncpu
 
 import (
-       "bytes"
        "context"
-       "encoding/binary"
-       "errors"
        "fmt"
-       "os"
        "runtime"
        "time"
 
-       "github.com/cilium/ebpf"
-       "github.com/cilium/ebpf/perf"
-
        "github.com/hashicorp/go-multierror"
 
        "github.com/apache/skywalking-rover/pkg/logger"
@@ -61,14 +54,14 @@ type Runner struct {
        pid              int32
        processProfiling *profiling.Info
        kernelProfiling  *profiling.Info
-       dumpPeriod       time.Duration
+       dumpFrequency    int64
 
        // runtime
        perfEventFds    []int
-       countReader     *perf.Reader
-       stackCounter    map[Event]int
-       stackMap        *ebpf.Map
+       bpf             *bpfObjects
+       stackCounter    map[Event]uint32
        flushDataNotify context.CancelFunc
+       stopChan        chan bool
 }
 
 func NewRunner(config *base.TaskConfig) (base.ProfileTaskRunner, error) {
@@ -83,9 +76,8 @@ func NewRunner(config *base.TaskConfig) (base.ProfileTaskRunner, error) {
                return nil, fmt.Errorf("the ON_CPU dump period could not be 
smaller than 1ms")
        }
        return &Runner{
-               base:         base.NewBaseRunner(),
-               dumpPeriod:   dumpPeriod,
-               stackCounter: make(map[Event]int),
+               base:          base.NewBaseRunner(),
+               dumpFrequency: time.Second.Milliseconds() / 
dumpPeriod.Milliseconds(),
        }, nil
 }
 
@@ -101,25 +93,36 @@ func (r *Runner) Init(task *base.ProfilingTask, process api.ProcessInterface) er
                log.Warnf("could not analyze kernel profiling stats: %v", err)
        }
        r.kernelProfiling = kernelProfiling
-       r.stackCounter = make(map[Event]int)
+       r.stackCounter = make(map[Event]uint32)
+       r.stopChan = make(chan bool, 1)
        return nil
 }
 
 func (r *Runner) Run(ctx context.Context, notify 
base.ProfilingRunningSuccessNotify) error {
        // load bpf
        objs := bpfObjects{}
-       if err := loadBpfObjects(&objs, nil); err != nil {
+       spec, err := loadBpf()
+       if err != nil {
                return err
        }
-       defer objs.Close()
-       r.stackMap = objs.Stacks
-
-       // init profiling data reader
-       rd, err := perf.NewReader(objs.Counts, os.Getpagesize())
-       if err != nil {
-               return fmt.Errorf("creating perf event reader: %s", err)
+       // update the monitor pid
+       funcName := "do_perf_event"
+       replacedPid := false
+       for i, ins := range spec.Programs[funcName].Instructions {
+               if ins.Reference == "MONITOR_PID" {
+                       spec.Programs[funcName].Instructions[i].Constant = 
int64(r.pid)
+                       spec.Programs[funcName].Instructions[i].Offset = 0
+                       replacedPid = true
+               }
+       }
+       if !replacedPid {
+               return fmt.Errorf("replace the monitor pid failure")
        }
-       r.countReader = rd
+       if err1 := spec.LoadAndAssign(&objs, nil); err1 != nil {
+               log.Fatalf("loading objects: %s", err1)
+       }
+       defer objs.Close()
+       r.bpf = &objs
 
        // opened perf events
        perfEvents, err := r.openPerfEvent(objs.DoPerfEvent.FD())
@@ -131,48 +134,24 @@ func (r *Runner) Run(ctx context.Context, notify base.ProfilingRunningSuccessNot
        // notify start success
        notify()
        runtime.SetFinalizer(r, (*Runner).Stop)
-
-       // read content
-       var event Event
-       for {
-               record, err := rd.Read()
-               if err != nil {
-                       if errors.Is(err, perf.ErrClosed) {
-                               return nil
-                       }
-                       log.Warnf("reading from perf event reader: %s", err)
-                       continue
-               }
-
-               if record.LostSamples != 0 {
-                       log.Warnf("perf event ring buffer full, dropped %d 
samples", record.LostSamples)
-                       continue
-               }
-
-               // parse perf event data
-               if err := binary.Read(bytes.NewBuffer(record.RawSample), 
binary.LittleEndian, &event); err != nil {
-                       log.Errorf("parsing perf event error: %s", err)
-                       continue
-               }
-
-               r.stackCounter[event]++
-       }
+       <-r.stopChan
+       return nil
 }
 
 func (r *Runner) openPerfEvent(perfFd int) ([]int, error) {
        eventAttr := &unix.PerfEventAttr{
-               Type:        unix.PERF_TYPE_SOFTWARE,
-               Config:      unix.PERF_COUNT_SW_CPU_CLOCK,
-               Sample_type: unix.PERF_SAMPLE_RAW,
-               Sample:      uint64(r.dumpPeriod.Nanoseconds()),
-               Wakeup:      1,
+               Type:   unix.PERF_TYPE_SOFTWARE,
+               Config: unix.PERF_COUNT_SW_CPU_CLOCK,
+               Bits:   unix.PerfBitFreq,
+               Sample: uint64(r.dumpFrequency),
+               Wakeup: 1,
        }
 
        fds := make([]int, 0)
        for cpuNum := 0; cpuNum < runtime.NumCPU(); cpuNum++ {
                fd, err := unix.PerfEventOpen(
                        eventAttr,
-                       int(r.pid),
+                       -1,
                        cpuNum,
                        -1,
                        0,
@@ -213,8 +192,8 @@ func (r *Runner) Stop() error {
                case <-time.After(5 * time.Second):
                }
 
-               if r.countReader != nil {
-                       if err := r.countReader.Close(); err != nil {
+               if r.bpf != nil {
+                       if err := r.bpf.Close(); err != nil {
                                result = multierror.Append(result, err)
                        }
                }
@@ -223,30 +202,38 @@ func (r *Runner) Stop() error {
 }
 
 func (r *Runner) FlushData() ([]*v3.EBPFProfilingData, error) {
-       existsCounters := r.flushStackCounter()
-
+       var stack Event
+       var counter uint32
+       iterate := r.bpf.Counts.Iterate()
+       stacks := r.bpf.Stacks
        result := make([]*v3.EBPFProfilingData, 0)
        stackSymbols := make([]uint64, 100)
-       for event, count := range existsCounters {
+       for iterate.Next(&stack, &counter) {
                metadatas := make([]*v3.EBPFProfilingStackMetadata, 0)
                // kernel stack
-               if d := r.base.GenerateProfilingData(r.kernelProfiling, 
event.KernelStackID, r.stackMap,
+               if d := r.base.GenerateProfilingData(r.kernelProfiling, 
stack.KernelStackID, stacks,
                        v3.EBPFProfilingStackType_PROCESS_KERNEL_SPACE, 
stackSymbols); d != nil {
                        metadatas = append(metadatas, d)
                }
 
                // user stack
-               if d := r.base.GenerateProfilingData(r.processProfiling, 
event.UserStackID, r.stackMap,
+               if d := r.base.GenerateProfilingData(r.processProfiling, 
stack.UserStackID, stacks,
                        v3.EBPFProfilingStackType_PROCESS_USER_SPACE, 
stackSymbols); d != nil {
                        metadatas = append(metadatas, d)
                }
 
-               // close the flush data notify if exists
-               if r.flushDataNotify != nil {
-                       r.flushDataNotify()
+               if len(metadatas) == 0 {
+                       continue
                }
 
-               if len(metadatas) == 0 {
+               // update the counters in memory
+               dumpCount := int32(counter)
+               existCounter := r.stackCounter[stack]
+               if existCounter > 0 {
+                       dumpCount -= int32(existCounter)
+               }
+               r.stackCounter[stack] = counter
+               if dumpCount <= 0 {
                        continue
                }
 
@@ -254,22 +241,18 @@ func (r *Runner) FlushData() ([]*v3.EBPFProfilingData, error) {
                        Profiling: &v3.EBPFProfilingData_OnCPU{
                                OnCPU: &v3.EBPFOnCPUProfiling{
                                        Stacks:    metadatas,
-                                       DumpCount: int32(count),
+                                       DumpCount: dumpCount,
                                },
                        },
                })
        }
 
-       return result, nil
-}
-
-func (r *Runner) flushStackCounter() map[Event]int {
-       updateTo := make(map[Event]int)
-       updateToP := &updateTo
+       // close the flush data notify if exists
+       if r.flushDataNotify != nil {
+               r.flushDataNotify()
+       }
 
-       older := &r.stackCounter
-       *older, *updateToP = *updateToP, *older
-       return updateTo
+       return result, nil
 }
 
 func (r *Runner) closePerfEvent(fd int) error {

Reply via email to