This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git
The following commit(s) were added to refs/heads/main by this push:
new 6064a88 Introduce `MonitorFilter` into access log module (#140)
6064a88 is described below
commit 6064a88fa597fe1669efd33aa6972ccd4cb413e5
Author: mrproliu <[email protected]>
AuthorDate: Mon Sep 2 18:34:20 2024 +0800
Introduce `MonitorFilter` into access log module (#140)
---
CHANGES.md | 1 +
pkg/accesslog/common/connection.go | 46 +++-------------------
pkg/accesslog/common/filter.go | 80 ++++++++++++++++++++++++++++++++++++++
pkg/accesslog/runner.go | 4 +-
4 files changed, 89 insertions(+), 42 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index ffa770c..320d3f9 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -9,6 +9,7 @@ Release Notes.
* Support propagation the excluding namespaces in the access log to the backend.
* Add `pprof` module for observe self.
* Add detect process from `CRI-O` container in Kubernetes.
+* Introduce `MonitorFilter` into access log module.
#### Bug Fixes
* Fixed the issue where `conntrack` could not find the Reply IP in the access log module.
diff --git a/pkg/accesslog/common/connection.go b/pkg/accesslog/common/connection.go
index 71d1056..de98fb9 100644
--- a/pkg/accesslog/common/connection.go
+++ b/pkg/accesslog/common/connection.go
@@ -21,7 +21,6 @@ import (
"context"
"errors"
"fmt"
- "strings"
"sync"
"time"
@@ -109,8 +108,7 @@ type ConnectionManager struct {
processMonitorMap *ebpf.Map
activeConnectionMap *ebpf.Map
- excludeNamespaces map[string]bool
- excludeClusters map[string]bool
+ monitorFilter MonitorFilter
processors []ConnectionProcessor
processListeners []ProcessListener
@@ -139,15 +137,7 @@ type ConnectionInfo struct {
PID uint32
}
-func NewConnectionManager(config *Config, moduleMgr *module.Manager, bpfLoader *bpf.Loader) *ConnectionManager {
- excludeNamespaces := make(map[string]bool)
- for _, ns := range strings.Split(config.ExcludeNamespaces, ",") {
- excludeNamespaces[ns] = true
- }
- excludeClusters := make(map[string]bool)
- for _, cluster := range strings.Split(config.ExcludeClusters, ",") {
- excludeClusters[cluster] = true
- }
+func NewConnectionManager(config *Config, moduleMgr *module.Manager, bpfLoader *bpf.Loader, filter MonitorFilter) *ConnectionManager {
mgr := &ConnectionManager{
moduleMgr: moduleMgr,
processOP: moduleMgr.FindModule(process.ModuleName).(process.Operator),
@@ -159,8 +149,7 @@ func NewConnectionManager(config *Config, moduleMgr *module.Manager, bpfLoader *
processMonitorMap: bpfLoader.ProcessMonitorControl,
activeConnectionMap: bpfLoader.ActiveConnectionMap,
allUnfinishedConnections: make(map[string]*bool),
- excludeNamespaces: excludeNamespaces,
- excludeClusters: excludeClusters,
+ monitorFilter: filter,
}
return mgr
}
@@ -229,11 +218,7 @@ func (c *ConnectionManager) OnNewProcessExecuting(pid int32) {
}
func (c *ConnectionManager) GetExcludeNamespaces() []string {
- namespaces := make([]string, len(c.excludeNamespaces))
- for namespace := range c.excludeNamespaces {
- namespaces = append(namespaces, namespace)
- }
- return namespaces
+ return c.monitorFilter.ExcludeNamespaces()
}
func (c *ConnectionManager) Find(event events.Event) *ConnectionInfo {
@@ -365,9 +350,6 @@ func (c *ConnectionManager) buildAddressFromLocalKubernetesProcess(pid uint32, p
for _, pi := range c.monitoringProcesses[int32(pid)] {
if pi.DetectType() == api.Kubernetes {
entity := pi.Entity()
- if cluster, _, found := strings.Cut(entity.ServiceName, "::"); found && c.excludeClusters[cluster] {
- continue
- }
podContainer := pi.DetectProcess().(*kubernetes.Process).PodContainer()
return &v3.ConnectionAddress{
Address: &v3.ConnectionAddress_Kubernetes{
@@ -528,25 +510,7 @@ func (c *ConnectionManager) printTotalAddressesWithPid(prefix string) {
}
func (c *ConnectionManager) shouldExcludeTheProcess(entities []api.ProcessInterface) bool {
- // when the process contains multiple entity, and contains the cluster not exclude, then should not exclude the process
- containsNotExcludeCluster := false
- for _, entity := range entities {
- if entity.DetectType() == api.Kubernetes { // for now, we only have the kubernetes detected process
- namespace := entity.DetectProcess().(*kubernetes.Process).PodContainer().Pod.Namespace
- if c.excludeNamespaces[namespace] {
- return true
- }
- if cluster, _, found := strings.Cut(entity.Entity().ServiceName, "::"); found {
- if !c.excludeClusters[cluster] {
- containsNotExcludeCluster = true
- }
- } else {
- containsNotExcludeCluster = true
- break
- }
- }
- }
- return !containsNotExcludeCluster
+ return c.monitorFilter.ShouldExclude(entities)
}
func (c *ConnectionManager) RemoveProcess(pid int32, entities []api.ProcessInterface) {
diff --git a/pkg/accesslog/common/filter.go b/pkg/accesslog/common/filter.go
new file mode 100644
index 0000000..dad1741
--- /dev/null
+++ b/pkg/accesslog/common/filter.go
@@ -0,0 +1,80 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package common
+
+import (
+ "strings"
+
+ "github.com/apache/skywalking-rover/pkg/process/api"
+ "github.com/apache/skywalking-rover/pkg/process/finders/kubernetes"
+)
+
+type MonitorFilter interface {
+ // ShouldExclude returns true if the process should be excluded from monitoring.
+ ShouldExclude(process []api.ProcessInterface) bool
+ // ExcludeNamespaces returns a list of namespaces that should be excluded from monitoring.
+ ExcludeNamespaces() []string
+}
+
+type StaticMonitorFilter struct {
+ namespaces map[string]bool
+ clusters map[string]bool
+ originalNamespaces []string
+}
+
+func NewStaticMonitorFilter(namespaces, clusters []string) *StaticMonitorFilter {
+ return &StaticMonitorFilter{
+ namespaces: convertArrayToMapBool(namespaces),
+ clusters: convertArrayToMapBool(clusters),
+ originalNamespaces: namespaces,
+ }
+}
+
+func (s *StaticMonitorFilter) ShouldExclude(processes []api.ProcessInterface) bool {
+ containsNotExcludeCluster := false
+ for _, entity := range processes {
+ if entity.DetectType() != api.Kubernetes { // for now, we only have the kubernetes detected processes
+ continue
+ }
+ namespace := entity.DetectProcess().(*kubernetes.Process).PodContainer().Pod.Namespace
+ if s.namespaces[namespace] {
+ return true
+ }
+ if cluster, _, found := strings.Cut(entity.Entity().ServiceName, "::"); found {
+ if !s.clusters[cluster] {
+ containsNotExcludeCluster = true
+ }
+ } else {
+ containsNotExcludeCluster = true
+ break
+ }
+ }
+ return !containsNotExcludeCluster
+}
+
+func (s *StaticMonitorFilter) ExcludeNamespaces() []string {
+ return s.originalNamespaces
+}
+
+func convertArrayToMapBool(a []string) map[string]bool {
+ m := make(map[string]bool, len(a))
+ for _, v := range a {
+ m[v] = true
+ }
+ return m
+}
diff --git a/pkg/accesslog/runner.go b/pkg/accesslog/runner.go
index e79a81b..63f9eee 100644
--- a/pkg/accesslog/runner.go
+++ b/pkg/accesslog/runner.go
@@ -20,6 +20,7 @@ package accesslog
import (
"context"
"fmt"
+ "strings"
"time"
process2 "github.com/shirou/gopsutil/process"
@@ -68,11 +69,12 @@ func NewRunner(mgr *module.Manager, config *common.Config) (*Runner, error) {
coreModule := mgr.FindModule(core.ModuleName).(core.Operator)
backendOP := coreModule.BackendOperator()
clusterName := coreModule.ClusterName()
+ monitorFilter := common.NewStaticMonitorFilter(strings.Split(config.ExcludeNamespaces, ","), strings.Split(config.ExcludeClusters, ","))
runner := &Runner{
context: &common.AccessLogContext{
BPF: bpfLoader,
Config: config,
- ConnectionMgr: common.NewConnectionManager(config, mgr, bpfLoader),
+ ConnectionMgr: common.NewConnectionManager(config, mgr, bpfLoader, monitorFilter),
},
collectors: collector.Collectors(),
mgr: mgr,