This is an automated email from the ASF dual-hosted git repository.

liuhan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git


The following commit(s) were added to refs/heads/main by this push:
     new d045da1  Fix the profiling cannot found process issue (#132)
d045da1 is described below

commit d045da14ebfe237b27f4c48dd9dbf8692a787c95
Author: mrproliu <[email protected]>
AuthorDate: Tue Jun 18 08:45:09 2024 +0000

    Fix the profiling cannot found process issue (#132)
---
 CHANGES.md                     |  1 +
 pkg/process/finders/manager.go | 14 +-------------
 pkg/process/finders/storage.go | 30 +++++++++++++++++++++++++++++-
 pkg/profiling/task/manager.go  |  1 +
 pkg/tools/buffer/buffer.go     |  4 ++--
 5 files changed, 34 insertions(+), 16 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 44b268c..926cdd6 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -13,6 +13,7 @@ Release Notes.
 * Fix errors when compiling C source files into eBPF bytecode on a system with Linux headers version 6.2 or higher.
 * Fixed `ip_list_rcv` probe is not exist in older linux kernel.
 * Fix concurrent map operation in the access log module.
+* Fix the issue where profiling cannot find the process.
 
 #### Documentation
 
diff --git a/pkg/process/finders/manager.go b/pkg/process/finders/manager.go
index 0d6aee7..dbe99c0 100644
--- a/pkg/process/finders/manager.go
+++ b/pkg/process/finders/manager.go
@@ -48,8 +48,6 @@ type ProcessManager struct {
 type ProcessManagerWithFinder struct {
        *ProcessManager
        finderType api.ProcessDetectType
-
-       lastSync []api.DetectedProcess
 }
 
 func NewProcessManager(ctx context.Context, moduleManager *module.Manager,
@@ -121,20 +119,10 @@ func (p *ProcessManagerWithFinder) GetModuleManager() *module.Manager {
 
 func (p *ProcessManagerWithFinder) SyncAllProcessInFinder(processes []api.DetectedProcess) {
        p.storage.SyncAllProcessInFinder(p.finderType, processes)
-       p.lastSync = processes
 }
 
 func (p *ProcessManagerWithFinder) AddDetectedProcess(processes []api.DetectedProcess) {
-       if len(p.lastSync) == 0 {
-               p.SyncAllProcessInFinder(processes)
-               p.lastSync = processes
-               return
-       }
-       // fetch existing process, add the new processes, finally, re-sync
-       detectedProcesses := make([]api.DetectedProcess, 0, len(processes)+len(p.lastSync))
-       detectedProcesses = append(detectedProcesses, p.lastSync...)
-       detectedProcesses = append(detectedProcesses, processes...)
-       p.SyncAllProcessInFinder(detectedProcesses)
+       p.storage.AddNewProcessInFinder(p.finderType, processes)
 }
 
 func (m *ProcessManager) GetAllProcesses() []api.ProcessInterface {
diff --git a/pkg/process/finders/storage.go b/pkg/process/finders/storage.go
index 16314ee..949ff28 100644
--- a/pkg/process/finders/storage.go
+++ b/pkg/process/finders/storage.go
@@ -237,6 +237,34 @@ func (s *ProcessStorage) processesReport(waitReportProcesses []*ProcessContext)
        return nil
 }
 
+func (s *ProcessStorage) AddNewProcessInFinder(finder api.ProcessDetectType, processes []api.DetectedProcess) {
+       s.mutex.Lock()
+       defer s.mutex.Unlock()
+
+       addProcessBuilder := s.newProcessEventBuilder(ProcessOperateAdd)
+       for _, newProcess := range processes {
+               if newProcess == nil {
+                       continue
+               }
+               founded := false
+               for _, existingProcess := range s.processes[finder] {
+                       if existingProcess.Pid() == newProcess.Pid() && existingProcess.Entity().SameWith(newProcess.Entity()) {
+                               founded = true
+                               break
+                       }
+               }
+
+               // if not found in existing processes, need to add this process
+               if !founded {
+                       processContext := s.constructNewProcessContext(finder, newProcess)
+                       addProcessBuilder.AddProcess(newProcess.Pid(), processContext)
+                       s.processes[finder] = append(s.processes[finder], processContext)
+                       log.Infof("detected new process by add process: pid: %d, entity: %s", newProcess.Pid(), newProcess.Entity())
+               }
+       }
+       addProcessBuilder.Send()
+}
+
 func (s *ProcessStorage) SyncAllProcessInFinder(finder api.ProcessDetectType, processes []api.DetectedProcess) {
        s.mutex.Lock()
        defer s.mutex.Unlock()
@@ -269,7 +297,7 @@ func (s *ProcessStorage) SyncAllProcessInFinder(finder api.ProcessDetectType, pr
                        processContext := s.constructNewProcessContext(finder, syncProcess)
                        newProcesses = append(newProcesses, processContext)
                        addProcessBuilder.AddProcess(syncProcess.Pid(), newProcesses[len(newProcesses)-1])
-                       log.Infof("detected new process: pid: %d, entity: %s", syncProcess.Pid(), syncProcess.Entity())
+                       log.Infof("detected new process by sync all: pid: %d, entity: %s", syncProcess.Pid(), syncProcess.Entity())
                }
        }
        addProcessBuilder.Send()
diff --git a/pkg/profiling/task/manager.go b/pkg/profiling/task/manager.go
index 751aa38..b1bb612 100644
--- a/pkg/profiling/task/manager.go
+++ b/pkg/profiling/task/manager.go
@@ -294,6 +294,7 @@ func (m *Manager) StartingWatchTask() error {
                return err
        }
        if len(tasks.Commands) == 0 {
+               log.Debugf("no profiling task found need to execute")
                return nil
        }
 
diff --git a/pkg/tools/buffer/buffer.go b/pkg/tools/buffer/buffer.go
index ec24b23..1c71f7f 100644
--- a/pkg/tools/buffer/buffer.go
+++ b/pkg/tools/buffer/buffer.go
@@ -253,7 +253,7 @@ func (r *Buffer) FirstSocketBuffer() SocketDataBuffer {
 }
 
 func (r *Buffer) LastSocketBuffer() SocketDataBuffer {
-       if r.dataEvents.Len() == 0 {
+       if r.dataEvents == nil || r.dataEvents.Len() == 0 {
                return nil
        }
        return r.dataEvents.Back().Value.(SocketDataBuffer)
@@ -285,7 +285,7 @@ func CombineSlices(validated bool, buffers ...*Buffer) *Buffer {
        dataEvents := list.New()
        detailEvents := list.New()
        for _, b := range buffers {
-               if b == nil {
+               if b == nil || b.head == nil {
                        continue
                }
                if b.head.bufIndex > 0 {

Reply via email to