This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git


The following commit(s) were added to refs/heads/main by this push:
     new 6064a88  Introduce `MonitorFilter` into access log module (#140)
6064a88 is described below

commit 6064a88fa597fe1669efd33aa6972ccd4cb413e5
Author: mrproliu <[email protected]>
AuthorDate: Mon Sep 2 18:34:20 2024 +0800

    Introduce `MonitorFilter` into access log module (#140)
---
 CHANGES.md                         |  1 +
 pkg/accesslog/common/connection.go | 46 +++-------------------
 pkg/accesslog/common/filter.go     | 80 ++++++++++++++++++++++++++++++++++++++
 pkg/accesslog/runner.go            |  4 +-
 4 files changed, 89 insertions(+), 42 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index ffa770c..320d3f9 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -9,6 +9,7 @@ Release Notes.
 * Support propagation the excluding namespaces in the access log to the 
backend.
 * Add `pprof` module for observe self.
 * Add detect process from `CRI-O` container in Kubernetes.
+* Introduce `MonitorFilter` into access log module. 
 
 #### Bug Fixes
 * Fixed the issue where `conntrack` could not find the Reply IP in the access 
log module.
diff --git a/pkg/accesslog/common/connection.go 
b/pkg/accesslog/common/connection.go
index 71d1056..de98fb9 100644
--- a/pkg/accesslog/common/connection.go
+++ b/pkg/accesslog/common/connection.go
@@ -21,7 +21,6 @@ import (
        "context"
        "errors"
        "fmt"
-       "strings"
        "sync"
        "time"
 
@@ -109,8 +108,7 @@ type ConnectionManager struct {
        processMonitorMap   *ebpf.Map
        activeConnectionMap *ebpf.Map
 
-       excludeNamespaces map[string]bool
-       excludeClusters   map[string]bool
+       monitorFilter MonitorFilter
 
        processors       []ConnectionProcessor
        processListeners []ProcessListener
@@ -139,15 +137,7 @@ type ConnectionInfo struct {
        PID           uint32
 }
 
-func NewConnectionManager(config *Config, moduleMgr *module.Manager, bpfLoader 
*bpf.Loader) *ConnectionManager {
-       excludeNamespaces := make(map[string]bool)
-       for _, ns := range strings.Split(config.ExcludeNamespaces, ",") {
-               excludeNamespaces[ns] = true
-       }
-       excludeClusters := make(map[string]bool)
-       for _, cluster := range strings.Split(config.ExcludeClusters, ",") {
-               excludeClusters[cluster] = true
-       }
+func NewConnectionManager(config *Config, moduleMgr *module.Manager, bpfLoader 
*bpf.Loader, filter MonitorFilter) *ConnectionManager {
        mgr := &ConnectionManager{
                moduleMgr:                moduleMgr,
                processOP:                
moduleMgr.FindModule(process.ModuleName).(process.Operator),
@@ -159,8 +149,7 @@ func NewConnectionManager(config *Config, moduleMgr 
*module.Manager, bpfLoader *
                processMonitorMap:        bpfLoader.ProcessMonitorControl,
                activeConnectionMap:      bpfLoader.ActiveConnectionMap,
                allUnfinishedConnections: make(map[string]*bool),
-               excludeNamespaces:        excludeNamespaces,
-               excludeClusters:          excludeClusters,
+               monitorFilter:            filter,
        }
        return mgr
 }
@@ -229,11 +218,7 @@ func (c *ConnectionManager) OnNewProcessExecuting(pid 
int32) {
 }
 
 func (c *ConnectionManager) GetExcludeNamespaces() []string {
-       namespaces := make([]string, len(c.excludeNamespaces))
-       for namespace := range c.excludeNamespaces {
-               namespaces = append(namespaces, namespace)
-       }
-       return namespaces
+       return c.monitorFilter.ExcludeNamespaces()
 }
 
 func (c *ConnectionManager) Find(event events.Event) *ConnectionInfo {
@@ -365,9 +350,6 @@ func (c *ConnectionManager) 
buildAddressFromLocalKubernetesProcess(pid uint32, p
        for _, pi := range c.monitoringProcesses[int32(pid)] {
                if pi.DetectType() == api.Kubernetes {
                        entity := pi.Entity()
-                       if cluster, _, found := strings.Cut(entity.ServiceName, 
"::"); found && c.excludeClusters[cluster] {
-                               continue
-                       }
                        podContainer := 
pi.DetectProcess().(*kubernetes.Process).PodContainer()
                        return &v3.ConnectionAddress{
                                Address: &v3.ConnectionAddress_Kubernetes{
@@ -528,25 +510,7 @@ func (c *ConnectionManager) 
printTotalAddressesWithPid(prefix string) {
 }
 
 func (c *ConnectionManager) shouldExcludeTheProcess(entities 
[]api.ProcessInterface) bool {
-       // when the process contains multiple entity, and contains the cluster 
not exclude, then should not exclude the process
-       containsNotExcludeCluster := false
-       for _, entity := range entities {
-               if entity.DetectType() == api.Kubernetes { // for now, we only 
have the kubernetes detected process
-                       namespace := 
entity.DetectProcess().(*kubernetes.Process).PodContainer().Pod.Namespace
-                       if c.excludeNamespaces[namespace] {
-                               return true
-                       }
-                       if cluster, _, found := 
strings.Cut(entity.Entity().ServiceName, "::"); found {
-                               if !c.excludeClusters[cluster] {
-                                       containsNotExcludeCluster = true
-                               }
-                       } else {
-                               containsNotExcludeCluster = true
-                               break
-                       }
-               }
-       }
-       return !containsNotExcludeCluster
+       return c.monitorFilter.ShouldExclude(entities)
 }
 
 func (c *ConnectionManager) RemoveProcess(pid int32, entities 
[]api.ProcessInterface) {
diff --git a/pkg/accesslog/common/filter.go b/pkg/accesslog/common/filter.go
new file mode 100644
index 0000000..dad1741
--- /dev/null
+++ b/pkg/accesslog/common/filter.go
@@ -0,0 +1,80 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package common
+
+import (
+       "strings"
+
+       "github.com/apache/skywalking-rover/pkg/process/api"
+       "github.com/apache/skywalking-rover/pkg/process/finders/kubernetes"
+)
+
+type MonitorFilter interface {
+       // ShouldExclude returns true if the process should be excluded from 
monitoring.
+       ShouldExclude(process []api.ProcessInterface) bool
+       // ExcludeNamespaces returns a list of namespaces that should be 
excluded from monitoring.
+       ExcludeNamespaces() []string
+}
+
+type StaticMonitorFilter struct {
+       namespaces         map[string]bool
+       clusters           map[string]bool
+       originalNamespaces []string
+}
+
+func NewStaticMonitorFilter(namespaces, clusters []string) 
*StaticMonitorFilter {
+       return &StaticMonitorFilter{
+               namespaces:         convertArrayToMapBool(namespaces),
+               clusters:           convertArrayToMapBool(clusters),
+               originalNamespaces: namespaces,
+       }
+}
+
+func (s *StaticMonitorFilter) ShouldExclude(processes []api.ProcessInterface) 
bool {
+       containsNotExcludeCluster := false
+       for _, entity := range processes {
+               if entity.DetectType() != api.Kubernetes { // for now, we only 
have the kubernetes detected processes
+                       continue
+               }
+               namespace := 
entity.DetectProcess().(*kubernetes.Process).PodContainer().Pod.Namespace
+               if s.namespaces[namespace] {
+                       return true
+               }
+               if cluster, _, found := 
strings.Cut(entity.Entity().ServiceName, "::"); found {
+                       if !s.clusters[cluster] {
+                               containsNotExcludeCluster = true
+                       }
+               } else {
+                       containsNotExcludeCluster = true
+                       break
+               }
+       }
+       return !containsNotExcludeCluster
+}
+
+func (s *StaticMonitorFilter) ExcludeNamespaces() []string {
+       return s.originalNamespaces
+}
+
+func convertArrayToMapBool(a []string) map[string]bool {
+       m := make(map[string]bool, len(a))
+       for _, v := range a {
+               m[v] = true
+       }
+       return m
+}
diff --git a/pkg/accesslog/runner.go b/pkg/accesslog/runner.go
index e79a81b..63f9eee 100644
--- a/pkg/accesslog/runner.go
+++ b/pkg/accesslog/runner.go
@@ -20,6 +20,7 @@ package accesslog
 import (
        "context"
        "fmt"
+       "strings"
        "time"
 
        process2 "github.com/shirou/gopsutil/process"
@@ -68,11 +69,12 @@ func NewRunner(mgr *module.Manager, config *common.Config) 
(*Runner, error) {
        coreModule := mgr.FindModule(core.ModuleName).(core.Operator)
        backendOP := coreModule.BackendOperator()
        clusterName := coreModule.ClusterName()
+       monitorFilter := 
common.NewStaticMonitorFilter(strings.Split(config.ExcludeNamespaces, ","), 
strings.Split(config.ExcludeClusters, ","))
        runner := &Runner{
                context: &common.AccessLogContext{
                        BPF:           bpfLoader,
                        Config:        config,
-                       ConnectionMgr: common.NewConnectionManager(config, mgr, 
bpfLoader),
+                       ConnectionMgr: common.NewConnectionManager(config, mgr, 
bpfLoader, monitorFilter),
                },
                collectors: collector.Collectors(),
                mgr:        mgr,

Reply via email to