This is an automated email from the ASF dual-hosted git repository.
liuhan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git
The following commit(s) were added to refs/heads/main by this push:
new 271e2cd Update to using frequency mode for `ON_CPU` Profiling. (#37)
271e2cd is described below
commit 271e2cd5ae3abd96d127499ce19a55d59111d7be
Author: mrproliu <[email protected]>
AuthorDate: Wed May 25 16:21:20 2022 +0800
Update to using frequency mode for `ON_CPU` Profiling. (#37)
---
CHANGES.md | 1 +
bpf/profiling/oncpu.c | 21 +++++-
bpf/profiling/oncpu.h | 5 +-
pkg/profiling/task/offcpu/runner.go | 3 +
pkg/profiling/task/oncpu/runner.go | 137 ++++++++++++++++--------------------
5 files changed, 88 insertions(+), 79 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index aaa79e3..858b912 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -7,6 +7,7 @@ Release Notes.
#### Features
* Support `OFF_CPU` Profiling.
* Introduce the `BTFHub` module.
+* Update to using frequency mode to `ON_CPU` Profiling.
#### Bug Fixes
diff --git a/bpf/profiling/oncpu.c b/bpf/profiling/oncpu.c
index fa4321a..0c25893 100644
--- a/bpf/profiling/oncpu.c
+++ b/bpf/profiling/oncpu.c
@@ -22,6 +22,16 @@ char __license[] SEC("license") = "Dual MIT/GPL";
SEC("perf_event")
int do_perf_event(struct pt_regs *ctx) {
+ int monitor_pid;
+ asm("%0 = MONITOR_PID ll" : "=r"(monitor_pid));
+
+ // only match the same pid
+ __u64 id = bpf_get_current_pid_tgid();
+ __u32 tgid = id >> 32;
+ if (tgid != monitor_pid) {
+ return 0;
+ }
+
// create map key
struct key_t key = {};
@@ -29,6 +39,15 @@ int do_perf_event(struct pt_regs *ctx) {
key.kernel_stack_id = bpf_get_stackid(ctx, &stacks, 0);
key.user_stack_id = bpf_get_stackid(ctx, &stacks, BPF_F_USER_STACK);
- bpf_perf_event_output(ctx, &counts, BPF_F_CURRENT_CPU, &key, sizeof(key));
+ __u32 *val;
+ val = bpf_map_lookup_elem(&counts, &key);
+ if (!val) {
+ __u32 count = 0;
+ bpf_map_update_elem(&counts, &key, &count, BPF_NOEXIST);
+ val = bpf_map_lookup_elem(&counts, &key);
+ if (!val)
+ return 0;
+ }
+ (*val) += 1;
return 0;
}
\ No newline at end of file
diff --git a/bpf/profiling/oncpu.h b/bpf/profiling/oncpu.h
index 6fc467d..51c2940 100644
--- a/bpf/profiling/oncpu.h
+++ b/bpf/profiling/oncpu.h
@@ -21,7 +21,10 @@ struct key_t {
};
struct {
- __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
+ __uint(type, BPF_MAP_TYPE_HASH);
+ __type(key, struct key_t);
+ __type(value, __u32);
+ __uint(max_entries, 10000);
} counts SEC(".maps");
struct {
diff --git a/pkg/profiling/task/offcpu/runner.go
b/pkg/profiling/task/offcpu/runner.go
index dca5343..d45a899 100644
--- a/pkg/profiling/task/offcpu/runner.go
+++ b/pkg/profiling/task/offcpu/runner.go
@@ -185,6 +185,9 @@ func (r *Runner) FlushData() ([]*v3.EBPFProfilingData,
error) {
duration -= int64(existCounter.Deltas)
}
r.previousStacks[stack] = counter
+ if switchCount <= 0 {
+ continue
+ }
result = append(result, &v3.EBPFProfilingData{
Profiling: &v3.EBPFProfilingData_OffCPU{
diff --git a/pkg/profiling/task/oncpu/runner.go
b/pkg/profiling/task/oncpu/runner.go
index 7c9cc33..8051ce6 100644
--- a/pkg/profiling/task/oncpu/runner.go
+++ b/pkg/profiling/task/oncpu/runner.go
@@ -20,18 +20,11 @@
package oncpu
import (
- "bytes"
"context"
- "encoding/binary"
- "errors"
"fmt"
- "os"
"runtime"
"time"
- "github.com/cilium/ebpf"
- "github.com/cilium/ebpf/perf"
-
"github.com/hashicorp/go-multierror"
"github.com/apache/skywalking-rover/pkg/logger"
@@ -61,14 +54,14 @@ type Runner struct {
pid int32
processProfiling *profiling.Info
kernelProfiling *profiling.Info
- dumpPeriod time.Duration
+ dumpFrequency int64
// runtime
perfEventFds []int
- countReader *perf.Reader
- stackCounter map[Event]int
- stackMap *ebpf.Map
+ bpf *bpfObjects
+ stackCounter map[Event]uint32
flushDataNotify context.CancelFunc
+ stopChan chan bool
}
func NewRunner(config *base.TaskConfig) (base.ProfileTaskRunner, error) {
@@ -83,9 +76,8 @@ func NewRunner(config *base.TaskConfig)
(base.ProfileTaskRunner, error) {
return nil, fmt.Errorf("the ON_CPU dump period could not be
smaller than 1ms")
}
return &Runner{
- base: base.NewBaseRunner(),
- dumpPeriod: dumpPeriod,
- stackCounter: make(map[Event]int),
+ base: base.NewBaseRunner(),
+ dumpFrequency: time.Second.Milliseconds() /
dumpPeriod.Milliseconds(),
}, nil
}
@@ -101,25 +93,36 @@ func (r *Runner) Init(task *base.ProfilingTask, process
api.ProcessInterface) er
log.Warnf("could not analyze kernel profiling stats: %v", err)
}
r.kernelProfiling = kernelProfiling
- r.stackCounter = make(map[Event]int)
+ r.stackCounter = make(map[Event]uint32)
+ r.stopChan = make(chan bool, 1)
return nil
}
func (r *Runner) Run(ctx context.Context, notify
base.ProfilingRunningSuccessNotify) error {
// load bpf
objs := bpfObjects{}
- if err := loadBpfObjects(&objs, nil); err != nil {
+ spec, err := loadBpf()
+ if err != nil {
return err
}
- defer objs.Close()
- r.stackMap = objs.Stacks
-
- // init profiling data reader
- rd, err := perf.NewReader(objs.Counts, os.Getpagesize())
- if err != nil {
- return fmt.Errorf("creating perf event reader: %s", err)
+ // update the monitor pid
+ funcName := "do_perf_event"
+ replacedPid := false
+ for i, ins := range spec.Programs[funcName].Instructions {
+ if ins.Reference == "MONITOR_PID" {
+ spec.Programs[funcName].Instructions[i].Constant =
int64(r.pid)
+ spec.Programs[funcName].Instructions[i].Offset = 0
+ replacedPid = true
+ }
+ }
+ if !replacedPid {
+ return fmt.Errorf("replace the monitor pid failure")
}
- r.countReader = rd
+ if err1 := spec.LoadAndAssign(&objs, nil); err1 != nil {
+ log.Fatalf("loading objects: %s", err1)
+ }
+ defer objs.Close()
+ r.bpf = &objs
// opened perf events
perfEvents, err := r.openPerfEvent(objs.DoPerfEvent.FD())
@@ -131,48 +134,24 @@ func (r *Runner) Run(ctx context.Context, notify
base.ProfilingRunningSuccessNot
// notify start success
notify()
runtime.SetFinalizer(r, (*Runner).Stop)
-
- // read content
- var event Event
- for {
- record, err := rd.Read()
- if err != nil {
- if errors.Is(err, perf.ErrClosed) {
- return nil
- }
- log.Warnf("reading from perf event reader: %s", err)
- continue
- }
-
- if record.LostSamples != 0 {
- log.Warnf("perf event ring buffer full, dropped %d
samples", record.LostSamples)
- continue
- }
-
- // parse perf event data
- if err := binary.Read(bytes.NewBuffer(record.RawSample),
binary.LittleEndian, &event); err != nil {
- log.Errorf("parsing perf event error: %s", err)
- continue
- }
-
- r.stackCounter[event]++
- }
+ <-r.stopChan
+ return nil
}
func (r *Runner) openPerfEvent(perfFd int) ([]int, error) {
eventAttr := &unix.PerfEventAttr{
- Type: unix.PERF_TYPE_SOFTWARE,
- Config: unix.PERF_COUNT_SW_CPU_CLOCK,
- Sample_type: unix.PERF_SAMPLE_RAW,
- Sample: uint64(r.dumpPeriod.Nanoseconds()),
- Wakeup: 1,
+ Type: unix.PERF_TYPE_SOFTWARE,
+ Config: unix.PERF_COUNT_SW_CPU_CLOCK,
+ Bits: unix.PerfBitFreq,
+ Sample: uint64(r.dumpFrequency),
+ Wakeup: 1,
}
fds := make([]int, 0)
for cpuNum := 0; cpuNum < runtime.NumCPU(); cpuNum++ {
fd, err := unix.PerfEventOpen(
eventAttr,
- int(r.pid),
+ -1,
cpuNum,
-1,
0,
@@ -213,8 +192,8 @@ func (r *Runner) Stop() error {
case <-time.After(5 * time.Second):
}
- if r.countReader != nil {
- if err := r.countReader.Close(); err != nil {
+ if r.bpf != nil {
+ if err := r.bpf.Close(); err != nil {
result = multierror.Append(result, err)
}
}
@@ -223,30 +202,38 @@ func (r *Runner) Stop() error {
}
func (r *Runner) FlushData() ([]*v3.EBPFProfilingData, error) {
- existsCounters := r.flushStackCounter()
-
+ var stack Event
+ var counter uint32
+ iterate := r.bpf.Counts.Iterate()
+ stacks := r.bpf.Stacks
result := make([]*v3.EBPFProfilingData, 0)
stackSymbols := make([]uint64, 100)
- for event, count := range existsCounters {
+ for iterate.Next(&stack, &counter) {
metadatas := make([]*v3.EBPFProfilingStackMetadata, 0)
// kernel stack
- if d := r.base.GenerateProfilingData(r.kernelProfiling,
event.KernelStackID, r.stackMap,
+ if d := r.base.GenerateProfilingData(r.kernelProfiling,
stack.KernelStackID, stacks,
v3.EBPFProfilingStackType_PROCESS_KERNEL_SPACE,
stackSymbols); d != nil {
metadatas = append(metadatas, d)
}
// user stack
- if d := r.base.GenerateProfilingData(r.processProfiling,
event.UserStackID, r.stackMap,
+ if d := r.base.GenerateProfilingData(r.processProfiling,
stack.UserStackID, stacks,
v3.EBPFProfilingStackType_PROCESS_USER_SPACE,
stackSymbols); d != nil {
metadatas = append(metadatas, d)
}
- // close the flush data notify if exists
- if r.flushDataNotify != nil {
- r.flushDataNotify()
+ if len(metadatas) == 0 {
+ continue
}
- if len(metadatas) == 0 {
+ // update the counters in memory
+ dumpCount := int32(counter)
+ existCounter := r.stackCounter[stack]
+ if existCounter > 0 {
+ dumpCount -= int32(existCounter)
+ }
+ r.stackCounter[stack] = counter
+ if dumpCount <= 0 {
continue
}
@@ -254,22 +241,18 @@ func (r *Runner) FlushData() ([]*v3.EBPFProfilingData,
error) {
Profiling: &v3.EBPFProfilingData_OnCPU{
OnCPU: &v3.EBPFOnCPUProfiling{
Stacks: metadatas,
- DumpCount: int32(count),
+ DumpCount: dumpCount,
},
},
})
}
- return result, nil
-}
-
-func (r *Runner) flushStackCounter() map[Event]int {
- updateTo := make(map[Event]int)
- updateToP := &updateTo
+ // close the flush data notify if exists
+ if r.flushDataNotify != nil {
+ r.flushDataNotify()
+ }
- older := &r.stackCounter
- *older, *updateToP = *updateToP, *older
- return updateTo
+ return result, nil
}
func (r *Runner) closePerfEvent(fd int) error {