This is an automated email from the ASF dual-hosted git repository.
liuhan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git
The following commit(s) were added to refs/heads/main by this push:
new d045da1 Fix the profiling cannot found process issue (#132)
d045da1 is described below
commit d045da14ebfe237b27f4c48dd9dbf8692a787c95
Author: mrproliu <[email protected]>
AuthorDate: Tue Jun 18 08:45:09 2024 +0000
Fix the profiling cannot found process issue (#132)
---
CHANGES.md | 1 +
pkg/process/finders/manager.go | 14 +-------------
pkg/process/finders/storage.go | 30 +++++++++++++++++++++++++++++-
pkg/profiling/task/manager.go | 1 +
pkg/tools/buffer/buffer.go | 4 ++--
5 files changed, 34 insertions(+), 16 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 44b268c..926cdd6 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -13,6 +13,7 @@ Release Notes.
* Fix errors when compiling C source files into eBPF bytecode on a system with
Linux headers version 6.2 or higher.
* Fixed `ip_list_rcv` probe is not exist in older linux kernel.
* Fix concurrent map operation in the access log module.
+* Fix the profiling cannot found process issue.
#### Documentation
diff --git a/pkg/process/finders/manager.go b/pkg/process/finders/manager.go
index 0d6aee7..dbe99c0 100644
--- a/pkg/process/finders/manager.go
+++ b/pkg/process/finders/manager.go
@@ -48,8 +48,6 @@ type ProcessManager struct {
type ProcessManagerWithFinder struct {
*ProcessManager
finderType api.ProcessDetectType
-
- lastSync []api.DetectedProcess
}
func NewProcessManager(ctx context.Context, moduleManager *module.Manager,
@@ -121,20 +119,10 @@ func (p *ProcessManagerWithFinder) GetModuleManager() *module.Manager {
func (p *ProcessManagerWithFinder) SyncAllProcessInFinder(processes []api.DetectedProcess) {
p.storage.SyncAllProcessInFinder(p.finderType, processes)
- p.lastSync = processes
}
func (p *ProcessManagerWithFinder) AddDetectedProcess(processes []api.DetectedProcess) {
- if len(p.lastSync) == 0 {
- p.SyncAllProcessInFinder(processes)
- p.lastSync = processes
- return
- }
- // fetch existing process, add the new processes, finally, re-sync
- detectedProcesses := make([]api.DetectedProcess, 0, len(processes)+len(p.lastSync))
- detectedProcesses = append(detectedProcesses, p.lastSync...)
- detectedProcesses = append(detectedProcesses, processes...)
- p.SyncAllProcessInFinder(detectedProcesses)
+ p.storage.AddNewProcessInFinder(p.finderType, processes)
}
func (m *ProcessManager) GetAllProcesses() []api.ProcessInterface {
diff --git a/pkg/process/finders/storage.go b/pkg/process/finders/storage.go
index 16314ee..949ff28 100644
--- a/pkg/process/finders/storage.go
+++ b/pkg/process/finders/storage.go
@@ -237,6 +237,34 @@ func (s *ProcessStorage) processesReport(waitReportProcesses []*ProcessContext)
return nil
}
+func (s *ProcessStorage) AddNewProcessInFinder(finder api.ProcessDetectType, processes []api.DetectedProcess) {
+ s.mutex.Lock()
+ defer s.mutex.Unlock()
+
+ addProcessBuilder := s.newProcessEventBuilder(ProcessOperateAdd)
+ for _, newProcess := range processes {
+ if newProcess == nil {
+ continue
+ }
+ founded := false
+ for _, existingProcess := range s.processes[finder] {
+ if existingProcess.Pid() == newProcess.Pid() && existingProcess.Entity().SameWith(newProcess.Entity()) {
+ founded = true
+ break
+ }
+ }
+
+ // if not found in existing processes, need to add this process
+ if !founded {
+ processContext := s.constructNewProcessContext(finder, newProcess)
+ addProcessBuilder.AddProcess(newProcess.Pid(), processContext)
+ s.processes[finder] = append(s.processes[finder], processContext)
+ log.Infof("detected new process by add process: pid: %d, entity: %s", newProcess.Pid(), newProcess.Entity())
+ }
+ }
+ addProcessBuilder.Send()
+}
+
func (s *ProcessStorage) SyncAllProcessInFinder(finder api.ProcessDetectType, processes []api.DetectedProcess) {
s.mutex.Lock()
defer s.mutex.Unlock()
@@ -269,7 +297,7 @@ func (s *ProcessStorage) SyncAllProcessInFinder(finder api.ProcessDetectType, pr
processContext := s.constructNewProcessContext(finder, syncProcess)
newProcesses = append(newProcesses, processContext)
addProcessBuilder.AddProcess(syncProcess.Pid(), newProcesses[len(newProcesses)-1])
- log.Infof("detected new process: pid: %d, entity: %s", syncProcess.Pid(), syncProcess.Entity())
+ log.Infof("detected new process by sync all: pid: %d, entity: %s", syncProcess.Pid(), syncProcess.Entity())
}
}
addProcessBuilder.Send()
diff --git a/pkg/profiling/task/manager.go b/pkg/profiling/task/manager.go
index 751aa38..b1bb612 100644
--- a/pkg/profiling/task/manager.go
+++ b/pkg/profiling/task/manager.go
@@ -294,6 +294,7 @@ func (m *Manager) StartingWatchTask() error {
return err
}
if len(tasks.Commands) == 0 {
+ log.Debugf("no profiling task found need to execute")
return nil
}
diff --git a/pkg/tools/buffer/buffer.go b/pkg/tools/buffer/buffer.go
index ec24b23..1c71f7f 100644
--- a/pkg/tools/buffer/buffer.go
+++ b/pkg/tools/buffer/buffer.go
@@ -253,7 +253,7 @@ func (r *Buffer) FirstSocketBuffer() SocketDataBuffer {
}
func (r *Buffer) LastSocketBuffer() SocketDataBuffer {
- if r.dataEvents.Len() == 0 {
+ if r.dataEvents == nil || r.dataEvents.Len() == 0 {
return nil
}
return r.dataEvents.Back().Value.(SocketDataBuffer)
@@ -285,7 +285,7 @@ func CombineSlices(validated bool, buffers ...*Buffer) *Buffer {
dataEvents := list.New()
detailEvents := list.New()
for _, b := range buffers {
- if b == nil {
+ if b == nil || b.head == nil {
continue
}
if b.head.bufIndex > 0 {